Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent multiple attempts to publish segments for the same sequence #14995

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public enum Status
private final String stream;

private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
private final Set<String> publishedSequences = Sets.newConcurrentHashSet();
private final List<ListenableFuture<SegmentsAndCommitMetadata>> publishWaitList = new ArrayList<>();
private final List<ListenableFuture<SegmentsAndCommitMetadata>> handOffWaitList = new ArrayList<>();

Expand Down Expand Up @@ -806,7 +807,8 @@ public void onFailure(Throwable t)
List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequencesSnapshot = new ArrayList<>(sequences);
for (int i = 0; i < sequencesSnapshot.size(); i++) {
final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata = sequencesSnapshot.get(i);
if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) {
if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
&& !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
final boolean isLast = i == (sequencesSnapshot.size() - 1);
if (isLast) {
// Shorten endOffsets of the last sequence to match currOffsets.
Expand Down Expand Up @@ -1009,6 +1011,7 @@ public void onSuccess(SegmentsAndCommitMetadata publishedSegmentsAndCommitMetada
);
log.infoSegments(publishedSegmentsAndCommitMetadata.getSegments(), "Published segments");

publishedSequences.add(sequenceMetadata.getSequenceName());
sequences.remove(sequenceMetadata);
publishingSequences.remove(sequenceMetadata.getSequenceName());

Expand Down Expand Up @@ -1157,7 +1160,9 @@ private void maybePersistAndPublishSequences(Supplier<Committer> committerSuppli
{
for (SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata : sequences) {
sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadBeforeReadingRecord);
if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) {
if (!sequenceMetadata.isOpen()
&& !publishingSequences.contains(sequenceMetadata.getSequenceName())
&& !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
publishingSequences.add(sequenceMetadata.getSequenceName());
try {
final Object result = driver.persist(committerSupplier.get());
Expand Down