-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Preventing duplicate delivery due to unacknowledged event while completing the subscription #596
Changes from all commits
8d2b501
6f24734
452dde1
b5f97fa
2432575
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,6 +64,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { | |
ThrowableType.ACQUIRE_TIMEOUT); | ||
private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); | ||
private static final int MAX_EVENT_BURST_FROM_SERVICE = 10; | ||
private static final long TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS = 1000; | ||
|
||
private final KinesisAsyncClient kinesis; | ||
private final String shardId; | ||
|
@@ -82,6 +83,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { | |
private long availableQueueSpace = 0; | ||
|
||
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE); | ||
// Flag to indicate if the active subscription is being torn down. | ||
private boolean pendingActiveSubscriptionShutdown = false; | ||
|
||
@Override | ||
public void start(ExtendedSequenceNumber extendedSequenceNumber, | ||
|
@@ -132,6 +135,13 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) { | |
triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck); | ||
} catch (Throwable t) { | ||
errorOccurred(triggeringFlow, t); | ||
} finally { | ||
// Notify all the actors who are waiting for the records ack event. | ||
// Here, when the active subscription is being torn down, the completing thread will | ||
// wait for the last delivered records to send back the ack, to prevent sending duplicate records. | ||
if(pendingActiveSubscriptionShutdown) { | ||
lockObject.notifyAll(); | ||
} | ||
} | ||
if (triggeringFlow != null) { | ||
updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); | ||
|
@@ -160,8 +170,8 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver | |
// Update the triggering flow for post scheduling upstream request. | ||
flowToBeReturned = recordsRetrievedContext.getRecordFlow(); | ||
// Try scheduling the next event in the queue, if available. | ||
if (recordsDeliveryQueue.peek() != null) { | ||
subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved()); | ||
if (!recordsDeliveryQueue.isEmpty()) { | ||
scheduleNextEvent(recordsDeliveryQueue.peek().getRecordsRetrieved()); | ||
} | ||
} else { | ||
// Check if the mismatched event belongs to active flow. If publisher receives an ack for a | ||
|
@@ -194,7 +204,7 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, | |
recordsDeliveryQueue.add(recordsRetrievedContext); | ||
// If the current batch is the only element in the queue, then try scheduling the event delivery. | ||
if (recordsDeliveryQueue.size() == 1) { | ||
subscriber.onNext(recordsRetrieved); | ||
scheduleNextEvent(recordsRetrieved); | ||
} | ||
} catch (IllegalStateException e) { | ||
log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", | ||
|
@@ -206,6 +216,14 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, | |
} | ||
} | ||
|
||
// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. | ||
// Schedule the next event only when the active subscription is not pending shutdown. | ||
private void scheduleNextEvent(RecordsRetrieved recordsRetrieved) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: what about checkSubscriptionStatusAndScheduleNextEvent? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm, we might add more checks in the future, and we can't keep extending the name. Hence the name just mentions the core functionality. |
||
if (!pendingActiveSubscriptionShutdown) { | ||
subscriber.onNext(recordsRetrieved); | ||
} | ||
} | ||
|
||
@Data | ||
private static final class RecordsRetrievedContext { | ||
private final RecordsRetrieved recordsRetrieved; | ||
|
@@ -223,8 +241,8 @@ private boolean hasValidFlow() { | |
|
||
private void subscribeToShard(String sequenceNumber) { | ||
synchronized (lockObject) { | ||
// Clear the queue so that any stale entries from previous subscription are discarded. | ||
clearRecordsDeliveryQueue(); | ||
// Clear the delivery queue so that any stale entries from previous subscription are discarded. | ||
resetRecordsDeliveryStateOnSubscriptionOnInit(); | ||
SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() | ||
.shardId(shardId).consumerARN(consumerArn); | ||
SubscribeToShardRequest request; | ||
|
@@ -263,7 +281,7 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { | |
} | ||
|
||
// Clear the delivery buffer so that next subscription don't yield duplicate records. | ||
clearRecordsDeliveryQueue(); | ||
resetRecordsDeliveryStateOnSubscriptionShutdown(); | ||
|
||
Throwable propagationThrowable = t; | ||
ThrowableCategory category = throwableCategory(propagationThrowable); | ||
|
@@ -313,8 +331,39 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { | |
} | ||
} | ||
|
||
private void clearRecordsDeliveryQueue() { | ||
recordsDeliveryQueue.clear(); | ||
// This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject | ||
private void resetRecordsDeliveryStateOnSubscriptionOnInit() { | ||
// Clear any lingering records in the queue. | ||
if (!recordsDeliveryQueue.isEmpty()) { | ||
log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" | ||
+ "previous subscription - {} ", shardId, subscribeToShardId); | ||
recordsDeliveryQueue.clear(); | ||
} | ||
if(pendingActiveSubscriptionShutdown) { | ||
log.warn("{}: Found current subscription to be in pendingShutdown state while initializing. This indicates unsuccessful clean up of" | ||
+ "previous subscription - {} ", shardId, subscribeToShardId); | ||
// Set pendingActiveSubscriptionShutdown to default value. | ||
pendingActiveSubscriptionShutdown = false; | ||
} | ||
} | ||
|
||
// This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject | ||
private void resetRecordsDeliveryStateOnSubscriptionShutdown() { | ||
// Wait for final event notification during the end of the subscription. | ||
if (!recordsDeliveryQueue.isEmpty()) { | ||
// This will prevent further events from getting scheduled, during the wait period. | ||
pendingActiveSubscriptionShutdown = true; | ||
try { | ||
// Wait for the configured time to get a notification for already delivered event, if any. | ||
lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS); | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
// Clear the queue to remove any remaining entries from the queue. | ||
recordsDeliveryQueue.clear(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why can't we clear the queue before waiting? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we clear before waiting, the ack received will be marked as stale and records will be duplicated. |
||
// Set pendingActiveSubscriptionShutdown to default value. | ||
pendingActiveSubscriptionShutdown = false; | ||
} | ||
} | ||
|
||
protected void logAcquireTimeoutMessage(Throwable t) { | ||
|
@@ -445,6 +494,9 @@ private void onComplete(RecordFlow triggeringFlow) { | |
synchronized (lockObject) { | ||
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, | ||
triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); | ||
|
||
resetRecordsDeliveryStateOnSubscriptionShutdown(); | ||
|
||
triggeringFlow.cancel(); | ||
if (!hasValidSubscriber()) { | ||
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a critical line; it is worth adding comments explaining the design.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure