From cec965c87f4f4289dd2b61fa00832eaaef5c5cda Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 25 Jan 2019 14:58:03 -0800
Subject: [PATCH] [SPARK-26379][SS] Fix issue on adding current_timestamp/current_date to streaming query

## What changes were proposed in this pull request?

This patch fixes an issue with adding `current_timestamp` / `current_date` to a streaming query.

The root cause is that Spark transforms `CurrentTimestamp`/`CurrentDate` into `CurrentBatchTimestamp` in MicroBatchExecution, which leaves the transformed attributes not yet resolved; they are only resolved later by IncrementalExecution. (ContinuousExecution does not allow `current_timestamp` and `current_date`, so it is unaffected.)

This is fine for a DataSource V1 sink, which simply takes the transformed logical plan and does not evaluate it until the attributes are resolved. For a DataSource V2 sink, however, Spark extracts the schema of the transformed logical plan before IncrementalExecution runs, and the unresolved attributes raise errors.

This patch fixes the issue by building a separate, pre-resolved logical plan that is used only to pass the schema to StreamingWriteSupport safely.

## How was this patch tested?

Added UT.

Closes #23609 from HeartSaVioR/SPARK-26379.

Authored-by: Jungtaek Lim (HeartSaVioR)
Signed-off-by: Dongjoon Hyun
---
 .../streaming/MicroBatchExecution.scala   | 10 +++-
 .../spark/sql/streaming/StreamSuite.scala | 46 +++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6a264ad708dae..6fed139440b30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -437,12 +437,20 @@ class MicroBatchExecution(
           cd.dataType, cd.timeZoneId)
     }
 
+    // Pre-resolve new attributes to ensure all attributes are resolved before
+    // accessing the schema of the logical plan. Note that this only uses the attribute
+    // information, so we don't need to be concerned about the values of the literals.
+
+    val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
+      case cbt: CurrentBatchTimestamp => cbt.toLiteral
+    }
+
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: StreamWriteSupport =>
         val writer = s.createStreamWriter(
           s"$runId",
-          newAttributePlan.schema,
+          newAttrPlanPreResolvedForSchema.schema,
           outputMode,
           new DataSourceOptions(extraOptions.asJava))
         if (writer.isInstanceOf[SupportsWriteInternalRow]) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index c65e5d3dd75c2..a2d4477a97eb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
@@ -825,6 +826,51 @@ class StreamSuite extends StreamTest {
       assert(query.exception.isEmpty)
     }
   }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+    " to Dataset - use v2 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = true)
+  }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+    " to Dataset - use v1 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = false)
+  }
+
+  private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
+    val input = MemoryStream[Int]
+    val df = input.toDS()
+      .withColumn("cur_timestamp", lit(current_timestamp()))
+      .withColumn("cur_date", lit(current_date()))
+
+    def assertBatchOutputAndUpdateLastTimestamp(
+        rows: Seq[Row],
+        curTimestamp: Long,
+        curDate: Int,
+        expectedValue: Int): Long = {
+      assert(rows.size === 1)
+      val row = rows.head
+      assert(row.getInt(0) === expectedValue)
+      assert(row.getTimestamp(1).getTime >= curTimestamp)
+      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
+      assert(days == curDate || days == curDate + 1)
+      row.getTimestamp(1).getTime
+    }
+
+    var lastTimestamp = System.currentTimeMillis()
+    val currentDate = DateTimeUtils.millisToDays(lastTimestamp)
+    testStream(df, useV2Sink = useV2Sink) (
+      AddData(input, 1),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 1)
+      },
+      Execute { _ => Thread.sleep(1000) },
+      AddData(input, 2),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 2)
+      }
+    )
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {
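
For reference, here is a minimal standalone sketch (not part of the patch) of the kind of query the new test exercises. It mirrors the added test's use of `MemoryStream`, `current_timestamp`, and `current_date`; the `SparkSession` setup and the choice of the console sink as the DataSource V2 / StreamWriteSupport path are illustrative assumptions rather than anything prescribed by the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{current_date, current_timestamp, lit}

object Spark26379Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-26379 sketch")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlContext: org.apache.spark.sql.SQLContext = spark.sqlContext

    // Same shape as the added test: a streaming Dataset with
    // current_timestamp/current_date columns attached.
    val input = MemoryStream[Int]
    val df = input.toDS()
      .withColumn("cur_timestamp", lit(current_timestamp()))
      .withColumn("cur_date", lit(current_date()))

    input.addData(1, 2, 3)

    // Writing through a sink that takes the DataSource V2 / StreamWriteSupport path
    // (the console sink is assumed to do so here) is where the schema of the
    // transformed plan is requested before IncrementalExecution runs.
    val query = df.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.processAllAvailable()
    query.stop()
    spark.stop()
  }
}
```

Before this change, starting such a query on the V2 path failed while extracting the schema of the rewritten plan; with the pre-resolved copy of the plan (where `CurrentBatchTimestamp` is replaced by a literal via `cbt.toLiteral`), the schema can be computed safely.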