[SPARK-47406][SQL] Handle TIMESTAMP and DATETIME in MYSQLDialect

### What changes were proposed in this pull request?

In MySQL, TIMESTAMP and DATETIME are different types: the former is a TIMESTAMP WITH LOCAL TIME ZONE, while the latter is a TIMESTAMP WITHOUT TIME ZONE.

Following [SPARK-47375](https://issues.apache.org/jira/browse/SPARK-47375), MySQL TIMESTAMP now maps directly to TimestampType, while the mapping of DATETIME is decided by the `preferTimestampNTZ` option.
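
A minimal sketch of the resulting behavior in a `spark-shell` session (the JDBC URL, table name `t`, and columns `ts TIMESTAMP` / `dt DATETIME` below are hypothetical):

```scala
import java.util.Properties

// Assumes a reachable MySQL instance with a table t(ts TIMESTAMP, dt DATETIME).
val df = spark.read
  .option("preferTimestampNTZ", "true")
  .jdbc("jdbc:mysql://localhost:3306/test", "t", new Properties)

df.schema("ts").dataType // TimestampType: MySQL TIMESTAMP is WITH LOCAL TIME ZONE
df.schema("dt").dataType // TimestampNTZType: DATETIME follows preferTimestampNTZ
```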

### Why are the changes needed?

Align with the guidelines for JDBC timestamp type mapping.

### Does this PR introduce _any_ user-facing change?

Yes, a migration guide entry is provided.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#45530 from yaooqinn/SPARK-47406.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
yaooqinn committed Mar 15, 2024
1 parent 56cfc89 commit 19ac1fc
Showing 5 changed files with 55 additions and 22 deletions.
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc

import java.math.BigDecimal
import java.sql.{Connection, Date, Timestamp}
import java.time.LocalDateTime
import java.util.Properties

import org.apache.spark.sql.Row
@@ -134,6 +135,19 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
    }
  }

test("SPARK-47406: MySQL datetime types with preferTimestampNTZ") {
withDefaultTimeZone(UTC) {
val df = sqlContext.read.option("preferTimestampNTZ", true)
.jdbc(jdbcUrl, "dates", new Properties)
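      // With preferTimestampNTZ=true, MySQL TIME and DATETIME map to TimestampNTZType
      // (read back as LocalDateTime), while TIMESTAMP keeps TimestampType.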
      checkAnswer(df, Row(
        Date.valueOf("1991-11-09"),
        LocalDateTime.of(1970, 1, 1, 13, 31, 24),
        LocalDateTime.of(1996, 1, 1, 1, 23, 45),
        Timestamp.valueOf("2009-02-13 23:31:30"),
        Date.valueOf("2001-01-01")))
    }
  }

test("String types") {
val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
val rows = df.collect()
1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
@@ -41,6 +41,7 @@ license: |
- Since Spark 4.0, the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` is deprecated. Consider to change `strfmt` of the `format_string` function to use 1-based indexes. The first argument must be referenced by "1$", the second by "2$", etc.
- Since Spark 4.0, the function `to_csv` no longer supports input with the data type `STRUCT`, `ARRAY`, `MAP`, `VARIANT` and `BINARY` (because the `CSV specification` does not have standards for these data types and cannot be read back using `from_csv`), Spark will throw `DATATYPE_MISMATCH.UNSUPPORTED_INPUT_TYPE` exception.
- Since Spark 4.0, JDBC read option `preferTimestampNTZ=true` no longer converts Postgres TIMESTAMP WITH TIME ZONE and TIME WITH TIME ZONE data types to TimestampNTZType, as Spark 3.5 did.
- Since Spark 4.0, JDBC read option `preferTimestampNTZ=true` no longer converts MySQL TIMESTAMP to TimestampNTZType, as Spark 3.5 did. MySQL DATETIME is not affected.

## Upgrading from Spark SQL 3.4 to 3.5

@@ -167,6 +167,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
      throw QueryExecutionErrors.cannotGetJdbcTypeError(dt))
  }

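  /**
   * Returns TimestampNTZType when TIMESTAMP WITHOUT TIME ZONE is preferred,
   * and TimestampType otherwise.
   */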
  def getTimestampType(isTimestampNTZ: Boolean): DataType = {
    if (isTimestampNTZ) TimestampNTZType else TimestampType
  }

  /**
   * Maps a JDBC type to a Catalyst type. This function is called only when
   * the JdbcDialect class corresponding to your database driver returns null.
@@ -211,10 +215,8 @@
      case java.sql.Types.SMALLINT => IntegerType
      case java.sql.Types.SQLXML => StringType
      case java.sql.Types.STRUCT => StringType
      case java.sql.Types.TIME if isTimestampNTZ => TimestampNTZType
      case java.sql.Types.TIME => TimestampType
      case java.sql.Types.TIMESTAMP if isTimestampNTZ => TimestampNTZType
      case java.sql.Types.TIMESTAMP => TimestampType
      case java.sql.Types.TIME => getTimestampType(isTimestampNTZ)
      case java.sql.Types.TIMESTAMP => getTimestampType(isTimestampNTZ)
      case java.sql.Types.TINYINT => IntegerType
      case java.sql.Types.VARBINARY => BinaryType
      case java.sql.Types.VARCHAR if conf.charVarcharAsString => StringType
@@ -781,6 +781,13 @@ abstract class JdbcDialect extends Serializable with Logging {
  def getFullyQualifiedQuotedTableName(ident: Identifier): String = {
    (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
  }

  /**
   * Return TimestampType/TimestampNTZType based on the metadata.
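   * The isTimestampNTZ flag is put into the column metadata when the JDBC reader
   * resolves the schema, based on the preferTimestampNTZ read option.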
   */
  protected final def getTimestampType(md: Metadata): DataType = {
    JdbcUtils.getTimestampType(md.getBoolean("isTimestampNTZ"))
  }
}

/**
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference, NullOrdering, SortDirection}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType}
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType, TimestampType}

private case object MySQLDialect extends JdbcDialect with SQLConfHelper {

@@ -92,23 +92,32 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
      // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
      // byte arrays instead of longs.
      md.putLong("binarylong", 1)
      Option(LongType)
    } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
      Option(BooleanType)
    } else if ("TINYTEXT".equalsIgnoreCase(typeName)) {
      // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason
      Some(StringType)
    } else if (sqlType == Types.VARCHAR && typeName.equals("JSON")) {
      // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1.
      // Explicitly converts it into StringType here.
      Some(StringType)
    } else if (sqlType == Types.TINYINT && typeName.equals("TINYINT")) {
      Some(ByteType)
    } else None
    sqlType match {
      case Types.VARBINARY if "BIT".equalsIgnoreCase(typeName) && size != 1 =>
        // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
        // byte arrays instead of longs.
        md.putLong("binarylong", 1)
        Some(LongType)
      case Types.BIT if "TINYINT".equalsIgnoreCase(typeName) =>
        Some(BooleanType)
      case Types.VARCHAR if "TINYTEXT".equalsIgnoreCase(typeName) =>
        // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason
        Some(StringType)
      case Types.VARCHAR if "JSON".equalsIgnoreCase(typeName) =>
        // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1.
        // Explicitly converts it into StringType here.
        Some(StringType)
      case Types.TINYINT if "TINYINT".equalsIgnoreCase(typeName) =>
        Some(ByteType)
      case Types.TIMESTAMP if "DATETIME".equalsIgnoreCase(typeName) =>
        // scalastyle:off line.size.limit
        // In MYSQL, DATETIME is TIMESTAMP WITHOUT TIME ZONE
        // https://github.com/mysql/mysql-connector-j/blob/8.3.0/src/main/core-api/java/com/mysql/cj/MysqlType.java#L251
        // scalastyle:on line.size.limit
        Some(getTimestampType(md.build()))
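      // MySQL TIMESTAMP is WITH LOCAL TIME ZONE, so it always maps to TimestampType,
      // regardless of preferTimestampNTZ.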
      case Types.TIMESTAMP => Some(TimestampType)
      case _ => None
    }
  }

  override def quoteIdentifier(colName: String): String = {
