diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java index 53c3d715b..8aea87351 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRecordsPublisher.java @@ -64,6 +64,7 @@ public class FanOutRecordsPublisher implements RecordsPublisher { ThrowableType.ACQUIRE_TIMEOUT); private static final ThrowableCategory READ_TIMEOUT_CATEGORY = new ThrowableCategory(ThrowableType.READ_TIMEOUT); private static final int MAX_EVENT_BURST_FROM_SERVICE = 10; + private static final long TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS = 1000; private final KinesisAsyncClient kinesis; private final String shardId; @@ -82,6 +83,8 @@ public class FanOutRecordsPublisher implements RecordsPublisher { private long availableQueueSpace = 0; private BlockingQueue recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE); + // Flag to indicate if the active subscription is being torn down. + private boolean pendingActiveSubscriptionShutdown = false; @Override public void start(ExtendedSequenceNumber extendedSequenceNumber, @@ -132,6 +135,13 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) { triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck); } catch (Throwable t) { errorOccurred(triggeringFlow, t); + } finally { + // Notify all the actors who are waiting for the records ack event. + // Here, when the active subscription is being torn down, the completing thread will + // wait for the last delivered records to send back the ack, to prevent sending duplicate records. + if(pendingActiveSubscriptionShutdown) { + lockObject.notifyAll(); + } } if (triggeringFlow != null) { updateAvailableQueueSpaceAndRequestUpstream(triggeringFlow); @@ -160,8 +170,8 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver // Update the triggering flow for post scheduling upstream request. flowToBeReturned = recordsRetrievedContext.getRecordFlow(); // Try scheduling the next event in the queue, if available. - if (recordsDeliveryQueue.peek() != null) { - subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved()); + if (!recordsDeliveryQueue.isEmpty()) { + scheduleNextEvent(recordsDeliveryQueue.peek().getRecordsRetrieved()); } } else { // Check if the mismatched event belongs to active flow. If publisher receives an ack for a @@ -194,7 +204,7 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, recordsDeliveryQueue.add(recordsRetrievedContext); // If the current batch is the only element in the queue, then try scheduling the event delivery. if (recordsDeliveryQueue.size() == 1) { - subscriber.onNext(recordsRetrieved); + scheduleNextEvent(recordsRetrieved); } } catch (IllegalStateException e) { log.warn("{}: Unable to enqueue the payload due to capacity restrictions in delivery queue with remaining capacity {} ", @@ -206,6 +216,14 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved, } } + // This method is not thread-safe. You need to acquire a lock in the caller in order to execute this. + // Schedule the next event only when the active subscription is not pending shutdown. + private void scheduleNextEvent(RecordsRetrieved recordsRetrieved) { + if (!pendingActiveSubscriptionShutdown) { + subscriber.onNext(recordsRetrieved); + } + } + @Data private static final class RecordsRetrievedContext { private final RecordsRetrieved recordsRetrieved; @@ -223,8 +241,8 @@ private boolean hasValidFlow() { private void subscribeToShard(String sequenceNumber) { synchronized (lockObject) { - // Clear the queue so that any stale entries from previous subscription are discarded. - clearRecordsDeliveryQueue(); + // Clear the delivery queue so that any stale entries from previous subscription are discarded. + resetRecordsDeliveryStateOnSubscriptionOnInit(); SubscribeToShardRequest.Builder builder = KinesisRequestsBuilder.subscribeToShardRequestBuilder() .shardId(shardId).consumerARN(consumerArn); SubscribeToShardRequest request; @@ -263,7 +281,7 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { } // Clear the delivery buffer so that next subscription don't yield duplicate records. - clearRecordsDeliveryQueue(); + resetRecordsDeliveryStateOnSubscriptionShutdown(); Throwable propagationThrowable = t; ThrowableCategory category = throwableCategory(propagationThrowable); @@ -313,8 +331,39 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) { } } - private void clearRecordsDeliveryQueue() { - recordsDeliveryQueue.clear(); + // This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject + private void resetRecordsDeliveryStateOnSubscriptionOnInit() { + // Clear any lingering records in the queue. + if (!recordsDeliveryQueue.isEmpty()) { + log.warn("{}: Found non-empty queue while starting subscription. This indicates unsuccessful clean up of" + + "previous subscription - {} ", shardId, subscribeToShardId); + recordsDeliveryQueue.clear(); + } + if(pendingActiveSubscriptionShutdown) { + log.warn("{}: Found current subscription to be in pendingShutdown state while initializing. This indicates unsuccessful clean up of" + + "previous subscription - {} ", shardId, subscribeToShardId); + // Set pendingActiveSubscriptionShutdown to default value. + pendingActiveSubscriptionShutdown = false; + } + } + + // This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject + private void resetRecordsDeliveryStateOnSubscriptionShutdown() { + // Wait for final event notification during the end of the subscription. + if (!recordsDeliveryQueue.isEmpty()) { + // This will prevent further events from getting scheduled, during the wait period. + pendingActiveSubscriptionShutdown = true; + try { + // Wait for the configured time to get a notification for already delivered event, if any. + lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + // Clear the queue to remove any remaining entries from the queue. + recordsDeliveryQueue.clear(); + // Set pendingActiveSubscriptionShutdown to default value. + pendingActiveSubscriptionShutdown = false; + } } protected void logAcquireTimeoutMessage(Throwable t) { @@ -445,6 +494,9 @@ private void onComplete(RecordFlow triggeringFlow) { synchronized (lockObject) { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId, triggeringFlow.connectionStartedAt, triggeringFlow.subscribeToShardId); + + resetRecordsDeliveryStateOnSubscriptionShutdown(); + triggeringFlow.cancel(); if (!hasValidSubscriber()) { log.debug("{}: [SubscriptionLifetime]: (FanOutRecordsPublisher#onComplete) @ {} id: {}", shardId,