From 9e7a8a471835d5e93a729c15d166451e79567447 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Thu, 14 Sep 2017 12:26:46 +0800
Subject: [PATCH 1/4] Support specifying partial fields in the customSchema
 option when reading a JDBC table.

---
 docs/sql-programming-guide.md                 |  2 +-
 .../datasources/jdbc/JdbcUtils.scala          | 33 +++++++--------
 .../datasources/jdbc/JdbcUtilsSuite.scala     | 41 +++++--------------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 12 ++++--
 4 files changed, 38 insertions(+), 50 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 95d704014742c..01bc69a3363c5 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1333,7 +1333,7 @@ the following case-insensitive options:
   customSchema
 
-    The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING"). The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
+    The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING". You can also specify partial fields, others use default values. For example, "id DECIMAL(38, 0)". The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
 

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 75327f0d38c2e..bddfd4783d8e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -775,24 +775,25 @@ object JdbcUtils extends Logging {
       tableSchema: StructType,
       customSchema: String,
       nameEquality: Resolver): StructType = {
-    val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
-
-    SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the customSchema option value", nameEquality)
-
-    val colNames = tableSchema.fieldNames.mkString(",")
-    val errorMsg = s"Please provide all the columns, all columns are: $colNames"
-    if (userSchema.size != tableSchema.size) {
-      throw new AnalysisException(errorMsg)
-    }
-
-    // This is resolved by names, only check the column names.
-    userSchema.fieldNames.foreach { col =>
-      tableSchema.find(f => nameEquality(f.name, col)).getOrElse {
-        throw new AnalysisException(errorMsg)
+    if (customSchema != null && customSchema.nonEmpty) {
+      val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
+
+      SchemaUtils.checkColumnNameDuplication(
+        userSchema.map(_.name), "in the customSchema option value", nameEquality)
+
+      // This is resolved by names, use the custom filed dataType to replace the default dateType.
+ val newSchema = tableSchema.map { col => + userSchema.find(f => nameEquality(f.name, col.name)) match { + case Some(c) => + col.copy(dataType = c.dataType, nullable = c.nullable) + case None => + col + } } + StructType(newSchema) + } else { + tableSchema } - userSchema } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala index 1255f262bce94..1b1abe1c64576 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala @@ -30,18 +30,19 @@ class JdbcUtilsSuite extends SparkFunSuite { val caseInsensitive = org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution test("Parse user specified column types") { - assert( - JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseInsensitive) === - StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true)))) - assert(JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseSensitive) === - StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true)))) + assert(JdbcUtils.getCustomSchema(tableSchema, null, caseInsensitive) === tableSchema) + assert(JdbcUtils.getCustomSchema(tableSchema, "", caseInsensitive) === tableSchema) + + assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseInsensitive) === + StructType(Seq(StructField("C1", DateType, true), StructField("C2", IntegerType, false)))) + assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseSensitive) === + StructType(Seq(StructField("C1", StringType, false), StructField("C2", IntegerType, false)))) + assert( JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseInsensitive) === - StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true)))) - assert(JdbcUtils.getCustomSchema( - tableSchema, "c1 DECIMAL(38, 0), C2 STRING", caseInsensitive) === - StructType(Seq(StructField("c1", DecimalType(38, 0), true), - StructField("C2", StringType, true)))) + StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true)))) + assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) === + StructType(Seq(StructField("C1", StringType, false), StructField("C2", StringType, true)))) // Throw AnalysisException val duplicate = intercept[AnalysisException]{ @@ -51,26 +52,6 @@ class JdbcUtilsSuite extends SparkFunSuite { assert(duplicate.getMessage.contains( "Found duplicate column(s) in the customSchema option value")) - val allColumns = intercept[AnalysisException]{ - JdbcUtils.getCustomSchema(tableSchema, "C1 STRING", caseSensitive) === - StructType(Seq(StructField("C1", DateType, true))) - } - assert(allColumns.getMessage.contains("Please provide all the columns,")) - - val caseSensitiveColumnNotFound = intercept[AnalysisException]{ - JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) === - StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true))) - } - assert(caseSensitiveColumnNotFound.getMessage.contains( - "Please provide all the columns, all columns are: C1,C2;")) - - val caseInsensitiveColumnNotFound = intercept[AnalysisException]{ - JdbcUtils.getCustomSchema(tableSchema, "c3 DATE, C2 STRING", caseInsensitive) === - StructType(Seq(StructField("c3", DateType, true), StructField("C2", 
-          StringType, true)))
-    }
-    assert(caseInsensitiveColumnNotFound.getMessage.contains(
-      "Please provide all the columns, all columns are: C1,C2;"))
-
     // Throw ParseException
     val dataTypeNotSupported = intercept[ParseException]{
       JdbcUtils.getCustomSchema(tableSchema, "c3 DATEE, C2 STRING", caseInsensitive) ===
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 40179261ab200..99af3242d7173 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -971,12 +971,15 @@ class JDBCSuite extends SparkFunSuite
   test("jdbc API support custom schema") {
     val parts = Array[String]("THEID < 2", "THEID >= 2")
     val props = new Properties()
-    props.put("customSchema", "NAME STRING, THEID BIGINT")
+    props.put("customSchema", "name STRING, THEID BIGINT")
     val schema = StructType(Seq(
       StructField("NAME", StringType, true), StructField("THEID", LongType, true)))
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
     assert(df.schema.size === 2)
-    assert(df.schema === schema)
+    df.schema.zip(schema).foreach {
+      case (c, v) =>
+        assert(c.dataType === v.dataType)
+    }
     assert(df.count() === 3)
   }
 
@@ -993,7 +996,10 @@ class JDBCSuite extends SparkFunSuite
       Seq(StructField("NAME", StringType, true), StructField("THEID", IntegerType, true)))
     val df = sql("select * from people_view")
     assert(df.schema.size === 2)
-    assert(df.schema === schema)
+    df.schema.zip(schema).foreach {
+      case (c, v) =>
+        assert(c.dataType === v.dataType)
+    }
     assert(df.count() === 3)
   }
 }

From c0edad203499738a18e3798d43bb19433f4836ee Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 15 Sep 2017 00:25:44 +0800
Subject: [PATCH 2/4] Improve test.

---
 docs/sql-programming-guide.md                 |  2 +-
 .../datasources/jdbc/JdbcUtils.scala          |  6 ++---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 23 +++++++-------------
 3 files changed, 11 insertions(+), 20 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 01bc69a3363c5..5db60cc996e75 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1333,7 +1333,7 @@ the following case-insensitive options:
   customSchema
 
-    The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING". You can also specify partial fields, others use default values. For example, "id DECIMAL(38, 0)". The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
+    The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING". You can also specify partial fields; the other columns use the default type mapping. For example, "id DECIMAL(38, 0)". The column names should be identical to the corresponding column names of the JDBC table. Users can specify the corresponding Spark SQL data types instead of using the defaults. This option applies only to reading.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index bddfd4783d8e9..b7e431da2ba01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -784,10 +784,8 @@ object JdbcUtils extends Logging { // This is resolved by names, use the custom filed dataType to replace the default dateType. val newSchema = tableSchema.map { col => userSchema.find(f => nameEquality(f.name, col.name)) match { - case Some(c) => - col.copy(dataType = c.dataType, nullable = c.nullable) - case None => - col + case Some(c) => col.copy(dataType = c.dataType, metadata = Metadata.empty) + case None => col } } StructType(newSchema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 99af3242d7173..689f4106824aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.sql.{AnalysisException, DataFrame, Row} +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.command.ExplainCommand @@ -970,36 +971,28 @@ class JDBCSuite extends SparkFunSuite test("jdbc API support custom schema") { val parts = Array[String]("THEID < 2", "THEID >= 2") + val customSchema = "NAME STRING, THEID INT" val props = new Properties() - props.put("customSchema", "name STRING, THEID BIGINT") - val schema = StructType(Seq( - StructField("NAME", StringType, true), StructField("THEID", LongType, true))) + props.put("customSchema", customSchema) val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props) assert(df.schema.size === 2) - df.schema.zip(schema).foreach { - case (c, v) => - assert(c.dataType === v.dataType) - } + assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema)) assert(df.count() === 3) } test("jdbc API custom schema DDL-like strings.") { withTempView("people_view") { + val customSchema = "NAME STRING, THEID INT" sql( s""" |CREATE TEMPORARY VIEW people_view |USING org.apache.spark.sql.jdbc |OPTIONS (uRl '$url', DbTaBlE 'TEST.PEOPLE', User 'testUser', PassWord 'testPass', - |customSchema 'NAME STRING, THEID INT') + |customSchema '$customSchema') """.stripMargin.replaceAll("\n", " ")) - val schema = StructType( - Seq(StructField("NAME", StringType, true), StructField("THEID", IntegerType, true))) val df = sql("select * from people_view") - assert(df.schema.size === 2) - df.schema.zip(schema).foreach { - case (c, v) => - assert(c.dataType === v.dataType) - } + assert(df.schema.length === 2) + assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema)) assert(df.count() === 3) } } From 1ee4ea0a23b257caa6c3fc7c2b2b73e154314f02 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 15 Sep 2017 07:32:20 +0800 Subject: [PATCH 3/4] Remove metadata. 
--- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 7 +++---- .../execution/datasources/jdbc/JdbcUtilsSuite.scala | 12 ++++++------ 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index b7e431da2ba01..fe9ae636520b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -301,12 +301,11 @@ object JdbcUtils extends Logging { } else { rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls } - val metadata = new MetadataBuilder() - .putLong("scale", fieldScale) + val metadata = new MetadataBuilder().putLong("scale", fieldScale) val columnType = dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse( getCatalystType(dataType, fieldSize, fieldScale, isSigned)) - fields(i) = StructField(columnName, columnType, nullable, metadata.build()) + fields(i) = StructField(columnName, columnType, nullable) i = i + 1 } new StructType(fields) @@ -784,7 +783,7 @@ object JdbcUtils extends Logging { // This is resolved by names, use the custom filed dataType to replace the default dateType. val newSchema = tableSchema.map { col => userSchema.find(f => nameEquality(f.name, col.name)) match { - case Some(c) => col.copy(dataType = c.dataType, metadata = Metadata.empty) + case Some(c) => col.copy(dataType = c.dataType) case None => col } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala index 1b1abe1c64576..7d277c1ffaffe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala @@ -34,20 +34,20 @@ class JdbcUtilsSuite extends SparkFunSuite { assert(JdbcUtils.getCustomSchema(tableSchema, "", caseInsensitive) === tableSchema) assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseInsensitive) === - StructType(Seq(StructField("C1", DateType, true), StructField("C2", IntegerType, false)))) + StructType(Seq(StructField("C1", DateType, false), StructField("C2", IntegerType, false)))) assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseSensitive) === StructType(Seq(StructField("C1", StringType, false), StructField("C2", IntegerType, false)))) assert( JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseInsensitive) === - StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true)))) + StructType(Seq(StructField("C1", DateType, false), StructField("C2", StringType, false)))) assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) === - StructType(Seq(StructField("C1", StringType, false), StructField("C2", StringType, true)))) + StructType(Seq(StructField("C1", StringType, false), StructField("C2", StringType, false)))) // Throw AnalysisException val duplicate = intercept[AnalysisException]{ JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, c1 STRING", caseInsensitive) === - StructType(Seq(StructField("c1", DateType, true), StructField("c1", StringType, true))) + StructType(Seq(StructField("c1", DateType, false), StructField("c1", StringType, false))) } assert(duplicate.getMessage.contains( 
"Found duplicate column(s) in the customSchema option value")) @@ -55,13 +55,13 @@ class JdbcUtilsSuite extends SparkFunSuite { // Throw ParseException val dataTypeNotSupported = intercept[ParseException]{ JdbcUtils.getCustomSchema(tableSchema, "c3 DATEE, C2 STRING", caseInsensitive) === - StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true))) + StructType(Seq(StructField("c3", DateType, false), StructField("C2", StringType, false))) } assert(dataTypeNotSupported.getMessage.contains("DataType datee is not supported")) val mismatchedInput = intercept[ParseException]{ JdbcUtils.getCustomSchema(tableSchema, "c3 DATE. C2 STRING", caseInsensitive) === - StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true))) + StructType(Seq(StructField("c3", DateType, false), StructField("C2", StringType, false))) } assert(mismatchedInput.getMessage.contains("mismatched input '.' expecting")) } From 06095f52454a000d15d3df5845a383cb1e1dbddc Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 15 Sep 2017 08:14:34 +0800 Subject: [PATCH 4/4] Update comment --- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index fe9ae636520b3..71133666b3249 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -767,8 +767,8 @@ object JdbcUtils extends Logging { } /** - * Parses the user specified customSchema option value to DataFrame schema, - * and returns it if it's all columns are equals to default schema's. + * Parses the user specified customSchema option value to DataFrame schema, and + * returns a schema that is replaced by the custom schema's dataType if column name is matched. */ def getCustomSchema( tableSchema: StructType, @@ -780,7 +780,7 @@ object JdbcUtils extends Logging { SchemaUtils.checkColumnNameDuplication( userSchema.map(_.name), "in the customSchema option value", nameEquality) - // This is resolved by names, use the custom filed dataType to replace the default dateType. + // This is resolved by names, use the custom filed dataType to replace the default dataType. val newSchema = tableSchema.map { col => userSchema.find(f => nameEquality(f.name, col.name)) match { case Some(c) => col.copy(dataType = c.dataType)