Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

assign all splits correctly to source readers #37

Merged
merged 3 commits into from
Jan 27, 2025
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,27 @@
import io.synadia.flink.v0.source.split.NatsSubjectSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.*;

import static io.synadia.flink.v0.utils.MiscUtils.generatePrefixedId;
import static org.apache.flink.util.Preconditions.checkNotNull;

public class NatsSourceEnumerator implements SplitEnumerator<NatsSubjectSplit, Collection<NatsSubjectSplit>> {
private static final Logger LOG = LoggerFactory.getLogger(NatsSourceEnumerator.class);

private final String id;
private final SplitEnumeratorContext<NatsSubjectSplit> context;
private final Queue<NatsSubjectSplit> remainingSplits;

// assumes splits to be less than or equal to parallelism
private int minimumSplitsToAssign = -1;


public NatsSourceEnumerator(String sourceId,
SplitEnumeratorContext<NatsSubjectSplit> context,
Collection<NatsSubjectSplit> splits)
Expand All @@ -32,6 +38,17 @@ public NatsSourceEnumerator(String sourceId,

@Override
public void start() {
int noOfSplits = remainingSplits.size();
int parallelism = context.currentParallelism();

// let the splits be evenly distributed
if (noOfSplits <= parallelism) {
this.minimumSplitsToAssign = -1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance this can be `= 1`? Also, can you make this an if-else? An early `return;` like this is unusual in a method with a void return type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the changes: removed the -1 and removed all instances of `return;` from void methods.

return;
}

// minimum splits that needs to be assigned to reader
this.minimumSplitsToAssign = noOfSplits / parallelism;
}

@Override
Expand All @@ -40,12 +57,37 @@ public void close() {

@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
final NatsSubjectSplit nextSplit = remainingSplits.poll();
if (remainingSplits.isEmpty()) {
context.signalNoMoreSplits(subtaskId);
return;
}

List<NatsSubjectSplit> nextSplits = new ArrayList<>();
for (int i = 0; i < this.minimumSplitsToAssign; i++) {
NatsSubjectSplit nextSplit = remainingSplits.poll();
if (nextSplit == null) {
break;
}

nextSplits.add(nextSplit);
}

if (!nextSplits.isEmpty()) {
Map<Integer, List<NatsSubjectSplit>> assignedSplits = new HashMap<>();
assignedSplits.put(subtaskId, nextSplits);

// assign the splits back to the source reader
context.assignSplits(new SplitsAssignment<>(assignedSplits));
LOG.debug("{} | Assigned splits to subtask: {}", id, subtaskId);
}

// Perform round-robin assignment for leftover splits
// Assign only one split at a time since the number of leftover splits will always be less than the parallelism.
// Each leftover split can be assigned to any reader, and the list will be exhausted quickly.
NatsSubjectSplit nextSplit = remainingSplits.poll();
if (nextSplit != null) {
context.assignSplit(nextSplit, subtaskId);
}
else {
context.signalNoMoreSplits(subtaskId);
LOG.debug("{} | Assigned split in round-robin to subtask: {}", id, subtaskId);
}
}

Expand Down
Loading