Enumerator Fails to Properly Assign Splits When Parallelism is Less Than the Number of Splits #38

Open
tilakraj94 opened this issue Jan 16, 2025 · 1 comment

Comments

@tilakraj94

tilakraj94 commented Jan 16, 2025

Observed behavior

When the number of splits exceeded the parallelism, the leftover splits were never assigned to source readers, so their data never reached the sink.

Running SourceToSinkExample with one additional subject (i.e., split ID) produced sink output with missing data, because the enumerator never assigned the additional split to a source reader.

Below are logs from the observed issue:

2025-01-16 14:11:34:470+0530 [Source: NatsSource -> Sink: Writer (1/2)#0] [DEBUG] NatsSourceReader - suJm-0w49cRQOsz | addSplits [Subject: source1]
2025-01-16 14:11:34:470+0530 [Source: NatsSource -> Sink: Writer (2/2)#0] [DEBUG] NatsSourceReader - suJm-0w49cRQOrS | addSplits [Subject: source2]

Publishing. Subject: source1 Payload: data-source1-13
Publishing. Subject: source2 Payload: data-source2-14
Publishing. Subject: source3 Payload: data-source3-15
Listening. Subject: sink1 Payload: data-source2-14
Listening. Subject: sink2 Payload: data-source2-14
Listening. Subject: sink3 Payload: data-source2-14
Listening. Subject: sink1 Payload: data-source1-13
Listening. Subject: sink2 Payload: data-source1-13
Listening. Subject: sink3 Payload: data-source1-13

As the logs show, data-source3-15 never reached the sink because the source3 split was never assigned to a reader. Only source1 and source2 were assigned to readers.

Adding a log statement to handleSplitRequest also confirms that one of the splits was never assigned to a reader.

public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    // Hand out at most one split per request from a reader.
    final NatsSubjectSplit nextSplit = remainingSplits.poll();
    if (nextSplit != null) {
        context.assignSplit(nextSplit, subtaskId);
    } else {
        context.signalNoMoreSplits(subtaskId);
    }

    // Debug output: this should reach 0 once every split has been assigned.
    System.out.println("should be 0 at one time, current value: " + remainingSplits.size());
}

Output of the log statement above:

should be 0 at one time, current value: 2
should be 0 at one time, current value: 1
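
The counter stops at 1, which is consistent with each of the two readers requesting a split only once. The following is a minimal plain-Java sketch of that situation, not the connector's code: the class and method names are hypothetical, and the "one request per reader" behavior is an assumption drawn from the logs above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class PullBasedAssignmentDemo {

    // Simulates an enumerator that hands out one split per request and
    // records how many splits are still queued after each request.
    static List<Integer> remainingAfterEachRequest(List<String> splits, int requests) {
        Queue<String> remainingSplits = new ArrayDeque<>(splits);
        List<Integer> observed = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            remainingSplits.poll(); // one split (or null) per request
            observed.add(remainingSplits.size());
        }
        return observed;
    }

    public static void main(String[] args) {
        // Three subjects, two readers that each request a split once.
        System.out.println(
                remainingAfterEachRequest(List.of("source1", "source2", "source3"), 2));
        // prints [2, 1]
    }
}
```

With three splits and only two requests, one split always stays in the queue, which matches the observed output.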

Expected behavior

Every split should be assigned to a source reader, even when there are more splits than readers. No split should be left unassigned.
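
One way to meet this expectation is to distribute all splits across the readers up front, round-robin, so that a reader can own more than one split. The sketch below illustrates the idea in plain Java without any Flink dependency. The class and method names are hypothetical, and this is not necessarily how the linked PR fixes the issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoundRobinSplitAssigner {

    // Maps each subtask index (0..parallelism-1) to the split IDs it should read.
    static Map<Integer, List<String>> assign(List<String> splitIds, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            assignment.put(subtask, new ArrayList<>());
        }
        // Split i goes to subtask (i mod parallelism), so no split is left over.
        for (int i = 0; i < splitIds.size(); i++) {
            assignment.get(i % parallelism).add(splitIds.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("source1", "source2", "source3"), 2));
        // prints {0=[source1, source3], 1=[source2]}
    }
}
```

In the scenario from this issue, subtask 0 would read source1 and source3 and subtask 1 would read source2, so every subject reaches the sink.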

Server and client version

nats-server --version : nats-server: v2.10.22
natscli --version : 0.1.5
flink --version : Version: 1.17.2, Commit ID: c0027e5

Host environment

Apple M2 Pro, OS : 14.5 (23F79)

Steps to reproduce

  1. Run the example in SourceToSinkExample.
  2. Set the parallelism to 2 with env.setParallelism(2);
  3. Add source.subjects=source1,source2,source3 and sink.subjects=sink1,sink2,sink3 to application.properties.
  4. Check the output of the sink. It will be missing data from the third subject.
@tilakraj94

Fixed in PR: #37
