Skip to content

Commit

Permalink
add null check for next splits
Browse files Browse the repository at this point in the history
Signed-off-by: Tilak Raj <[email protected]>
  • Loading branch information
tilakraj94 committed Jan 16, 2025
1 parent d0d9975 commit 3b773b7
Showing 1 changed file with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ public class NatsSourceEnumerator implements SplitEnumerator<NatsSubjectSplit, C
private final String id;
private final SplitEnumeratorContext<NatsSubjectSplit> context;
private final Queue<NatsSubjectSplit> remainingSplits;
private Integer minimumSplitsToAssign = 1;

// assumes splits to be less than or equal to parallelism
private int minimumSplitsToAssign = -1;


public NatsSourceEnumerator(String sourceId,
Expand All @@ -39,6 +41,12 @@ public void start() {
int noOfSplits = remainingSplits.size();
int parallelism = context.currentParallelism();

// let the splits be evenly distributed
if (noOfSplits <= parallelism) {
this.minimumSplitsToAssign = -1;
return;
}

// minimum splits that needs to be assigned to reader
this.minimumSplitsToAssign = noOfSplits / parallelism;
}
Expand All @@ -60,12 +68,14 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
nextSplits.add(nextSplit);
}

Map<Integer, List<NatsSubjectSplit>> assignedSplits = new HashMap<>();
assignedSplits.put(subtaskId, nextSplits);
if (!nextSplits.isEmpty()) {
Map<Integer, List<NatsSubjectSplit>> assignedSplits = new HashMap<>();
assignedSplits.put(subtaskId, nextSplits);

// assign the splits back to the source reader
context.assignSplits(new SplitsAssignment<>(assignedSplits));
LOG.debug("{} | Assigned splits to subtask: {}", id, subtaskId);
// assign the splits back to the source reader
context.assignSplits(new SplitsAssignment<>(assignedSplits));
LOG.debug("{} | Assigned splits to subtask: {}", id, subtaskId);
}

// Perform round-robin assignment for leftover splits
// Assign only one split at a time since the number of leftover splits will always be less than the parallelism.
Expand Down

0 comments on commit 3b773b7

Please sign in to comment.