
Commit b2e814c

[SPARK-26379][SS][FOLLOWUP] Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp

## What changes were proposed in this pull request?

Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp` when it plans a micro-batch.
However, `CurrentBatchTimestamp` is a `TimeZoneAwareExpression` while `CurrentTimestamp` isn't.
Without a `timeZoneId`, `CurrentBatchTimestamp` stays unresolved and raises an `UnresolvedException`.

Since `CurrentDate` is already a `TimeZoneAwareExpression` (and keeps its own `timeZoneId`), it is not affected.
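
For reference, the analyzer only considers a `TimeZoneAwareExpression` resolved once its time zone is set. A minimal sketch of that contract, simplified from Spark's `TimeZoneAwareExpression` trait (not a verbatim copy):

```scala
// Sketch: why a missing timeZoneId leaves CurrentBatchTimestamp unresolved.
// A TimeZoneAwareExpression reports resolved = true only when timeZoneId is defined,
// so touching an unresolved expression (e.g. via the plan's schema) can raise
// UnresolvedException.
trait TimeZoneAwareExpression extends Expression {
  def timeZoneId: Option[String]

  override lazy val resolved: Boolean =
    childrenResolved && checkInputDataTypes().isSuccess && timeZoneId.isDefined
}
```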

This PR reverts the [previous patch](apache#23609) to `MicroBatchExecution` and fixes the root cause instead.

## How was this patch tested?

Passes Jenkins with the updated test cases.

Closes apache#23660 from dongjoon-hyun/SPARK-26379.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun authored and jackylee-ch committed Feb 18, 2019
1 parent 948b5a4 commit b2e814c
Showing 2 changed files with 8 additions and 17 deletions.

MicroBatchExecution.scala

```diff
@@ -501,27 +501,22 @@ class MicroBatchExecution(
       // Rewire the plan to use the new attributes that were returned by the source.
       val newAttributePlan = newBatchesPlan transformAllExpressions {
         case ct: CurrentTimestamp =>
+          // CurrentTimestamp is not TimeZoneAwareExpression while CurrentBatchTimestamp is.
+          // Without TimeZoneId, CurrentBatchTimestamp is unresolved. Here, we use an explicit
+          // dummy string to prevent UnresolvedException and to keep it from being used later.
           CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-            ct.dataType)
+            ct.dataType, Some("Dummy TimeZoneId"))
         case cd: CurrentDate =>
           CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
             cd.dataType, cd.timeZoneId)
       }
 
-      // Pre-resolve new attributes to ensure all attributes are resolved before
-      // accessing schema of logical plan. Note that it only leverages the information
-      // of attributes, so we don't need to concern about the value of literals.
-
-      val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
-        case cbt: CurrentBatchTimestamp => cbt.toLiteral
-      }
-
       val triggerLogicalPlan = sink match {
         case _: Sink => newAttributePlan
         case s: StreamingWriteSupportProvider =>
           val writer = s.createStreamingWriteSupport(
             s"$runId",
-            newAttrPlanPreResolvedForSchema.schema,
+            newAttributePlan.schema,
             outputMode,
             new DataSourceOptions(extraOptions.asJava))
           WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
```
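
The dummy time zone is never consulted at runtime: before execution, every `CurrentBatchTimestamp` is folded into a literal via `toLiteral` (the same conversion the removed pre-resolve block used), so only the batch timestamp and the data type survive. A condensed sketch of that folding step, with `planToExecute` as a stand-in name:

```scala
// Sketch: CurrentBatchTimestamp is replaced by a Literal holding the batch timestamp,
// so the "Dummy TimeZoneId" above never influences the query result.
val planToExecute = triggerLogicalPlan transformAllExpressions {
  case cbt: CurrentBatchTimestamp => cbt.toLiteral
}
```
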
StreamSuite.scala

```diff
@@ -1081,21 +1081,19 @@ class StreamSuite extends StreamTest {
     }
   }
 
-  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
     " to Dataset - use v2 sink") {
     testCurrentTimestampOnStreamingQuery(useV2Sink = true)
   }
 
-  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
     " to Dataset - use v1 sink") {
     testCurrentTimestampOnStreamingQuery(useV2Sink = false)
   }
 
   private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
     val input = MemoryStream[Int]
-    val df = input.toDS()
-      .withColumn("cur_timestamp", lit(current_timestamp()))
-      .withColumn("cur_date", lit(current_date()))
+    val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))
 
     def assertBatchOutputAndUpdateLastTimestamp(
         rows: Seq[Row],
@@ -1106,8 +1104,6 @@ class StreamSuite extends StreamTest {
       val row = rows.head
       assert(row.getInt(0) === expectedValue)
       assert(row.getTimestamp(1).getTime >= curTimestamp)
-      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
-      assert(days == curDate || days == curDate + 1)
       row.getTimestamp(1).getTime
     }
 
```
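
For context, a hypothetical user-level reproduction of the original failure (it assumes a running `SparkSession` named `spark`; the rate source and console sink are illustrative choices, not part of this patch):

```scala
import org.apache.spark.sql.functions.current_timestamp

// Adding current_timestamp() to a streaming Dataset. Before this fix, planning the
// micro-batch rewrote it into a CurrentBatchTimestamp with no timeZoneId, and
// starting the query failed with UnresolvedException.
val df = spark.readStream.format("rate").load()
  .withColumn("cur_timestamp", current_timestamp())

df.writeStream.format("console").start()
```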
