[SPARK-22002][SQL] Read JDBC table use custom schema support specify partial fields. #19231

Closed · 4 commits · showing changes from 2 commits
docs/sql-programming-guide.md — 2 changes: 1 addition & 1 deletion
@@ -1333,7 +1333,7 @@ the following case-insensitive options:
<tr>
<td><code>customSchema</code></td>
<td>
-      The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING"). The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
+      The custom schema to use for reading data from JDBC connectors. For example, <code>"id DECIMAL(38, 0), name STRING"</code>. You can also specify partial fields, and the others use the default type mapping. For example, <code>"id DECIMAL(38, 0)"</code>. The column names should be identical to the corresponding column names of the JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
</td>
</tr>
</table>
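
For illustration, a minimal read that overrides only one column's type could look like the following; the URL, table name, and credentials are placeholders rather than anything from this PR:

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb")  // placeholder connection URL
  .option("dbtable", "public.people")                   // placeholder table
  .option("user", "username")
  .option("password", "password")
  // Only "id" is overridden; every other column keeps Spark's default JDBC type mapping.
  .option("customSchema", "id DECIMAL(38, 0)")
  .load()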
@@ -775,24 +775,23 @@ object JdbcUtils extends Logging {
tableSchema: StructType,
customSchema: String,
nameEquality: Resolver): StructType = {
-    val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
+    if (null != customSchema && customSchema.nonEmpty) {
+      val userSchema = CatalystSqlParser.parseTableSchema(customSchema)

-    SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the customSchema option value", nameEquality)

-    val colNames = tableSchema.fieldNames.mkString(",")
-    val errorMsg = s"Please provide all the columns, all columns are: $colNames"
-    if (userSchema.size != tableSchema.size) {
-      throw new AnalysisException(errorMsg)
-    }
+      SchemaUtils.checkColumnNameDuplication(
+        userSchema.map(_.name), "in the customSchema option value", nameEquality)

-    // This is resolved by names, only check the column names.
-    userSchema.fieldNames.foreach { col =>
-      tableSchema.find(f => nameEquality(f.name, col)).getOrElse {
-        throw new AnalysisException(errorMsg)
+      // This is resolved by names: use the custom field dataType to replace the default dataType.
+      val newSchema = tableSchema.map { col =>
+        userSchema.find(f => nameEquality(f.name, col.name)) match {
+          case Some(c) => col.copy(dataType = c.dataType, metadata = Metadata.empty)

@wangyum (Member, author) commented on Sep 14, 2017:

Reset the metadata to empty, otherwise the result is not equal to the schema generated by CatalystSqlParser.parseTableSchema. In any case, the type is now fixed, so the metadata is not needed to infer column types.
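
A small sketch of why the reset matters, with made-up field values: StructField equality includes the metadata, so a field carrying JDBC-derived metadata never compares equal to the same field produced by CatalystSqlParser.parseTableSchema, whose metadata is empty.

import org.apache.spark.sql.types._

// Hypothetical field as the JDBC reader might build it (the metadata key is illustrative).
val fromJdbc = StructField("THEID", IntegerType, nullable = true,
  new MetadataBuilder().putLong("scale", 0).build())
// The same field as parseTableSchema would produce it: metadata is Metadata.empty.
val fromDdl = StructField("THEID", IntegerType, nullable = true)

fromJdbc == fromDdl                                   // false: metadata participates in equality
fromJdbc.copy(metadata = Metadata.empty) == fromDdl   // true: this is what the patch relies on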

A reviewer (Member) commented:

Why not change the following line https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L309 to

fields(i) = StructField(columnName, columnType, nullable)
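
For context, the referenced line appears to be the field-building loop in JdbcUtils.getSchema. The snippet below is a simplified, self-contained stand-in for that loop (column names and types are invented), showing the effect of the suggested change rather than quoting the actual file:

import org.apache.spark.sql.types._

// Pretend these came from a JDBC ResultSetMetaData.
val columns = Seq(("NAME", StringType, true), ("THEID", IntegerType, true))
val fields = new Array[StructField](columns.length)
var i = 0
for ((columnName, columnType, nullable) <- columns) {
  // The current code attaches ResultSet-derived metadata, roughly:
  //   fields(i) = StructField(columnName, columnType, nullable, metadata.build())
  // The suggestion drops the metadata so the inferred schema compares equal to
  // the output of CatalystSqlParser.parseTableSchema:
  fields(i) = StructField(columnName, columnType, nullable)
  i += 1
}
val inferredSchema = StructType(fields)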

+          case None => col
+        }
+      }
+      StructType(newSchema)
+    } else {
+      tableSchema
+    }
-    userSchema
   }
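
Taken together, the new getCustomSchema keeps the JDBC-inferred schema as the base and only swaps in the user-specified types. A rough illustration of the intended merge, with invented column names (not taken from the PR's tests):

import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types._

// Hypothetical schema inferred from the JDBC source.
val inferred = StructType(Seq(
  StructField("ID", IntegerType, nullable = false),
  StructField("NAME", StringType, nullable = true)))

// Only ID is overridden; NAME keeps its inferred type, and nullability is preserved
// because the patch only replaces dataType (and clears metadata).
val merged = JdbcUtils.getCustomSchema(inferred, "id DECIMAL(38, 0)", caseInsensitiveResolution)
// merged: ID DECIMAL(38,0) (still not nullable), NAME STRING (nullable)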

@@ -30,18 +30,19 @@ class JdbcUtilsSuite extends SparkFunSuite {
val caseInsensitive = org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution

test("Parse user specified column types") {
-    assert(
-      JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseInsensitive) ===
-      StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
-    assert(JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseSensitive) ===
-      StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
+    assert(JdbcUtils.getCustomSchema(tableSchema, null, caseInsensitive) === tableSchema)
+    assert(JdbcUtils.getCustomSchema(tableSchema, "", caseInsensitive) === tableSchema)

+    assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseInsensitive) ===
+      StructType(Seq(StructField("C1", DateType, true), StructField("C2", IntegerType, false))))
+    assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseSensitive) ===
+      StructType(Seq(StructField("C1", StringType, false), StructField("C2", IntegerType, false))))

     assert(
       JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseInsensitive) ===
-      StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true))))
-    assert(JdbcUtils.getCustomSchema(
-      tableSchema, "c1 DECIMAL(38, 0), C2 STRING", caseInsensitive) ===
-      StructType(Seq(StructField("c1", DecimalType(38, 0), true),
-        StructField("C2", StringType, true))))
+      StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
+    assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
+      StructType(Seq(StructField("C1", StringType, false), StructField("C2", StringType, true))))

// Throw AnalysisException
val duplicate = intercept[AnalysisException]{
@@ -51,26 +51,6 @@ class JdbcUtilsSuite {
assert(duplicate.getMessage.contains(
"Found duplicate column(s) in the customSchema option value"))

-    val allColumns = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "C1 STRING", caseSensitive) ===
-        StructType(Seq(StructField("C1", DateType, true)))
-    }
-    assert(allColumns.getMessage.contains("Please provide all the columns,"))
-
-    val caseSensitiveColumnNotFound = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
-        StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true)))
-    }
-    assert(caseSensitiveColumnNotFound.getMessage.contains(
-      "Please provide all the columns, all columns are: C1,C2;"))
-
-    val caseInsensitiveColumnNotFound = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "c3 DATE, C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
-    }
-    assert(caseInsensitiveColumnNotFound.getMessage.contains(
-      "Please provide all the columns, all columns are: C1,C2;"))

// Throw ParseException
val dataTypeNotSupported = intercept[ParseException]{
JdbcUtils.getCustomSchema(tableSchema, "c3 DATEE, C2 STRING", caseInsensitive) ===
@@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
@@ -970,30 +971,28 @@ class JDBCSuite extends SparkFunSuite

test("jdbc API support custom schema") {
val parts = Array[String]("THEID < 2", "THEID >= 2")
+    val customSchema = "NAME STRING, THEID INT"
     val props = new Properties()
-    props.put("customSchema", "NAME STRING, THEID BIGINT")
-    val schema = StructType(Seq(
-      StructField("NAME", StringType, true), StructField("THEID", LongType, true)))
+    props.put("customSchema", customSchema)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
     assert(df.schema.size === 2)
-    assert(df.schema === schema)
+    assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema))
assert(df.count() === 3)
}
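
For reference, the DDL string used here parses to a plain schema with nullable fields and empty metadata — the same StructType the removed hand-built schema in the next test spelled out — and that is the value df.schema is compared against:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

CatalystSqlParser.parseTableSchema("NAME STRING, THEID INT")
// => StructType(Seq(
//      StructField("NAME", StringType, nullable = true),
//      StructField("THEID", IntegerType, nullable = true)))   // metadata is empty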

test("jdbc API custom schema DDL-like strings.") {
withTempView("people_view") {
+      val customSchema = "NAME STRING, THEID INT"
       sql(
         s"""
            |CREATE TEMPORARY VIEW people_view
            |USING org.apache.spark.sql.jdbc
            |OPTIONS (uRl '$url', DbTaBlE 'TEST.PEOPLE', User 'testUser', PassWord 'testPass',
-           |customSchema 'NAME STRING, THEID INT')
+           |customSchema '$customSchema')
          """.stripMargin.replaceAll("\n", " "))
-      val schema = StructType(
-        Seq(StructField("NAME", StringType, true), StructField("THEID", IntegerType, true)))
       val df = sql("select * from people_view")
-      assert(df.schema.size === 2)
-      assert(df.schema === schema)

A reviewer (Member) commented:

Revert it back. Change the following line https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L309 to

fields(i) = StructField(columnName, columnType, nullable)

You also need to update some test cases due to the above change, I think.

+      assert(df.schema.length === 2)
+      assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema))
assert(df.count() === 3)
}
}