diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6a264ad708dae..6fed139440b30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -437,12 +437,20 @@ class MicroBatchExecution(
           cd.dataType, cd.timeZoneId)
     }
 
+    // Pre-resolve new attributes to ensure all attributes are resolved before
+    // accessing the schema of the logical plan. Note that this only leverages
+    // attribute information, so we don't need to be concerned about the literal values.
+
+    val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
+      case cbt: CurrentBatchTimestamp => cbt.toLiteral
+    }
+
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: StreamWriteSupport =>
         val writer = s.createStreamWriter(
           s"$runId",
-          newAttributePlan.schema,
+          newAttrPlanPreResolvedForSchema.schema,
           outputMode,
           new DataSourceOptions(extraOptions.asJava))
         if (writer.isInstanceOf[SupportsWriteInternalRow]) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index c65e5d3dd75c2..a2d4477a97eb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
@@ -825,6 +826,51 @@ class StreamSuite extends StreamTest {
       assert(query.exception.isEmpty)
     }
   }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+    " to Dataset - use v2 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = true)
+  }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+    " to Dataset - use v1 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = false)
+  }
+
+  private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
+    val input = MemoryStream[Int]
+    val df = input.toDS()
+      .withColumn("cur_timestamp", lit(current_timestamp()))
+      .withColumn("cur_date", lit(current_date()))
+
+    def assertBatchOutputAndUpdateLastTimestamp(
+        rows: Seq[Row],
+        curTimestamp: Long,
+        curDate: Int,
+        expectedValue: Int): Long = {
+      assert(rows.size === 1)
+      val row = rows.head
+      assert(row.getInt(0) === expectedValue)
+      assert(row.getTimestamp(1).getTime >= curTimestamp)
+      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
+      assert(days == curDate || days == curDate + 1)
+      row.getTimestamp(1).getTime
+    }
+
+    var lastTimestamp = System.currentTimeMillis()
+    val currentDate = DateTimeUtils.millisToDays(lastTimestamp)
+    testStream(df, useV2Sink = useV2Sink) (
+      AddData(input, 1),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 1)
+      },
+      Execute { _ => Thread.sleep(1000) },
+      AddData(input, 2),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 2)
+      }
+    )
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {
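
The pre-resolution step above ensures all attributes are resolved before the logical plan's schema is read when creating the v2 stream writer; only attribute and type information is needed, never the literal values. Below is a minimal, self-contained sketch of the same pattern. It is not part of the patch: the object and method names are hypothetical, and the imports assume the catalyst classes that MicroBatchExecution itself references (CurrentBatchTimestamp, transformAllExpressions, toLiteral).

import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.StructType

object SchemaPreResolution {
  // Hypothetical helper mirroring newAttrPlanPreResolvedForSchema: substitute each
  // CurrentBatchTimestamp with its literal form so the plan's schema can be read
  // safely; the substituted values are never evaluated here.
  def schemaWithTimestampsResolved(plan: LogicalPlan): StructType = {
    val preResolved = plan transformAllExpressions {
      case cbt: CurrentBatchTimestamp => cbt.toLiteral
    }
    preResolved.schema
  }
}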