From fcbfcd5c0fe7f2f8b4ef3d21bf67064ed0d727ed Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Fri, 25 Jan 2019 23:18:12 -0800
Subject: [PATCH] [SPARK-26379][SS][FOLLOWUP] Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp

---
 .../execution/streaming/MicroBatchExecution.scala | 15 +++++----------
 .../apache/spark/sql/streaming/StreamSuite.scala  | 10 +++-------
 2 files changed, 8 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index fde851a5c07df..792e3a38dda5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -501,27 +501,22 @@ class MicroBatchExecution(
     // Rewire the plan to use the new attributes that were returned by the source.
     val newAttributePlan = newBatchesPlan transformAllExpressions {
       case ct: CurrentTimestamp =>
+        // CurrentTimestamp is not a TimeZoneAwareExpression while CurrentBatchTimestamp is.
+        // Without a TimeZoneId, CurrentBatchTimestamp is unresolved. Here, we use an explicit
+        // dummy string to prevent UnresolvedException and to prevent it from being used in the future.
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          ct.dataType)
+          ct.dataType, Some("Dummy TimeZoneId"))
       case cd: CurrentDate =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           cd.dataType, cd.timeZoneId)
     }
 
-    // Pre-resolve new attributes to ensure all attributes are resolved before
-    // accessing schema of logical plan. Note that it only leverages the information
-    // of attributes, so we don't need to concern about the value of literals.
-
-    val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
-      case cbt: CurrentBatchTimestamp => cbt.toLiteral
-    }
-
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: StreamingWriteSupportProvider =>
         val writer = s.createStreamingWriteSupport(
           s"$runId",
-          newAttrPlanPreResolvedForSchema.schema,
+          newAttributePlan.schema,
           outputMode,
           new DataSourceOptions(extraOptions.asJava))
         WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 0bbef47fd32e1..868b43cc2b88a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -1081,21 +1081,19 @@ class StreamSuite extends StreamTest {
     }
   }
 
-  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
     " to Dataset - use v2 sink") {
     testCurrentTimestampOnStreamingQuery(useV2Sink = true)
   }
 
-  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
     " to Dataset - use v1 sink") {
     testCurrentTimestampOnStreamingQuery(useV2Sink = false)
  }
 
   private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
     val input = MemoryStream[Int]
-    val df = input.toDS()
-      .withColumn("cur_timestamp", lit(current_timestamp()))
-      .withColumn("cur_date", lit(current_date()))
+    val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))
 
     def assertBatchOutputAndUpdateLastTimestamp(
         rows: Seq[Row],
@@ -1106,8 +1104,6 @@ class StreamSuite extends StreamTest {
       val row = rows.head
       assert(row.getInt(0) === expectedValue)
       assert(row.getTimestamp(1).getTime >= curTimestamp)
-      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
-      assert(days == curDate || days == curDate + 1)
       row.getTimestamp(1).getTime
     }
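
The idea behind the dummy TimeZoneId: a time-zone-aware expression such as CurrentBatchTimestamp reports itself as unresolved until a timeZoneId is attached, and asking an unresolved plan for its schema is what raised the UnresolvedException in SPARK-26379. Per the patch's own comment, the obviously fake string both satisfies that resolution check and makes any accidental later use of the zone easy to spot. The sketch below is a self-contained, simplified Scala model of that resolution rule, not Spark's actual classes; TimeZoneAware, BatchTimestamp, and schemaDescription are invented stand-ins for TimeZoneAwareExpression, CurrentBatchTimestamp, and schema access.

// Simplified stand-in for the time-zone-aware contract: the expression only
// counts as resolved once a time zone id has been supplied.
trait TimeZoneAware {
  def timeZoneId: Option[String]
  def resolved: Boolean = timeZoneId.isDefined
}

// Stand-in for CurrentBatchTimestamp: a fixed batch timestamp plus an optional zone id.
final case class BatchTimestamp(timestampMs: Long, timeZoneId: Option[String])
    extends TimeZoneAware {
  // Asking for the "schema" of an unresolved expression mirrors the
  // UnresolvedException reported in SPARK-26379.
  def schemaDescription: String =
    if (resolved) s"timestamp($timestampMs)"
    else throw new IllegalStateException("unresolved: timeZoneId is missing")
}

object DummyTimeZoneIdSketch {
  def main(args: Array[String]): Unit = {
    val batchMs = 1548486000000L

    // Without a zone id the expression stays unresolved, so schema access would fail.
    val unresolved = BatchTimestamp(batchMs, None)
    println(unresolved.resolved) // false

    // With an explicit placeholder the resolution check passes; the timestamp value
    // itself never consults the zone in this model, so the fake string is harmless.
    val withDummy = BatchTimestamp(batchMs, Some("Dummy TimeZoneId"))
    println(withDummy.schemaDescription) // timestamp(1548486000000)
  }
}

Under this reading, the diff's removal of newAttrPlanPreResolvedForSchema follows naturally: once the rewritten expression carries a zone id, newAttributePlan.schema can be read directly and the extra pre-resolution pass is unnecessary.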