assign all splits correctly to source readers #37
base: main
Conversation
Signed-off-by: Tilak Raj <[email protected]>
force-pushed from 3b773b7 to 3ea2253
When I wrote the original connector, I had to consider this exact same thing. Also, the flink-connector-[aws/kafka/pulsar] are useful examples.
We definitely need some sort of test, or at least code we can use to debug this.
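For what it's worth, one rough shape such a debug check could take (a sketch only; the map-based view of "reader id -> assigned split ids" is an assumption for illustration, not the connector's actual API):

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch: verify that every expected split id was handed to exactly one reader.
public class AssignmentCoverageCheck {

    static void assertAllSplitsAssignedOnce(Set<String> expectedSplitIds,
                                            Map<Integer, List<String>> assignmentByReader) {
        List<String> assigned = assignmentByReader.values().stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
        boolean noDuplicates = assigned.size() == new HashSet<>(assigned).size();
        boolean coversAll = new HashSet<>(assigned).equals(expectedSplitIds);
        if (!noDuplicates || !coversAll) {
            throw new AssertionError(
                    "expected each of " + expectedSplitIds + " exactly once, got " + assigned);
        }
    }
}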
// let the splits be evenly distributed
if (noOfSplits <= parallelism) {
    this.minimumSplitsToAssign = -1;
Any chance this can be = 1?
Also, can you make this an if-else? A return like this is unusual in a method with a void return type.
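For reference, one possible if-else shape for that check (a sketch only; the wrapper class, the method name, and the value computed in the else branch are assumptions, not the PR's actual code, while the field and parameter names follow the diff above):

class SplitCountSketch {
    private int minimumSplitsToAssign;

    void computeMinimumSplitsToAssign(int noOfSplits, int parallelism) {
        if (noOfSplits <= parallelism) {
            // at most one split per reader
            this.minimumSplitsToAssign = 1;
        } else {
            // each reader takes at least floor(noOfSplits / parallelism) splits
            this.minimumSplitsToAssign = noOfSplits / parallelism;
        }
    }
}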
Made the changes to remove the -1 and removed all instances of return; in void methods.
@scottf added test cases as well. Please check.
force-pushed from 054127c to 4593bd9
Signed-off-by: Tilak Raj <[email protected]>
force-pushed from 4593bd9 to b059460
This PR improves the enumerator logic to better handle split assignments, particularly in scenarios where the number of splits exceeds the parallelism. Previously, the enumerator assigned one split per source reader, which worked well when the number of splits was less than or equal to the parallelism. However, when the number of splits exceeded the parallelism, the leftover splits were never assigned to source readers, leading to data not being processed by the sink.
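As an illustration of the general idea (a minimal sketch, not the connector's actual enumerator; the class name, the plain-String split ids, and the reader numbering are assumptions):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: distribute every split across the available readers round-robin, so
// leftover splits are still assigned when the split count exceeds the parallelism.
public class RoundRobinAssignmentSketch {

    static Map<Integer, List<String>> assignAll(List<String> splitIds, int parallelism) {
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (int i = 0; i < splitIds.size(); i++) {
            int reader = i % parallelism; // wraps around instead of dropping the extras
            assignment.computeIfAbsent(reader, r -> new ArrayList<>()).add(splitIds.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three subjects but parallelism 2: the third split now lands on reader 0
        // instead of never being assigned.
        System.out.println(assignAll(List.of("subject-1", "subject-2", "subject-3"), 2));
    }
}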
When running the SourceToSinkExample file and adding one more subject (i.e., split ID), the sink output was missing data because the additional split was never assigned by the enumerator to a source reader. Below are logs from the observed issue:
As can be seen from the logs, data-source3-15 never reached the sink because it was never assigned to a reader. Adding logs to handleSplitRequest also verifies that one of the splits was not assigned to a reader. Output of the above log:
After PR fix:
Output:
Logs: