From 949d6bbae66ecbb9c11a0e9d0d14f09f51f5c46a Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Fri, 11 Jul 2014 10:22:29 -0700
Subject: [PATCH] When creating a SchemaRDD for a JSON dataset, users can
 apply an existing schema.

---
 .../org/apache/spark/sql/SQLContext.scala     | 35 +++++++++++++++----
 .../org/apache/spark/sql/json/JsonRDD.scala   |  4 ++-
 2 files changed, 31 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 21ada3b859980..628f7cb84c61f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -128,15 +128,23 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def jsonFile(path: String): SchemaRDD = jsonFile(path, 1.0)
+  def jsonFile(path: String): SchemaRDD = jsonFile(path, 1.0, None)
+
+  /**
+   * Loads a JSON file (one object per line) and applies the given schema,
+   * returning the result as a [[SchemaRDD]].
+   *
+   * @group userf
+   */
+  def jsonFile(path: String, schema: StructType): SchemaRDD = jsonFile(path, 1.0, Option(schema))
 
   /**
    * :: Experimental ::
    */
   @Experimental
-  def jsonFile(path: String, samplingRatio: Double): SchemaRDD = {
+  def jsonFile(path: String, samplingRatio: Double, schema: Option[StructType]): SchemaRDD = {
     val json = sparkContext.textFile(path)
-    jsonRDD(json, samplingRatio)
+    jsonRDD(json, samplingRatio, schema)
   }
 
   /**
@@ -146,15 +154,28 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def jsonRDD(json: RDD[String]): SchemaRDD = jsonRDD(json, 1.0)
+  def jsonRDD(json: RDD[String]): SchemaRDD = jsonRDD(json, 1.0, None)
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
+   * returning the result as a [[SchemaRDD]].
+   *
+   * @group userf
+   */
+  def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = jsonRDD(json, 1.0, Option(schema))
 
   /**
    * :: Experimental ::
    */
   @Experimental
-  def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
-    val schema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio))
-    applySchemaToPartitions(json, schema, JsonRDD.jsonStringToRow(schema, _: Iterator[String]))
+  def jsonRDD(json: RDD[String], samplingRatio: Double, schema: Option[StructType]): SchemaRDD = {
+    val appliedSchema =
+      schema.getOrElse(JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio)))
+
+    applySchemaToPartitions(
+      json,
+      appliedSchema,
+      JsonRDD.jsonStringToRow(appliedSchema, _: Iterator[String]))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index da384ab6c673a..4fd55ff13dcce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -31,7 +31,9 @@ import org.apache.spark.sql.Logging
 
 private[sql] object JsonRDD extends Logging {
 
-  private[sql] def jsonStringToRow(schema: StructType, jsonIter: Iterator[String]): Iterator[Row] = {
+  private[sql] def jsonStringToRow(
+      schema: StructType,
+      jsonIter: Iterator[String]): Iterator[Row] = {
     parseJson(jsonIter).map(parsed => asRow(parsed, schema))
   }
 
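
For reviewers, a minimal usage sketch of the overloads this patch adds. The
jsonRDD/jsonFile signatures taking a StructType come straight from the diff
above; everything else (the imports, the local SparkContext setup, and the
sample data) is an assumption for illustration. In particular, the sketch
assumes StructType, StructField, StringType, and IntegerType resolve from
org.apache.spark.sql.catalyst.types in this branch; adjust to wherever these
types live in your tree.

    // Sketch only, not part of the patch. The catalyst type import below is
    // an assumption about this branch's package layout.
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.catalyst.types._

    val sc = new SparkContext("local", "json-schema-example")
    val sqlContext = new SQLContext(sc)

    // Records shaped like {"name":"alice","age":30}, one JSON object per record.
    val json = sc.parallelize(Seq(
      """{"name":"alice","age":30}""",
      """{"name":"bob","age":25}"""))

    // A hand-built schema matching those records.
    val schema = StructType(
      StructField("name", StringType, nullable = true) ::
      StructField("age", IntegerType, nullable = true) :: Nil)

    // Before this patch: the schema is always inferred, optionally from a sample.
    val inferred = sqlContext.jsonRDD(json)

    // With this patch: inference is skipped and the existing schema is applied as-is.
    val applied = sqlContext.jsonRDD(json, schema)

Routing both public entry points through the Option[StructType] parameter
keeps a single code path: jsonRDD(json, samplingRatio, schema) only runs
JsonRDD.inferSchema when schema is None, via schema.getOrElse(...).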