Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

assign all splits correctly to source readers #37

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

tilakraj94
Copy link
Contributor

@tilakraj94 tilakraj94 commented Jan 16, 2025

This PR improves the enumerator logic to better handle split assignments, particularly in scenarios where the number of splits exceeds the parallelism. Previously, the enumerator assigned one split per source reader, which worked well when the number of splits was less than or equal to the parallelism. However, when the number of splits exceeded the parallelism, the leftover splits were never assigned to source readers, leading to data not being processed by the sink.

When running the SourceToSinkExample file and adding one more subject (i.e., split ID), the sink output was missing data because the additional split was never assigned by the enumerator to a source reader.

Below are logs from the observed issue:

2025-01-16 14:11:34:470+0530 [Source: NatsSource -> Sink: Writer (1/2)#0] [DEBUG] NatsSourceReader - suJm-0w49cRQOsz | addSplits [Subject: source1]
2025-01-16 14:11:34:470+0530 [Source: NatsSource -> Sink: Writer (2/2)#0] [DEBUG] NatsSourceReader - suJm-0w49cRQOrS | addSplits [Subject: source2]

Publishing. Subject: source1 Payload: data-source1-13
Publishing. Subject: source2 Payload: data-source2-14
Publishing. Subject: source3 Payload: data-source3-15
Listening. Subject: sink1 Payload: data-source2-14
Listening. Subject: sink2 Payload: data-source2-14
Listening. Subject: sink3 Payload: data-source2-14
Listening. Subject: sink1 Payload: data-source1-13
Listening. Subject: sink2 Payload: data-source1-13
Listening. Subject: sink3 Payload: data-source1-13

As it can be seen from the logs, data-source3-15 never reached sink because it was never assigned to reader.

adding logs to handleSplitRequest also verifies that one of the splits was not assigned to reader.

public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    final NatsSubjectSplit nextSplit = remainingSplits.poll();
    if (nextSplit != null) {
        context.assignSplit(nextSplit, subtaskId);
    }
    else {
        context.signalNoMoreSplits(subtaskId);
    }

    System.out.println("should be 0 at one time, current value: " +  remainingSplits.size());
}

Output of above log:

should be 0 at one time, current value: 2
should be 0 at one time, current value: 1

After PR fix:

Output:

Publishing. Subject: source1 Payload: data-source1-13
Publishing. Subject: source2 Payload: data-source2-14
Publishing. Subject: source3 Payload: data-source3-15
Listening. Subject: sink1 Payload: data-source3-15
Listening. Subject: sink2 Payload: data-source3-15
Listening. Subject: sink3 Payload: data-source3-15
Listening. Subject: sink1 Payload: data-source1-13
Listening. Subject: sink2 Payload: data-source1-13
Listening. Subject: sink3 Payload: data-source1-13
Listening. Subject: sink1 Payload: data-source2-14
Listening. Subject: sink2 Payload: data-source2-14
Listening. Subject: sink3 Payload: data-source2-14

Logs:

2025-01-16 14:17:39:026+0530 [SourceCoordinator-Source: NatsSource] [DEBUG] NatsSourceEnumerator - iHwM-3TNbT3A9wG | Assigned splits to subtask: 1
2025-01-16 14:17:39:026+0530 [SourceCoordinator-Source: NatsSource] [DEBUG] NatsSourceEnumerator - iHwM-3TNbT3A9wG | Assigned split in round-robin to subtask: 1
2025-01-16 14:17:39:027+0530 [SourceCoordinator-Source: NatsSource] [DEBUG] NatsSourceEnumerator - iHwM-3TNbT3A9wG | Assigned splits to subtask: 0
2025-01-16 14:17:39:028+0530 [Source: NatsSource -> Sink: Writer (1/2)#0] [DEBUG] NatsSourceReader - iHwM-3TNbT3AA1S | addSplits [Subject: source3]
2025-01-16 14:17:39:028+0530 [Source: NatsSource -> Sink: Writer (2/2)#0] [DEBUG] NatsSourceReader - iHwM-3TNbT3A9yr | addSplits [Subject: source1]
2025-01-16 14:17:39:028+0530 [Source: NatsSource -> Sink: Writer (2/2)#0] [DEBUG] NatsSourceReader - iHwM-3TNbT3A9yr | addSplits [Subject: source2]

@tilakraj94 tilakraj94 force-pushed the fix-enumerator-split-assign branch from 3b773b7 to 3ea2253 Compare January 17, 2025 05:45
Copy link
Collaborator

@scottf scottf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I wrote the original connector, I had to consider this exact same thing. Also, the flink-connector-[aws/kafka/pulsar] are useful examples.

We definitely need to sort of test or just code that we can use to debug this.


// let the splits be evenly distributed
if (noOfSplits <= parallelism) {
this.minimumSplitsToAssign = -1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance this can be = 1; Also, can you make this an if-else, the return like this is unusual in a void return type method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made the changes to remove -1. and remove all instances of return; for voids

@tilakraj94
Copy link
Contributor Author

tilakraj94 commented Jan 24, 2025

@scottf added test cases as well. Pls check.

@tilakraj94 tilakraj94 force-pushed the fix-enumerator-split-assign branch 3 times, most recently from 054127c to 4593bd9 Compare January 24, 2025 12:34
@tilakraj94 tilakraj94 force-pushed the fix-enumerator-split-assign branch from 4593bd9 to b059460 Compare January 24, 2025 12:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants