Skip to content

Commit

Permalink
clear precomputedSplitAssignments
Browse files Browse the repository at this point in the history
Signed-off-by: Tilak Raj <[email protected]>
  • Loading branch information
tilakraj94 committed Jan 24, 2025
1 parent 6ba9b30 commit 054127c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,17 @@ private List<List<NatsSubjectSplit>> preComputeSplitsAssignments (int parallelis

// Distribute splits evenly among subtasks
for (int j = 0; j < parallelism; j++) {
for (int i = 0; i < minimumSplitsPerReader; i++) {
NatsSubjectSplit split = remainingSplits.poll();
if (split != null) {
splitAssignments.get(j).add(split);
}
List<NatsSubjectSplit> readerSplits = splitAssignments.get(j);

// Assign minimum splits to each reader
for (int i = 0; i < minimumSplitsPerReader && !remainingSplits.isEmpty(); i++) {
readerSplits.add(remainingSplits.poll());
}

// Assign one leftover split if available
if (leftoverSplits > 0 && !remainingSplits.isEmpty()) {
NatsSubjectSplit extraSplit = remainingSplits.poll();
if (extraSplit != null) {
splitAssignments.get(j).add(extraSplit);

// Reduce leftover count
leftoverSplits--;
}
readerSplits.add(remainingSplits.poll());
leftoverSplits--;
}
}

Expand All @@ -87,6 +82,8 @@ private List<List<NatsSubjectSplit>> preComputeSplitsAssignments (int parallelis

@Override
public void close() {
// remove precomputed split assignments if any
precomputedSplitAssignments.clear();
}

@Override
Expand All @@ -98,6 +95,8 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
context.signalNoMoreSplits(subtaskId);
} else {
// O(1) operation with LinkedList
// Remove the first element from the list
// and assign splits to subtask
List<NatsSubjectSplit> splits = precomputedSplitAssignments.remove(0);
if (splits.isEmpty()) {
LOG.debug("{} | Empty split assignment for subtask {}", id, subtaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ void testHandleSplitRequestWithSplitsGreaterThanParallelism() {
});
}

// Expected: Each subtask gets at least 2 splits (6 splits, 3 subtasks)
assertEquals(5, assignedSplits.size());
assertEquals(List.of("split1", "split2", "split3", "split4", "split5"), assignedSplits);
}
Expand Down Expand Up @@ -143,7 +142,6 @@ void testHandleSplitRequestWithSplitsEqualToParallelism() {
});
}

// Expected: Each subtask gets at least 2 splits (6 splits, 3 subtasks)
assertEquals(3, assignedSplits.size());
assertEquals(List.of("split1", "split2", "split3"), assignedSplits);
}
Expand Down Expand Up @@ -188,7 +186,6 @@ void testHandleSplitRequestWithLargeParallelism() {
});
}

// Expected: Each subtask gets at least 2 splits (6 splits, 3 subtasks)
assertEquals(9, assignedSplits.size());
assertEquals(List.of("split1", "split2", "split3", "split4", "split5", "split6", "split7", "split8", "split9"), assignedSplits);
}
Expand Down Expand Up @@ -227,7 +224,6 @@ void testHandleSplitRequestWithLargeSplits() {
});
}

// Expected: Each subtask gets at least 2 splits (6 splits, 3 subtasks)
assertEquals(100, assignedSplits.size());
}
}

0 comments on commit 054127c

Please sign in to comment.