
[SPARK-26379][SS][FOLLOWUP] Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp #23660

Closed. Wants to merge 1 commit.
@@ -501,27 +501,22 @@ class MicroBatchExecution(
// Rewire the plan to use the new attributes that were returned by the source.
val newAttributePlan = newBatchesPlan transformAllExpressions {
case ct: CurrentTimestamp =>
// CurrentTimestamp is not TimeZoneAwareExpression while CurrentBatchTimestamp is.
// Without TimeZoneId, CurrentBatchTimestamp is unresolved. Here, we use an explicit
// dummy string to prevent UnresolvedException and to prevent it from being used in the future.
  CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-   ct.dataType)
+   ct.dataType, Some("Dummy TimeZoneId"))
Member Author:
We may try to use DateTimeUtils.defaultTimeZone().getID() or conf.sessionLocalTimeZone (as ResolveTimeZone does for TimeZoneAwareExpression).

However, IncrementalExecution doesn't use the time zone info in the CurrentBatchTimestamp(_, TimestampType, _) case. And we want to prevent this CurrentTimestamp expression from using a time zone at all, because it is originally not a TimeZoneAwareExpression.
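As a side note, the resolution behavior described here can be sketched with a toy model (hypothetical types, not Spark's real Catalyst classes): a time-zone-aware expression reports itself unresolved until some timeZoneId is supplied, which is why any non-empty dummy string is enough to avoid the UnresolvedException.

```scala
// Minimal sketch (hypothetical types, not Catalyst's real classes) of why a
// time-zone-aware expression stays unresolved until a zone id is supplied.
trait Expression {
  def resolved: Boolean = true
}

trait TimeZoneAwareExpression extends Expression {
  def timeZoneId: Option[String]
  // As in Catalyst's TimeZoneAwareExpression, resolution additionally
  // requires the time zone id to be defined.
  override def resolved: Boolean = super.resolved && timeZoneId.isDefined
}

// Stand-in for CurrentBatchTimestamp: the timestamp is fixed per micro-batch.
case class CurrentBatchTimestampSketch(
    timestampMs: Long,
    timeZoneId: Option[String] = None) extends TimeZoneAwareExpression

assert(!CurrentBatchTimestampSketch(1L).resolved)  // no zone id: unresolved
assert(CurrentBatchTimestampSketch(1L, Some("Dummy TimeZoneId")).resolved)
```

In this model the zone id is only a resolution gate; its value is never read, mirroring the argument that a dummy string is safe here.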

Contributor:

Thanks for providing a nice analysis; this patch looks simpler and more concise. Nice!

case cd: CurrentDate =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
cd.dataType, cd.timeZoneId)
}
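The rewiring above relies on Catalyst's transformAllExpressions. As a rough illustration of the pattern, a bottom-up transform over a toy expression tree (a hypothetical mini-AST, not Catalyst) that swaps every current-timestamp node for a node carrying the batch's fixed timestamp might look like:

```scala
// Toy mini-AST (hypothetical, not Catalyst) illustrating the rewrite pattern:
// replace every CurrentTimestamp-like node with a batch-timestamp node.
sealed trait Expr
case object CurrentTimestampExpr extends Expr
case class BatchTimestampExpr(batchTimestampMs: Long) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// Bottom-up transform: rewrite children first, then apply the partial
// function to the node itself, leaving unmatched nodes untouched.
def transformAll(e: Expr)(pf: PartialFunction[Expr, Expr]): Expr = {
  val withChildren = e match {
    case Add(l, r) => Add(transformAll(l)(pf), transformAll(r)(pf))
    case leaf      => leaf
  }
  pf.applyOrElse(withChildren, identity[Expr])
}

val batchTs = 1548460800000L  // fixed once per micro-batch
val plan = Add(CurrentTimestampExpr, CurrentTimestampExpr)
val rewired = transformAll(plan) {
  case CurrentTimestampExpr => BatchTimestampExpr(batchTs)
}
assert(rewired == Add(BatchTimestampExpr(batchTs), BatchTimestampExpr(batchTs)))
```

The key property, as in the patch, is that every occurrence in the plan ends up sharing the same per-batch timestamp.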

Member Author (@dongjoon-hyun, Jan 26, 2019):

The following lines (511 ~ 519) are just a revert of the previous patch.

-    // Pre-resolve new attributes to ensure all attributes are resolved before
-    // accessing schema of logical plan. Note that it only leverages the information
-    // of attributes, so we don't need to concern about the value of literals.
-    val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
-      case cbt: CurrentBatchTimestamp => cbt.toLiteral
-    }

val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
case s: StreamingWriteSupportProvider =>
val writer = s.createStreamingWriteSupport(
s"$runId",
-         newAttrPlanPreResolvedForSchema.schema,
+         newAttributePlan.schema,
outputMode,
new DataSourceOptions(extraOptions.asJava))
WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
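For context, the reverted workaround pre-resolved each batch-timestamp expression into a literal solely so the plan's schema could be read; with the dummy TimeZoneId fix, the expression is already resolved and the schema can be read directly. A toy sketch (hypothetical types, not Spark's classes) of both paths:

```scala
// Hypothetical sketch of the two paths: the reverted toLiteral workaround vs.
// reading the schema directly from an already-resolved expression.
case class LiteralSketch(value: Long)
case class BatchTsSketch(timestampMs: Long, timeZoneId: Option[String]) {
  // Resolved only once a time zone id is present (the point of the fix).
  def resolved: Boolean = timeZoneId.isDefined
  // Old workaround: collapse to a literal of the same value just for schema.
  def toLiteral: LiteralSketch = LiteralSketch(timestampMs)
}

val fixed = BatchTsSketch(42L, Some("Dummy TimeZoneId"))
assert(fixed.resolved)                         // schema readable directly
assert(fixed.toLiteral == LiteralSketch(42L))  // the reverted pre-resolve path
```

Either path yields the same schema; the fix simply removes the extra transform pass.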
@@ -1081,21 +1081,19 @@ class StreamSuite extends StreamTest {
}
}

- test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+ test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
" to Dataset - use v2 sink") {
testCurrentTimestampOnStreamingQuery(useV2Sink = true)
}

- test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+ test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
" to Dataset - use v1 sink") {
testCurrentTimestampOnStreamingQuery(useV2Sink = false)
}

private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
val input = MemoryStream[Int]
-   val df = input.toDS()
-     .withColumn("cur_timestamp", lit(current_timestamp()))
-     .withColumn("cur_date", lit(current_date()))
+   val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))
Contributor (@HeartSaVioR, Jan 26, 2019):
Yeah, adding cur_date was the reason the UT passed even without the patch.

I added only current_timestamp() first (and checked that the UT failed without the patch), then added current_date() afterwards; that addition apparently makes cur_timestamp resolve even without the patch.
(Though I'm not sure about the mechanism of why it happens...)

Nice finding!


def assertBatchOutputAndUpdateLastTimestamp(
rows: Seq[Row],
@@ -1106,8 +1104,6 @@ class StreamSuite extends StreamTest {
val row = rows.head
assert(row.getInt(0) === expectedValue)
assert(row.getTimestamp(1).getTime >= curTimestamp)
-     val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
-     assert(days == curDate || days == curDate + 1)
row.getTimestamp(1).getTime
}
