[WIP][SPARK-29221][SQL] LocalTableScanExec: handle the case where executors are accessing "null" rows #25913

Closed
wants to merge 2 commits into from
@@ -37,7 +37,7 @@ case class LocalTableScanExec(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

@transient private lazy val unsafeRows: Array[InternalRow] = {
- if (rows.isEmpty) {
+ if (rows == null || rows.isEmpty) {
Member

Can't we remove @transient instead of adding this null check?

Contributor Author

@transient was added to avoid serializing and copying all of the rows, so I guess keeping it is the preferred option.
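As a rough illustration of the trade-off described here, the sketch below uses a hypothetical `ToyScan` class (a stand-in, not Spark's `LocalTableScanExec`) and plain Java serialization. It assumes Scala 2.12 or 2.13. A `@transient` constructor field is skipped when the object is serialized, so it comes back as `null` on the receiving side, which is the situation the null check in the diff guards against.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for LocalTableScanExec; not Spark code.
case class ToyScan(output: Seq[String], @transient rows: Seq[Int]) {
  // Same guard shape as the diff: tolerate a null `rows` after deserialization.
  @transient lazy val doubled: Array[Int] =
    if (rows == null || rows.isEmpty) Array.empty[Int]
    else rows.map(_ * 2).toArray
}

object TransientDemo {
  // Serialize and deserialize a value with plain Java serialization,
  // roughly what happens when a plan object is shipped to another JVM.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

With `val copy = TransientDemo.roundTrip(ToyScan(Seq("a"), Seq(1, 2, 3)))`, `copy.rows` is `null` and `copy.doubled` is an empty array, while `copy.output` survives the round trip. Without the `rows == null` guard, evaluating `copy.doubled` would throw a `NullPointerException`.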

Contributor Author

Related commits are here:
256358f
f70f46d

Member

Ah, I see. I'm a bit worried that this value could differ between the driver and executors. Which code touches this variable on executors? What's the stack trace for unsafeRows?

Contributor Author

That's actually a kind of defensive programming, since it would throw an NPE without the null check. I haven't seen any actual stack trace that refers to unsafeRows.

Member

@maropu maropu Sep 25, 2019

I personally think unsafeRows is not expected to be accessed on the executor side, so we should add assert(rows != null) instead of the current approach. If the assertion then fails, we need to fix whichever code accesses this variable on executors.
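A minimal sketch of the fail-fast alternative suggested here, again using a hypothetical `StrictScan` class rather than the real `LocalTableScanExec`. Passing `null` directly to the constructor is only a shortcut to simulate an instance whose `@transient` field was dropped during serialization.

```scala
// Hypothetical stand-in for LocalTableScanExec; not Spark code.
case class StrictScan(output: Seq[String], @transient rows: Seq[Int]) {
  @transient lazy val doubled: Array[Int] = {
    // Fail fast: a null `rows` means this code path was reached
    // somewhere the row data does not exist.
    assert(rows != null, "rows is null; doubled must only be evaluated on the driver")
    if (rows.isEmpty) Array.empty[Int] else rows.map(_ * 2).toArray
  }
}
```

The design difference from the null check is that an unexpected access surfaces as an `AssertionError` with a message, instead of silently producing an empty result. Note that Scala's `assert` can be elided at compile time (for example with `-Xdisable-assertions`), so this only helps in builds where assertions are kept.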

Array.empty
} else {
val proj = UnsafeProjection.create(output, output)
@@ -59,7 +59,9 @@ case class LocalTableScanExec(
}

override protected def stringArgs: Iterator[Any] = {
- if (rows.isEmpty) {
+ if (rows == null) {
+ Iterator("<unknown>", output)
+ } else if (rows.isEmpty) {
Member

Is it OK to return different values on the driver and executors? How about computing this value on the driver and then passing it to the executors?

Contributor Author

@HeartSaVioR HeartSaVioR Sep 24, 2019

Everything in this value other than output should be meaningless; otherwise the assumption in the related comments is wrong and we should serialize the rows. I guess it's only used for display.
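For comparison, the suggestion earlier in this thread (compute the value on the driver, then pass it to executors) could look roughly like the sketch below. This is not what the PR does. `SummarizedScan`, `wasEmpty`, and `SummaryDemo` are hypothetical names, and plain Java serialization stands in for however the object is actually shipped.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for LocalTableScanExec; not Spark code.
case class SummarizedScan(output: Seq[String], @transient rows: Seq[Int]) {
  // Eager and non-transient: evaluated once where the object is constructed
  // (the "driver" in this sketch) and serialized along with the object.
  private val wasEmpty: Boolean = rows.isEmpty

  // Never touches `rows`, so it gives the same answer before and after shipping.
  def stringArgs: Iterator[Any] =
    if (wasEmpty) Iterator("<empty>", output) else Iterator(output)
}

object SummaryDemo {
  // Serialize and deserialize a value with plain Java serialization.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Only one extra boolean is serialized, so the rows themselves still stay on the constructing side, and there is no need for an `<unknown>` branch. Whether that is worth the extra field for a value that is only used for display is the judgment call being discussed here.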

Iterator("<empty>", output)
} else {
Iterator(output)