Commit 9e7a8a4
Read JDBC table with custom schema: support specifying partial fields.
wangyum committed Sep 14, 2017
1 parent 17edfec commit 9e7a8a4
Showing 4 changed files with 38 additions and 50 deletions.
2 changes: 1 addition & 1 deletion docs/sql-programming-guide.md
@@ -1333,7 +1333,7 @@ the following case-insensitive options:
 <tr>
   <td><code>customSchema</code></td>
   <td>
-    The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING"). The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
+    The custom schema to use for reading data from JDBC connectors. For example, <code>"id DECIMAL(38, 0), name STRING"</code>. You can also specify partial fields; the remaining fields keep their default types. For example, <code>"id DECIMAL(38, 0)"</code>. The column names should be identical to the corresponding column names of the JDBC table. Users can specify the corresponding Spark SQL data types instead of using the defaults. This option applies only to reading.
   </td>
 </tr>
 </table>
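For context, a minimal sketch of how this option is used from the DataFrame reader API; the URL, table name, and application name below are placeholder values, not part of this commit:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("customSchemaExample").getOrCreate()

// Override only "id" and "name"; any other columns in the table keep the
// types Spark infers from the JDBC metadata (the behavior this commit adds).
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/test")  // placeholder URL
  .option("dbtable", "schema.tablename")              // placeholder table
  .option("customSchema", "id DECIMAL(38, 0), name STRING")
  .load()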
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

@@ -775,24 +775,25 @@ object JdbcUtils extends Logging {
       tableSchema: StructType,
       customSchema: String,
       nameEquality: Resolver): StructType = {
-    val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
-
-    SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the customSchema option value", nameEquality)
-
-    val colNames = tableSchema.fieldNames.mkString(",")
-    val errorMsg = s"Please provide all the columns, all columns are: $colNames"
-    if (userSchema.size != tableSchema.size) {
-      throw new AnalysisException(errorMsg)
-    }
-
-    // This is resolved by names, only check the column names.
-    userSchema.fieldNames.foreach { col =>
-      tableSchema.find(f => nameEquality(f.name, col)).getOrElse {
-        throw new AnalysisException(errorMsg)
-      }
-    }
-    userSchema
+    if (null != customSchema && customSchema.nonEmpty) {
+      val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
+
+      SchemaUtils.checkColumnNameDuplication(
+        userSchema.map(_.name), "in the customSchema option value", nameEquality)
+
+      // This is resolved by name: use the custom field's dataType and nullability
+      // to replace the defaults inferred from the JDBC table metadata.
+      val newSchema = tableSchema.map { col =>
+        userSchema.find(f => nameEquality(f.name, col.name)) match {
+          case Some(c) =>
+            col.copy(dataType = c.dataType, nullable = c.nullable)
+          case None =>
+            col
+        }
+      }
+      StructType(newSchema)
+    } else {
+      tableSchema
+    }
   }

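To make the new merge semantics concrete, a small sketch using the two-column fixture that JdbcUtilsSuite (below) asserts against; the results shown are taken from those assertions:

import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.types._

// Schema as reported by the JDBC metadata: C1 STRING NOT NULL, C2 INT NOT NULL.
val tableSchema = StructType(Seq(
  StructField("C1", StringType, nullable = false),
  StructField("C2", IntegerType, nullable = false)))

// Partial override: C1 becomes a nullable DATE, C2 keeps its default type
// and nullability, and field names come from the table schema.
JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseInsensitiveResolution)
// => StructType(StructField(C1,DateType,true), StructField(C2,IntegerType,false))

// A null or empty customSchema now falls through to the table schema unchanged.
JdbcUtils.getCustomSchema(tableSchema, "", caseInsensitiveResolution)  // => tableSchema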
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala

@@ -30,18 +30,19 @@ class JdbcUtilsSuite extends SparkFunSuite {
val caseInsensitive = org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution

test("Parse user specified column types") {
assert(
JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseInsensitive) ===
StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
assert(JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseSensitive) ===
StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
assert(JdbcUtils.getCustomSchema(tableSchema, null, caseInsensitive) === tableSchema)
assert(JdbcUtils.getCustomSchema(tableSchema, "", caseInsensitive) === tableSchema)

assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseInsensitive) ===
StructType(Seq(StructField("C1", DateType, true), StructField("C2", IntegerType, false))))
assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseSensitive) ===
StructType(Seq(StructField("C1", StringType, false), StructField("C2", IntegerType, false))))

assert(
JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseInsensitive) ===
StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true))))
assert(JdbcUtils.getCustomSchema(
tableSchema, "c1 DECIMAL(38, 0), C2 STRING", caseInsensitive) ===
StructType(Seq(StructField("c1", DecimalType(38, 0), true),
StructField("C2", StringType, true))))
StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
StructType(Seq(StructField("C1", StringType, false), StructField("C2", StringType, true))))

// Throw AnalysisException
val duplicate = intercept[AnalysisException]{
@@ -51,26 +52,6 @@ class JdbcUtilsSuite extends SparkFunSuite {
assert(duplicate.getMessage.contains(
"Found duplicate column(s) in the customSchema option value"))

-    val allColumns = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "C1 STRING", caseSensitive) ===
-        StructType(Seq(StructField("C1", DateType, true)))
-    }
-    assert(allColumns.getMessage.contains("Please provide all the columns,"))
-
-    val caseSensitiveColumnNotFound = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
-        StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true)))
-    }
-    assert(caseSensitiveColumnNotFound.getMessage.contains(
-      "Please provide all the columns, all columns are: C1,C2;"))
-
-    val caseInsensitiveColumnNotFound = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "c3 DATE, C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
-    }
-    assert(caseInsensitiveColumnNotFound.getMessage.contains(
-      "Please provide all the columns, all columns are: C1,C2;"))

// Throw ParseException
val dataTypeNotSupported = intercept[ParseException]{
JdbcUtils.getCustomSchema(tableSchema, "c3 DATEE, C2 STRING", caseInsensitive) ===
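The ParseException in the last assertion comes from CatalystSqlParser.parseTableSchema, the internal parser that getCustomSchema delegates to. A brief sketch of what it accepts, grounded in the tests above (this is internal API, so details may vary across versions):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Comma-separated "name type" pairs parse into a StructType.
CatalystSqlParser.parseTableSchema("id DECIMAL(38, 0), name STRING")
// => StructType(StructField(id,DecimalType(38,0),true), StructField(name,StringType,true))

// An unknown type name such as DATEE fails at parse time.
CatalystSqlParser.parseTableSchema("c3 DATEE, C2 STRING")  // throws ParseException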
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

@@ -971,12 +971,15 @@ class JDBCSuite extends SparkFunSuite
test("jdbc API support custom schema") {
val parts = Array[String]("THEID < 2", "THEID >= 2")
val props = new Properties()
props.put("customSchema", "NAME STRING, THEID BIGINT")
props.put("customSchema", "name STRING, THEID BIGINT")
val schema = StructType(Seq(
StructField("NAME", StringType, true), StructField("THEID", LongType, true)))
val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
assert(df.schema.size === 2)
assert(df.schema === schema)
df.schema.zip(schema).foreach {
case (c, v) =>
assert(c.dataType === v.dataType)
}
assert(df.count() === 3)
}

@@ -993,7 +996,10 @@ class JDBCSuite extends SparkFunSuite
       Seq(StructField("NAME", StringType, true), StructField("THEID", IntegerType, true)))
     val df = sql("select * from people_view")
     assert(df.schema.size === 2)
-    assert(df.schema === schema)
+    df.schema.zip(schema).foreach {
+      case (c, v) =>
+        assert(c.dataType === v.dataType)
+    }
     assert(df.count() === 3)
   }
 }
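The CREATE statement for people_view is collapsed in this diff view. For reference, a hypothetical sketch of how a JDBC-backed view with a customSchema option can be declared through SQL; the URL and option values are illustrative placeholders, not the suite's actual fixture:

spark.sql("""
  CREATE TEMPORARY VIEW people_view
  USING org.apache.spark.sql.jdbc
  OPTIONS (
    url 'jdbc:h2:mem:testdb0',
    dbtable 'TEST.PEOPLE',
    customSchema 'NAME STRING, THEID INT'
  )
""")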
