From 8d2b50157da9996adb9720a388f8b9c7d05b5a56 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 16 Aug 2019 17:23:08 -0700 Subject: [PATCH 1/5] Preventing duplicate delivery due to unacknowledged event while completing the subscription --- .../fanout/FanOutRecordsPublisher.java | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 53c3d715b..b9b0b7732 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -64,6 +64,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { ThrowableType.ACQUIRE_TIMEOUT); private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); private static final int MAX_EVENT_BURST_FROM_SERVICE = 10; + private static final long TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS = 1000; private final KinesisAsyncClient kinesis; private final String shardId; @@ -82,6 +83,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private long availableQueueSpace = 0; private BlockingQueue recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE); + private boolean isAwaitingFinalAckForCurrentSubscription = false; @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, @@ -132,6 +134,10 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) { triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck); } catch (Throwable t) { errorOccurred(triggeringFlow, t); + } finally { + if(isAwaitingFinalAckForCurrentSubscription) { + lockObject.notifyAll(); + } } if (triggeringFlow != null) { updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); @@ -160,7 +166,7 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver // Update the triggering flow for post scheduling upstream request. flowToBeReturned = recordsRetrievedContext.getRecordFlow(); // Try scheduling the next event in the queue, if available. - if (recordsDeliveryQueue.peek() != null) { + if (!isAwaitingFinalAckForCurrentSubscription && recordsDeliveryQueue.peek() != null) { subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved()); } } else { @@ -193,7 +199,7 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, // Note: This does not block wait to enqueue. recordsDeliveryQueue.add(recordsRetrievedContext); // If the current batch is the only element in the queue, then try scheduling the event delivery. - if (recordsDeliveryQueue.size() == 1) { + if (!isAwaitingFinalAckForCurrentSubscription && recordsDeliveryQueue.size() == 1) { subscriber.onNext(recordsRetrieved); } } catch (IllegalStateException e) { @@ -224,7 +230,7 @@ private boolean hasValidFlow() { private void subscribeToShard(String sequenceNumber) { synchronized (lockObject) { // Clear the queue so that any stale entries from previous subscription are discarded. - clearRecordsDeliveryQueue(); + clearRecordsDeliveryQueue(false); SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() .shardId(shardId).consumerARN(consumerArn); SubscribeToShardRequest request; @@ -263,7 +269,7 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { } // Clear the delivery buffer so that next subscription don't yield duplicate records. - clearRecordsDeliveryQueue(); + clearRecordsDeliveryQueue(true); Throwable propagationThrowable = t; ThrowableCategory category = throwableCategory(propagationThrowable); @@ -313,8 +319,24 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { } } - private void clearRecordsDeliveryQueue() { + // This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject + private void clearRecordsDeliveryQueue(boolean isSubscriptionCompleting) { + if(isSubscriptionCompleting) { + // This will prevent further events from getting scheduled + isAwaitingFinalAckForCurrentSubscription = true; + try { + if (!recordsDeliveryQueue.isEmpty()) { + // Wait for the configured time to get a notification for already delivered event, if any. + lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + // Clear the queue to remove any remaining entries from the queue. recordsDeliveryQueue.clear(); + // Set isAwaitingFinalAckForCurrentSubscription to default value. + isAwaitingFinalAckForCurrentSubscription = false; } protected void logAcquireTimeoutMessage(Throwable t) { @@ -445,6 +467,9 @@ private void onComplete(RecordFlow triggeringFlow) { synchronized (lockObject) { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + + clearRecordsDeliveryQueue(true); + triggeringFlow.cancel(); if (!hasValidSubscriber()) { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, From 6f24734c9737ae0d581a13e0d2d77a9f29ec5cde Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 19 Aug 2019 10:37:33 -0700 Subject: [PATCH 2/5] Refactored clearRecordsDeliveryQueue logic and added comments --- .../retrieval/fanout/FanOutRecordsPublisher.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index b9b0b7732..c246950a4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -321,14 +321,18 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { // This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject private void clearRecordsDeliveryQueue(boolean isSubscriptionCompleting) { + // If the queue is empty, we can immediately return without any further action. + if (recordsDeliveryQueue.isEmpty()) { + return; + } + + // Wait for final event notification during the end of the subscription. if(isSubscriptionCompleting) { - // This will prevent further events from getting scheduled + // This will prevent further events from getting scheduled, during the wait period. isAwaitingFinalAckForCurrentSubscription = true; try { - if (!recordsDeliveryQueue.isEmpty()) { - // Wait for the configured time to get a notification for already delivered event, if any. - lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS); - } + // Wait for the configured time to get a notification for already delivered event, if any. + lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } From 452dde1673a3fe502327e632fb9e5905bc73d304 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 19 Aug 2019 11:22:38 -0700 Subject: [PATCH 3/5] Code refactoring as per review comments --- .../fanout/FanOutRecordsPublisher.java | 57 ++++++++++++------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index c246950a4..8aa4aa45c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -83,7 +83,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private long availableQueueSpace = 0; private BlockingQueue recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE); - private boolean isAwaitingFinalAckForCurrentSubscription = false; + // Flag to indicate if the active subscription is being torn down. + private boolean pendingActiveSubscriptionShutdown = false; @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, @@ -135,7 +136,10 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) { } catch (Throwable t) { errorOccurred(triggeringFlow, t); } finally { - if(isAwaitingFinalAckForCurrentSubscription) { + // Notify all the actors who are waiting for the records ack event. + // Here, when the active subscription is being torn down, the completing thread will + // wait for the last delivered records to send back the ack, to prevent sending duplicate records. + if(pendingActiveSubscriptionShutdown) { lockObject.notifyAll(); } } @@ -166,8 +170,8 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver // Update the triggering flow for post scheduling upstream request. flowToBeReturned = recordsRetrievedContext.getRecordFlow(); // Try scheduling the next event in the queue, if available. - if (!isAwaitingFinalAckForCurrentSubscription && recordsDeliveryQueue.peek() != null) { - subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved()); + if (recordsDeliveryQueue.peek() != null) { + scheduleNextEvent(recordsDeliveryQueue.peek().getRecordsRetrieved()); } } else { // Check if the mismatched event belongs to active flow. If publisher receives an ack for a @@ -199,8 +203,8 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, // Note: This does not block wait to enqueue. recordsDeliveryQueue.add(recordsRetrievedContext); // If the current batch is the only element in the queue, then try scheduling the event delivery. - if (!isAwaitingFinalAckForCurrentSubscription && recordsDeliveryQueue.size() == 1) { - subscriber.onNext(recordsRetrieved); + if (recordsDeliveryQueue.size() == 1) { + scheduleNextEvent(recordsRetrieved); } } catch (IllegalStateException e) { log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", @@ -212,6 +216,14 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, } } + // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. + // Schedule the next event only when the active subscription is not pending shutdown. + private void scheduleNextEvent(RecordsRetrieved recordsRetrieved) { + if (!pendingActiveSubscriptionShutdown) { + subscriber.onNext(recordsRetrieved); + } + } + @Data private static final class RecordsRetrievedContext { private final RecordsRetrieved recordsRetrieved; @@ -229,8 +241,8 @@ private boolean hasValidFlow() { private void subscribeToShard(String sequenceNumber) { synchronized (lockObject) { - // Clear the queue so that any stale entries from previous subscription are discarded. - clearRecordsDeliveryQueue(false); + // Clear the delivery queue so that any stale entries from previous subscription are discarded. + resetRecordsDeliveryStateOnSubscriptionInit(); SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() .shardId(shardId).consumerARN(consumerArn); SubscribeToShardRequest request; @@ -269,7 +281,7 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { } // Clear the delivery buffer so that next subscription don't yield duplicate records. - clearRecordsDeliveryQueue(true); + resetRecordsDeliveryStateOnSubscriptionShutdown(); Throwable propagationThrowable = t; ThrowableCategory category = throwableCategory(propagationThrowable); @@ -320,27 +332,32 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { } // This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject - private void clearRecordsDeliveryQueue(boolean isSubscriptionCompleting) { - // If the queue is empty, we can immediately return without any further action. - if (recordsDeliveryQueue.isEmpty()) { - return; + private void resetRecordsDeliveryStateOnSubscriptionInit() { + // Clear any lingering records in the queue. + if (!recordsDeliveryQueue.isEmpty()) { + recordsDeliveryQueue.clear(); } + // Set pendingActiveSubscriptionShutdown to default value. + pendingActiveSubscriptionShutdown = false; + } + // This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject + private void resetRecordsDeliveryStateOnSubscriptionShutdown() { // Wait for final event notification during the end of the subscription. - if(isSubscriptionCompleting) { + if (!recordsDeliveryQueue.isEmpty()) { // This will prevent further events from getting scheduled, during the wait period. - isAwaitingFinalAckForCurrentSubscription = true; + pendingActiveSubscriptionShutdown = true; try { // Wait for the configured time to get a notification for already delivered event, if any. lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + // Clear the queue to remove any remaining entries from the queue. + recordsDeliveryQueue.clear(); + // Set pendingActiveSubscriptionShutdown to default value. + pendingActiveSubscriptionShutdown = false; } - // Clear the queue to remove any remaining entries from the queue. - recordsDeliveryQueue.clear(); - // Set isAwaitingFinalAckForCurrentSubscription to default value. - isAwaitingFinalAckForCurrentSubscription = false; } protected void logAcquireTimeoutMessage(Throwable t) { @@ -472,7 +489,7 @@ private void onComplete(RecordFlow triggeringFlow) { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); - clearRecordsDeliveryQueue(true); + resetRecordsDeliveryStateOnSubscriptionShutdown(); triggeringFlow.cancel(); if (!hasValidSubscriber()) { From b5f97fa76231756700cc0e65fccf53b514e2e890 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 19 Aug 2019 12:43:29 -0700 Subject: [PATCH 4/5] Nit fix --- .../amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 8aa4aa45c..c97a74a6a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -170,7 +170,7 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver // Update the triggering flow for post scheduling upstream request. flowToBeReturned = recordsRetrievedContext.getRecordFlow(); // Try scheduling the next event in the queue, if available. - if (recordsDeliveryQueue.peek() != null) { + if (!recordsDeliveryQueue.isEmpty()) { scheduleNextEvent(recordsDeliveryQueue.peek().getRecordsRetrieved()); } } else { From 2432575f36861f60b403feeb6626e7e88a318f8f Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 19 Aug 2019 14:25:16 -0700 Subject: [PATCH 5/5] Add logging to unexpected subscription state scenario --- .../retrieval/fanout/FanOutRecordsPublisher.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index c97a74a6a..8aea87351 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -242,7 +242,7 @@ private boolean hasValidFlow() { private void subscribeToShard(String sequenceNumber) { synchronized (lockObject) { // Clear the delivery queue so that any stale entries from previous subscription are discarded. - resetRecordsDeliveryStateOnSubscriptionInit(); + resetRecordsDeliveryStateOnSubscriptionOnInit(); SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() .shardId(shardId).consumerARN(consumerArn); SubscribeToShardRequest request; @@ -332,13 +332,19 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { } // This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject - private void resetRecordsDeliveryStateOnSubscriptionInit() { + private void resetRecordsDeliveryStateOnSubscriptionOnInit() { // Clear any lingering records in the queue. if (!recordsDeliveryQueue.isEmpty()) { + log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" + + "previous subscription - {} ", shardId, subscribeToShardId); recordsDeliveryQueue.clear(); } - // Set pendingActiveSubscriptionShutdown to default value. - pendingActiveSubscriptionShutdown = false; + if(pendingActiveSubscriptionShutdown) { + log.warn("{}: Found current subscription to be in pendingShutdown state while initializing. This indicates unsuccessful clean up of" + + "previous subscription - {} ", shardId, subscribeToShardId); + // Set pendingActiveSubscriptionShutdown to default value. + pendingActiveSubscriptionShutdown = false; + } } // This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject