[SPARK-26379][SS][FOLLOWUP] Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp #23660
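For orientation, here is a minimal sketch of the user-facing scenario this follow-up keeps working: adding `current_timestamp()` to a streaming Dataset, modeled on the `StreamSuite` test changed further down. This is not code from the PR itself; the `local[2]` master, console sink, and object name are illustrative choices.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{current_timestamp, lit}

object CurrentTimestampOnStreamDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("SPARK-26379-demo").getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    val input = MemoryStream[Int]
    // current_timestamp() on a streaming Dataset is rewritten by MicroBatchExecution into
    // CurrentBatchTimestamp; this follow-up keeps that expression resolvable.
    val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))

    val query = df.writeStream.format("console").start()
    input.addData(1, 2, 3)
    query.processAllAvailable()
    query.stop()
    spark.stop()
  }
}
```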
```diff
@@ -501,27 +501,22 @@ class MicroBatchExecution(
       // Rewire the plan to use the new attributes that were returned by the source.
       val newAttributePlan = newBatchesPlan transformAllExpressions {
         case ct: CurrentTimestamp =>
+          // CurrentTimestamp is not TimeZoneAwareExpression while CurrentBatchTimestamp is.
+          // Without TimeZoneId, CurrentBatchTimestamp is unresolved. Here, we use an explicit
+          // dummy string to prevent UnresolvedException and to prevent to be used in the future.
           CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-            ct.dataType)
+            ct.dataType, Some("Dummy TimeZoneId"))
         case cd: CurrentDate =>
           CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
             cd.dataType, cd.timeZoneId)
       }
```
Review comment: The following lines (511 ~ 519) are just a revert of the previous patch.
```diff
-      // Pre-resolve new attributes to ensure all attributes are resolved before
-      // accessing schema of logical plan. Note that it only leverages the information
-      // of attributes, so we don't need to concern about the value of literals.
-      val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
-        case cbt: CurrentBatchTimestamp => cbt.toLiteral
-      }
-
       val triggerLogicalPlan = sink match {
         case _: Sink => newAttributePlan
         case s: StreamingWriteSupportProvider =>
           val writer = s.createStreamingWriteSupport(
             s"$runId",
-            newAttrPlanPreResolvedForSchema.schema,
+            newAttributePlan.schema,
             outputMode,
             new DataSourceOptions(extraOptions.asJava))
           WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
```
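As the revert comment above notes, the schema pre-resolution pass is no longer needed. A rough, self-contained illustration of why (the `FakeBatchTimestamp` class below is a made-up stand-in, not Spark's `CurrentBatchTimestamp`): a time-zone-aware expression only reports `resolved = true` once a time zone id is present, and schema derivation asks each expression for its data type, so supplying the dummy id up front removes the failure.

```scala
import scala.util.Try

// Illustrative stand-in (not Spark source): resolution hinges on the time zone id,
// and asking an unresolved expression for its data type fails, which is roughly what
// surfaced as UnresolvedException when the plan's schema was accessed.
final case class FakeBatchTimestamp(timestampMs: Long, timeZoneId: Option[String]) {
  def resolved: Boolean = timeZoneId.isDefined

  def dataType: String =
    if (resolved) "timestamp"
    else throw new UnsupportedOperationException("dataType called on unresolved expression")
}

object SchemaAccessDemo extends App {
  // Without a zone id, schema access would need a pre-resolution pass first.
  println(Try(FakeBatchTimestamp(0L, None).dataType))                // Failure(...)
  // With the explicit dummy id from this patch, the expression is already resolved.
  println(FakeBatchTimestamp(0L, Some("Dummy TimeZoneId")).dataType) // timestamp
}
```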
```diff
@@ -1081,21 +1081,19 @@ class StreamSuite extends StreamTest {
     }
   }
 
-  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
     " to Dataset - use v2 sink") {
     testCurrentTimestampOnStreamingQuery(useV2Sink = true)
   }
 
-  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
     " to Dataset - use v1 sink") {
     testCurrentTimestampOnStreamingQuery(useV2Sink = false)
   }
 
   private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
     val input = MemoryStream[Int]
-    val df = input.toDS()
-      .withColumn("cur_timestamp", lit(current_timestamp()))
-      .withColumn("cur_date", lit(current_date()))
+    val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))
```
Review comment: Yeah, adding `current_date` was not intended here; I added only `current_timestamp`. Nice finding!
```diff
     def assertBatchOutputAndUpdateLastTimestamp(
         rows: Seq[Row],
@@ -1106,8 +1104,6 @@ class StreamSuite extends StreamTest {
       val row = rows.head
       assert(row.getInt(0) === expectedValue)
       assert(row.getTimestamp(1).getTime >= curTimestamp)
-      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
-      assert(days == curDate || days == curDate + 1)
       row.getTimestamp(1).getTime
     }
```
Review comment: We may try to use `DateTimeUtils.defaultTimeZone().getID()` or `conf.sessionLocalTimeZone` (as `ResolveTimeZone` does for `TimeZoneAwareExpression`). However, `IncrementalExecution` doesn't use the time zone info in the `CurrentBatchTimestamp(_, TimestampType, _)` case, and we want to avoid using a `TimeZone` for this `CurrentTimestamp` expression because it is originally not a `TimeZoneAwareExpression`.
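For reference, a rough paraphrase (not a quote of Spark's source) of the `ResolveTimeZone`-style alternative mentioned above: have the analyzer fill in the session time zone for any `TimeZoneAwareExpression` that is still missing one. The follow-up instead passes an explicit dummy id, so such a rule never needs to touch this node.

```scala
import org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.internal.SQLConf

object ResolveTimeZoneSketch {
  // Assigns the session-local time zone to every time-zone-aware expression
  // that does not have one yet, mirroring what ResolveTimeZone does during analysis.
  def resolveTimeZones(plan: LogicalPlan, conf: SQLConf): LogicalPlan =
    plan transformAllExpressions {
      case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty =>
        e.withTimeZone(conf.sessionLocalTimeZone)
    }
}
```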
Review comment: Thanks for providing the nice analysis; this patch looks simpler and more concise. Nice!