From 7787ec713dfd398bef28c0319f4df12f009fa128 Mon Sep 17 00:00:00 2001
From: wangfei
Date: Tue, 30 Dec 2014 16:29:48 +0800
Subject: [PATCH] added SchemaRelationProvider
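
Split relation creation into two traits: RelationProvider, whose
createRelation takes only the data source options, and a new
SchemaRelationProvider, whose createRelation also accepts an optional
user-defined schema, so that column definitions in DDL can be passed
through to the data source.

As an illustrative sketch of the DDL this enables (the table name and
path below are made up, not part of this patch):

    sqlContext.sql(
      """CREATE TEMPORARY TABLE people (name STRING, age INT)
        |USING org.apache.spark.sql.json
        |OPTIONS (path '/illustrative/path/people.json')""".stripMargin)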
---
 .../apache/spark/sql/json/JSONRelation.scala  |  4 +--
 .../apache/spark/sql/parquet/newParquet.scala |  4 +--
 .../org/apache/spark/sql/sources/ddl.scala    |  3 ++-
 .../apache/spark/sql/sources/interfaces.scala | 27 ++++++++++++++++++-
 .../spark/sql/sources/FilteredScanSuite.scala |  5 ++--
 .../spark/sql/sources/PrunedScanSuite.scala   |  5 ++--
 .../spark/sql/sources/TableScanSuite.scala    |  5 ++--
 7 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 52b79c1ca8c29..8f835b8517e61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -21,12 +21,12 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.sources._
 
-private[sql] class DefaultSource extends RelationProvider {
+private[sql] class DefaultSource extends SchemaRelationProvider {
   /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation = {
+      schema: Option[StructType] = None): BaseRelation = {
     val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index a237d27794a9e..6a4a41686eb28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -43,12 +43,12 @@ import scala.collection.JavaConversions._
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
-class DefaultSource extends RelationProvider {
+class DefaultSource extends SchemaRelationProvider {
   /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation = {
+      schema: Option[StructType] = None): BaseRelation = {
     val path =
       parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 69fa64affd961..258729c475b6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -217,7 +217,8 @@ private[sql] case class CreateTableUsing(
         sys.error(s"Failed to load class for data source: $provider")
       }
     }
-    val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
+    val dataSource =
+      clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
     val relation = dataSource.createRelation(
       sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols)))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 5f9e8a35ef84e..a23e53e5bf9b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -41,10 +41,35 @@ trait RelationProvider {
    * Note: the parameters' keywords are case insensitive and this insensitivity is enforced
    * by the Map that is passed to the function.
    */
+  def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation
+}
+
+/**
+ * ::DeveloperApi::
+ * Implemented by objects that produce relations for a specific kind of data source. When
+ * Spark SQL is given a DDL operation with a USING clause and an optional user-defined schema,
+ * this interface is used to pass in the parameters specified by a user.
+ *
+ * Users may specify the fully qualified class name of a given data source. When that class is
+ * not found, Spark SQL will append the class name `DefaultSource` to the path, allowing for
+ * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
+ * data source 'org.apache.spark.sql.json.DefaultSource'.
+ *
+ * A new instance of this class will be instantiated each time a DDL call is made.
+ */
+@DeveloperApi
+trait SchemaRelationProvider {
+  /**
+   * Returns a new base relation with the given parameters and a user-defined schema.
+   * Note: the parameters' keywords are case insensitive and this insensitivity is enforced
+   * by the Map that is passed to the function.
+   */
   def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation
+      schema: Option[StructType] = None): BaseRelation
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 939b3c0c66de7..7ebcc1227e5cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -21,10 +21,11 @@ import scala.language.existentials
 
 import org.apache.spark.sql._
 
-class FilteredScanSource extends RelationProvider {
+class FilteredScanSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
+      parameters: Map[String, String],
+      schema: Option[StructType] = None): BaseRelation = {
     SimpleFilteredScan(parameters("from").toInt, parameters("to").toInt)(sqlContext)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index fee2e22611cdc..db6684ba22ab8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.sources
 
 import org.apache.spark.sql._
 
-class PrunedScanSource extends RelationProvider {
+class PrunedScanSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
+      parameters: Map[String, String],
+      schema: Option[StructType] = None): BaseRelation = {
     SimplePrunedScan(parameters("from").toInt, parameters("to").toInt)(sqlContext)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 3cd7b0115d567..63e2648178e22 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -21,10 +21,11 @@ import org.apache.spark.sql._
 
 class DefaultSource extends SimpleScanSource
 
-class SimpleScanSource extends RelationProvider {
+class SimpleScanSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
+      parameters: Map[String, String],
+      schema: Option[StructType] = None): BaseRelation = {
     SimpleScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext)
   }
 }
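
For reference, a minimal standalone sketch of a source implementing the
new trait, mirroring the test sources above (the Example* names are
illustrative, not part of this patch; only SchemaRelationProvider,
BaseRelation/TableScan, and the org.apache.spark.sql aliases come from
this codebase):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql._
    import org.apache.spark.sql.sources._

    // Honors the user-defined schema when the DDL supplies one,
    // otherwise falls back to a single integer column.
    class ExampleScanSource extends SchemaRelationProvider {
      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String],
          schema: Option[StructType] = None): BaseRelation = {
        ExampleScan(parameters("to").toInt, schema)(sqlContext)
      }
    }

    case class ExampleScan(to: Int, userSchema: Option[StructType])(
        @transient val sqlContext: SQLContext) extends TableScan {

      override def schema: StructType =
        userSchema.getOrElse(
          StructType(StructField("i", IntegerType, nullable = false) :: Nil))

      override def buildScan(): RDD[Row] =
        sqlContext.sparkContext.parallelize(1 to to).map(Row(_))
    }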