Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent multiple attempts to publish segments for the same sequence #14995

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public enum Status
private final String stream;

private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
private final Set<String> publishedSequences = Sets.newConcurrentHashSet();
private final List<ListenableFuture<SegmentsAndCommitMetadata>> publishWaitList = new ArrayList<>();
private final List<ListenableFuture<SegmentsAndCommitMetadata>> handOffWaitList = new ArrayList<>();

Expand Down Expand Up @@ -806,7 +807,8 @@ public void onFailure(Throwable t)
List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequencesSnapshot = new ArrayList<>(sequences);
for (int i = 0; i < sequencesSnapshot.size(); i++) {
final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata = sequencesSnapshot.get(i);
if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) {
if (!publishingSequences.contains(sequenceMetadata.getSequenceName())
&& !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
final boolean isLast = i == (sequencesSnapshot.size() - 1);
if (isLast) {
// Shorten endOffsets of the last sequence to match currOffsets.
Expand Down Expand Up @@ -1009,6 +1011,7 @@ public void onSuccess(SegmentsAndCommitMetadata publishedSegmentsAndCommitMetada
);
log.infoSegments(publishedSegmentsAndCommitMetadata.getSegments(), "Published segments");

publishedSequences.add(sequenceMetadata.getSequenceName());
sequences.remove(sequenceMetadata);
publishingSequences.remove(sequenceMetadata.getSequenceName());

Expand Down Expand Up @@ -1157,7 +1160,9 @@ private void maybePersistAndPublishSequences(Supplier<Committer> committerSuppli
{
for (SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata : sequences) {
sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadBeforeReadingRecord);
if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) {
if (!sequenceMetadata.isOpen() &&
!publishingSequences.contains(sequenceMetadata.getSequenceName())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: symmetry with the next condition.

Suggested change
if (!sequenceMetadata.isOpen() &&
!publishingSequences.contains(sequenceMetadata.getSequenceName())
if (!sequenceMetadata.isOpen()
&& !publishingSequences.contains(sequenceMetadata.getSequenceName())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

&& !publishedSequences.contains(sequenceMetadata.getSequenceName())) {
publishingSequences.add(sequenceMetadata.getSequenceName());
try {
final Object result = driver.persist(committerSupplier.get());
Expand Down