Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preventing duplicate delivery due to unacknowledged event while completing the subscription #596

Merged
merged 5 commits into from
Aug 19, 2019

Conversation

ashwing
Copy link
Contributor

@ashwing ashwing commented Aug 17, 2019

Preventing duplicate delivery due to unacknowledged event while completing the subscription

Issue #, if available:

Description of changes:

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@@ -313,8 +319,24 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
}
}

private void clearRecordsDeliveryQueue() {
// This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject
private void clearRecordsDeliveryQueue(boolean isSubscriptionCompleting) {
Copy link
Contributor

@yatins47 yatins47 Aug 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method seems to be no-op if the delivery q is empty. Just check that in the beginning and return and continue if not. Below seems more easier to read as we just clear the queue immediately and then wait instead of doing the opposite since setting isAwaitingFinalAckForCurrentSubscription to true has anyways stopped events from getting scheduled. Also we can update the method name to say clearRecordsDeliveryQueueAndWaitForSubscriberNotification() or something with more context

if (recordsDeliveryQueue.isEmpty()) {
  return;
}
// This will prevent further events from getting scheduled
isAwaitingFinalAckForCurrentSubscription = true;
recordsDeliveryQueue.clear();
if(isSubscriptionCompleting) {
        try {
            // Wait for the configured time to get a notification for already delivered event, if any.
            lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS);
        
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

// Set isAwaitingFinalAckForCurrentSubscription to default value.
isAwaitingFinalAckForCurrentSubscription = false;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Will make changes

try {
if (!recordsDeliveryQueue.isEmpty()) {
// Wait for the configured time to get a notification for already delivered event, if any.
lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we not waiting if the delivery queue is empty. There might still be a event in which was already delivered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when the delivery queue is empty then it is guaranteed that there will be no valid in-flight event.

@@ -132,6 +134,10 @@ public void notify(RecordsDeliveryAck recordsDeliveryAck) {
triggeringFlow = evictAckedEventAndScheduleNextEvent(recordsDeliveryAck);
} catch (Throwable t) {
errorOccurred(triggeringFlow, t);
} finally {
if(isAwaitingFinalAckForCurrentSubscription) {
lockObject.notifyAll();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is critical line, worth adding comments explaining the design.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

@@ -160,7 +166,7 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver
// Update the triggering flow for post scheduling upstream request.
flowToBeReturned = recordsRetrievedContext.getRecordFlow();
// Try scheduling the next event in the queue, if available.
if (recordsDeliveryQueue.peek() != null) {
if (!isAwaitingFinalAckForCurrentSubscription && recordsDeliveryQueue.peek() != null) {
subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given we schedule the next event under two different conditions (here and in bufferCurrentEventAndScheduleIfRequired method) , and both required isShuttingDown check, let's move out the schedule call it its own method, with the check. And add comment on why the check is necessary.

i.e. scheduleNextEvent(event) {if !pendingShutdown; then subscriber.onNext(event)}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya. makes sense

@@ -313,8 +319,24 @@ private void errorOccurred(RecordFlow triggeringFlow, Throwable t) {
}
}

private void clearRecordsDeliveryQueue() {
// This method is not thread safe. This needs to be executed after acquiring lock on this.lockObject
private void clearRecordsDeliveryQueue(boolean isSubscriptionCompleting) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple comment on this method:

Why it's necessary to introduce split-brain logic here? Do we expect queue to be full, by the time subscribeToShard call is made? That invocation seems to be backup plan. We can either pull that out to its own method, or just let the start of a new subscription wait as well.

isAwaitingFinalAck variable is tightly couple with the queue status, why not update that within the if? i.e. if !queue.isEmpty -> then set the variable, and lock.wait()

Copy link
Contributor Author

@ashwing ashwing Aug 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect queue to be full, by the time subscribeToShard call is made? - Ideally no, just for precaution.
We can either pull that out to its own method - would be preferred instead of waiting
isAwaitingFinalAck variable is tightly couple with the queue status, why not update that within the if? - Updated as part of yatins comment

@@ -82,6 +83,7 @@
private long availableQueueSpace = 0;

private BlockingQueue<RecordsRetrievedContext> recordsDeliveryQueue = new LinkedBlockingQueue<>(MAX_EVENT_BURST_FROM_SERVICE);
private boolean isAwaitingFinalAckForCurrentSubscription = false;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about introducing this variable as a more generic status indication boolean, i.e. pendingShutdown.

Waiting to get the ACK for the last submitted batch is current design, that flips the pendingShutdown status. We can later wait to drain all events in the queue etc..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pendingShutdown might be misleading, as we are not letting know what is being shutdown. Do you think pendingCurrentSubscriptionShutdown or pendingActiveSubscriptionShutdown would be better?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, either one is good.

@@ -166,8 +170,8 @@ RecordFlow evictAckedEventAndScheduleNextEvent(RecordsDeliveryAck recordsDeliver
// Update the triggering flow for post scheduling upstream request.
flowToBeReturned = recordsRetrievedContext.getRecordFlow();
// Try scheduling the next event in the queue, if available.
if (!isAwaitingFinalAckForCurrentSubscription && recordsDeliveryQueue.peek() != null) {
subscriber.onNext(recordsDeliveryQueue.peek().getRecordsRetrieved());
if (recordsDeliveryQueue.peek() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check with isEmpty() ?

@@ -212,6 +216,14 @@ void bufferCurrentEventAndScheduleIfRequired(RecordsRetrieved recordsRetrieved,
}
}

// This method is not thread-safe. You need to acquire a lock in the caller in order to execute this.
// Schedule the next event only when the active subscription is not pending shutdown.
private void scheduleNextEvent(RecordsRetrieved recordsRetrieved) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit : what about checkSubscriptionStatusAndScheduleNextEvent ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm we might add more checks in future and we can't keep extending the name. Hence just mentioning the core functionality.

try {
// Wait for the configured time to get a notification for already delivered event, if any.
lockObject.wait(TIME_TO_WAIT_FOR_FINAL_ACK_MILLIS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Clear the queue to remove any remaining entries from the queue.
recordsDeliveryQueue.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cant we clear the queue before waiting ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we clear before waiting, the ack received will be marked as stale and records will be duplicated.

Copy link
Contributor

@yatins47 yatins47 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

recordsDeliveryQueue.clear();
}
// Set pendingActiveSubscriptionShutdown to default value.
pendingActiveSubscriptionShutdown = false;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log warning if pendingActiveSubscriptionShutdown is found true, or there are records in the queue?

Either of these cases points to something not working as expected, and logging status under such conditions can help us for RC'ing issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, they will be informational.

Copy link

@yasemin-amzn yasemin-amzn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

@micah-jaffe micah-jaffe merged commit a17d145 into awslabs:master Aug 19, 2019
@micah-jaffe micah-jaffe added this to the v2.2.2 milestone Aug 19, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants