[SPARK-23099][SS] Migrate foreach sink to DataSourceV2 #20552
Conversation
/cc @tdas
Test build #87230 has finished for PR 20552 at commit
Force-pushed from 44de1ea to 87d0bc8.
This is fine, just a few comments. But one major concern I have is the way we are indirectly exposing the batchId to the DataWriter (using job properties). If we are not exposing it directly, then essentially we are saying that other data sources built by general developers are not supposed to use batch ids in the executors for any purpose. In a way that is very odd: there is some transactional logic that you can write with ForeachWriter (which exposes the id), but you cannot write the same logic with the DataSourceV2 APIs.
I know that the alternative is also not great; we would have to again split the StreamWriter into separate APIs for MicroBatch and Continuous.
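For context, the two interfaces under discussion looked roughly like this at the time (paraphrased shapes, not the exact Spark sources):

```scala
// ForeachWriter hands the batch/epoch id to executor-side user code in
// open(); the second parameter was named `version` in releases of this era.
abstract class ForeachWriter[T] extends Serializable {
  def open(partitionId: Long, version: Long): Boolean  // batch id exposed
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

trait WriterCommitMessage extends Serializable

// The DataSourceV2 DataWriter of this era never sees the batch/epoch id;
// it can only learn it indirectly (e.g. via job properties, as this PR does).
trait DataWriter[T] {
  def write(record: T): Unit
  def commit(): WriterCommitMessage
  def abort(): Unit
}
```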
case class ForeachInternalWriter[T: Encoder](
    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
nit: params on different lines
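That is, something like the following layout (imports added so the sketch stands alone):

```scala
import org.apache.spark.sql.{Encoder, ForeachWriter}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Each constructor parameter on its own line, per the Spark style guide.
case class ForeachInternalWriter[T: Encoder](
    writer: ForeachWriter[T],
    encoder: ExpressionEncoder[T])
```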
case class ForeachInternalWriter[T: Encoder](
nit: This is really a small class. Maybe inline it rather than define a confusingly named ...InternalWriter.
case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
Similarly... maybe inline this class as well; it's very small.
Actually, probably should not inline this; its outer closure may not be serializable in that case.
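A minimal illustration of the hazard, with hypothetical stand-in classes: an anonymous factory defined inside another class captures its enclosing instance, so serializing it drags the whole enclosure along.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class NotSer(val tag: String)                 // not Serializable

class Enclosing(ns: NotSer) {
  // Inlined anonymous factory: referencing `ns` captures Enclosing.this,
  // so serializing this object fails with NotSerializableException.
  def inlineFactory: Serializable = new Serializable {
    override def toString: String = ns.tag
  }
}

// A top-level case class carries no outer pointer and serializes cleanly.
case class TopLevelFactory(tag: String)

object Demo extends App {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  out.writeObject(TopLevelFactory("ok"))                          // fine
  out.writeObject(new Enclosing(new NotSer("x")).inlineFactory)
  // throws java.io.NotSerializableException: Enclosing
}
```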
class ForeachDataWriter[T : Encoder](
Add docs describing the implementation of this DataWriter, especially the lifecycle of the ForeachWriter (they should go here rather than in inline comments).
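The requested class-level doc might read something like this sketch (not the wording that was eventually merged):

```scala
/**
 * A DataWriter that forwards every row of an epoch to a user-provided
 * ForeachWriter. Per partition, the wrapped writer sees this lifecycle:
 *
 *  - open(partitionId, epochId) at the start of each epoch; returning
 *    false means the epoch's rows are skipped rather than processed.
 *  - process(row) once for every input row of the epoch.
 *  - close(errorOrNull) when the epoch ends, as long as open() completed
 *    without throwing; errorOrNull carries any failure that occurred.
 */
```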
private def openAndSetState(epochId: Long) = {
  // Create a new writer by roundtripping through the serialization for compatibility.
  // In the old API, a writer instantiation would never get reused.
  val byteStream = new ByteArrayOutputStream()
Why are you serializing and deserializing here? If you are reserializing the ForeachWriter, doesn't this mean that you are going to retain state (of the non-transient fields) across epochs? Is that what you want?
It seems the best thing to do is to serialize the writer at the driver, send the bytes to the task, and then deserialize repeatedly. Then you only incur the cost of deserializing between epochs, and you always start with a fresh copy of the ForeachWriter.
You're right; this suggestion is what we really want.
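A sketch of that suggestion, with hypothetical helper names: serialize once on the driver, rehydrate a fresh copy per epoch on the executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.sql.ForeachWriter

// Driver side: serialize the user's writer exactly once.
def serializeWriter[T](writer: ForeachWriter[T]): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(writer)
  out.close()
  bytes.toByteArray
}

// Executor side: deserialize a fresh copy at the start of each epoch,
// so no non-transient state leaks across epochs.
def freshWriter[T](serialized: Array[Byte]): ForeachWriter[T] = {
  val in = new ObjectInputStream(new ByteArrayInputStream(serialized))
  try in.readObject().asInstanceOf[ForeachWriter[T]] finally in.close()
}
```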
class ForeachDataWriter[T : Encoder](
    private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
nit: params on separate lines.
@@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter
    query.stop()
  }
}
I think there should be a test with continuous processing + foreach.
Good instinct; it didn't quite work. Added the test.
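A sketch of such a test, reusing the suite's existing TestForeachWriter and ForeachSinkSuite event log and assuming the usual ScalaTest Eventually helpers are in scope (the test that actually landed may differ):

```scala
import org.apache.spark.sql.streaming.Trigger

test("foreach with continuous trigger") {
  import testImplicits._

  // MemoryStream does not support continuous execution, so drive the
  // query from the rate source instead.
  val query = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "5")
    .load()
    .selectExpr("CAST(value AS INT)")
    .as[Int]
    .writeStream
    .trigger(Trigger.Continuous("1 second"))
    .foreach(new TestForeachWriter())
    .start()
  try {
    // Wait until the sink has recorded at least one open/process/close cycle.
    eventually(timeout(streamingTimeout)) {
      assert(ForeachSinkSuite.allEvents().nonEmpty)
    }
  } finally {
    query.stop()
  }
}
```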
Test build #87231 has finished for PR 20552 at commit
It's my intent to say that other data sources built by general developers aren't supposed to use batch ids in the executors for any purpose. In addition to the issue you mentioned, I don't think there's a compelling reason to do so in the DataSourceV2 model, and I worry the functionality would make it easy to write transactional logic which seems correct but isn't. Since this interface is still evolving, I think it makes sense to revisit the question if we notice a scenario where it's infeasible to rewrite a piece of transactional logic to not use the batch ID in the executor.
Test build #87241 has finished for PR 20552 at commit
Filed SPARK-23416 for the unrelated failure in build 87241.
Test build #87417 has finished for PR 20552 at commit
This is obsolete: we're changing the lifecycle of DataWriter.
## What changes were proposed in this pull request?

Migrate foreach sink to DataSourceV2. Since the previous attempt at this PR apache#20552, we've changed and strictly defined the lifecycle of writer components. This means we no longer need the complicated lifecycle shim from that PR; it just naturally works.

## How was this patch tested?

Existing tests.

Author: Jose Torres <[email protected]>
Closes apache#20951 from jose-torres/foreach.
## What changes were proposed in this pull request?

Migrate the foreach sink to the DataSourceV2 API.

## How was this patch tested?

Existing unit tests, and a new test to verify an edge case.
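For reference, a minimal sketch of the user-facing API this migration preserves (in releases of this era, the second parameter of open() was named version):

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val spark = SparkSession.builder().appName("foreach-sink-demo").getOrCreate()

// Any streaming DataFrame works; the rate source is used here for brevity.
val streamingDF = spark.readStream.format("rate").load()

val query = streamingDF.writeStream
  .foreach(new ForeachWriter[Row] {
    // Called once per partition per batch/epoch; return false to skip it.
    def open(partitionId: Long, version: Long): Boolean = true
    // Called for each row in the partition.
    def process(row: Row): Unit = println(row)
    // Called when the partition finishes, with any error that occurred.
    def close(errorOrNull: Throwable): Unit = ()
  })
  .start()
```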