-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Preventing duplicate delivery due to unacknowledged event while completing the subscription #596
Changes from 1 commit
8d2b501
6f24734
452dde1
b5f97fa
2432575
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,6 +64,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { | |
ThrowableType.ACQUIRE_TIMEOUT); | ||
private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); | ||
private static final int MAX_EVENT_BURST_FROM_SERVICE = 10; | ||
private static final long TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS = 1000; | ||
|
||
private final KinesisAsyncClient kinesis; | ||
private final String shardId; | ||
|
@@ -82,6 +83,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { | |
private long availableQueueSpace = 0; | ||
|
||
private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE); | ||
private boolean isAwaitingFinalAckForCurrentSubscription = false; | ||
|
||
@Override | ||
public void start(ExtendedSequenceNumber extendedSequenceNumber, | ||
|
@@ -132,6 +134,10 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) { | |
triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck); | ||
} catch (Throwable t) { | ||
errorOccurred(triggeringFlow, t); | ||
} finally { | ||
if(isAwaitingFinalAckForCurrentSubscription) { | ||
lockObject.notifyAll(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a critical line; it is worth adding comments explaining the design. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sure. |
||
} | ||
} | ||
if (triggeringFlow != null) { | ||
updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); | ||
|
@@ -160,7 +166,7 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver | |
// Update the triggering flow for post scheduling upstream request. | ||
flowToBeReturned = recordsRetrievedContext.getRecordFlow(); | ||
// Try scheduling the next event in the queue, if available. | ||
if (recordsDeliveryQueue.peek() != null) { | ||
if (!isAwaitingFinalAckForCurrentSubscription && recordsDeliveryQueue.peek() != null) { | ||
subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Given that we schedule the next event under two different conditions (here and in the bufferCurrentEventAndScheduleIfRequired method), and both require the isShuttingDown check, let's move the schedule call into its own method, together with the check, and add a comment on why the check is necessary, i.e. scheduleNextEvent(event) {if !pendingShutdown; then subscriber.onNext(event)} There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, makes sense. |
||
} | ||
} else { | ||
|
@@ -193,7 +199,7 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, | |
// Note: This does not block wait to enqueue. | ||
recordsDeliveryQueue.add(recordsRetrievedContext); | ||
// If the current batch is the only element in the queue, then try scheduling the event delivery. | ||
if (recordsDeliveryQueue.size() == 1) { | ||
if (!isAwaitingFinalAckForCurrentSubscription && recordsDeliveryQueue.size() == 1) { | ||
subscriber.onNext(recordsRetrieved); | ||
} | ||
} catch (IllegalStateException e) { | ||
|
@@ -224,7 +230,7 @@ private boolean hasValidFlow() { | |
private void subscribeToShard(String sequenceNumber) { | ||
synchronized (lockObject) { | ||
// Clear the queue so that any stale entries from previous subscription are discarded. | ||
clearRecordsDeliveryQueue(); | ||
clearRecordsDeliveryQueue(false); | ||
SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() | ||
.shardId(shardId).consumerARN(consumerArn); | ||
SubscribeToShardRequest request; | ||
|
@@ -263,7 +269,7 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { | |
} | ||
|
||
// Clear the delivery buffer so that next subscription don't yield duplicate records. | ||
clearRecordsDeliveryQueue(); | ||
clearRecordsDeliveryQueue(true); | ||
|
||
Throwable propagationThrowable = t; | ||
ThrowableCategory category = throwableCategory(propagationThrowable); | ||
|
@@ -313,8 +319,24 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { | |
} | ||
} | ||
|
||
private void clearRecordsDeliveryQueue() { | ||
// This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject | ||
private void clearRecordsDeliveryQueue(boolean isSubscriptionCompleting) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This method seems to be a no-op if the delivery queue is empty. Just check that at the beginning and return early, and continue only if it is not empty. The version below seems easier to read, as we just clear the queue immediately and then wait, instead of doing the opposite, since setting isAwaitingFinalAckForCurrentSubscription to true has already stopped events from getting scheduled anyway. Also, we can update the method name to something like clearRecordsDeliveryQueueAndWaitForSubscriberNotification(), or something else with more context.
// Set isAwaitingFinalAckForCurrentSubscription to default value. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ack. Will make changes. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A couple of comments on this method: Why is it necessary to introduce split-brain logic here? Do we expect the queue to be full by the time the subscribeToShard call is made? That invocation seems to be a backup plan. We can either pull that out into its own method, or just let the start of a new subscription wait as well. The isAwaitingFinalAck variable is tightly coupled with the queue status, so why not update it within the if? I.e. if !queue.isEmpty -> then set the variable, and lock.wait() There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
if(isSubscriptionCompleting) { | ||
// This will prevent further events from getting scheduled | ||
isAwaitingFinalAckForCurrentSubscription = true; | ||
try { | ||
if (!recordsDeliveryQueue.isEmpty()) { | ||
// Wait for the configured time to get a notification for already delivered event, if any. | ||
lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why are we not waiting if the delivery queue is empty? There might still be an event in flight that was already delivered. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When the delivery queue is empty, it is guaranteed that there will be no valid in-flight event. |
||
} | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
// Clear the queue to remove any remaining entries from the queue. | ||
recordsDeliveryQueue.clear(); | ||
// Set isAwaitingFinalAckForCurrentSubscription to default value. | ||
isAwaitingFinalAckForCurrentSubscription = false; | ||
} | ||
|
||
protected void logAcquireTimeoutMessage(Throwable t) { | ||
|
@@ -445,6 +467,9 @@ private void onComplete(RecordFlow triggeringFlow) { | |
synchronized (lockObject) { | ||
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, | ||
triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); | ||
|
||
clearRecordsDeliveryQueue(true); | ||
|
||
triggeringFlow.cancel(); | ||
if (!hasValidSubscriber()) { | ||
log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about introducing this variable as a more generic status-indication boolean, e.g. pendingShutdown?
Waiting to get the ACK for the last submitted batch is the current design, and that is what flips the pendingShutdown status. Later we could also wait to drain all events in the queue, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pendingShutdown might be misleading, as it does not make clear what is being shut down. Do you think pendingCurrentSubscriptionShutdown or pendingActiveSubscriptionShutdown would be better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, either one is good.