Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to prevent the onNext event going to stale subscription when restart happens in poller #606

Merged
merged 5 commits into from
Sep 19, 2019

Conversation

ashwing
Copy link
Contributor

@ashwing ashwing commented Sep 9, 2019

Fix to prevent the onNext event going to stale subscription when restart happens in poller

Issue #, if available:

Description of changes:

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@@ -216,12 +216,15 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) {
PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) recordsRetrieved;
resetLock.writeLock().lock();
try {
// Reset the demand from ShardConsumer, to prevent this publisher from delivering events to stale RX-Java
// Subscriber. Publishing will be unblocked when the demand is communicated by the new Rx-Java subscriber.
requestedResponses.set(0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class keep track of so much state, and they all need to be reset at restartFrom call. I'm wondering if pulling out the state tracking variable in one common place, and call reset on that one to start with clean state helps longer term maintainability of this class. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. Updated.

…l shifting logic for publishing with monitor based control
Copy link

@yasemin-amzn yasemin-amzn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot Ashwin. I think this approach did end up simplifying the original implementation. Leaving minor comments to build more onto the new construct.

}

// Take action on successful event delivery.
RecordsRetrieved eventDeliveredAction(String shardId) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: handleEventDeliveryNotification()

@@ -181,23 +223,13 @@ private void throwOnIllegalState() {

private RecordsRetrieved peekNextResult() {
throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek();
return result == null ? result : result.prepareForPublish();
return publisherSession.prefetchRecordsQueue().peek();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to abstract away the queue within the session. i.e. publisherSession.peekNext()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -257,16 +277,9 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
// Verify if the ack matches the head of the queue and evict it.
if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this if/else logic moved into session? i.e. handleRecordsDeliveryAck(RecordsDeliveryAck ack)

I see we're checking for illegalState in evictPublishedEvent, which we can do before calling handleRecordsDeliveryAck()

This will simplify the code a bit, will be no need to have methods/access to the queue directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -404,7 +404,7 @@ public void run() {

private void makeRetrievalAttempt() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something to consider: does this method fit better into session now -- there will be direct access to counters, and waiting etc.
Only when retrievalAttempt call to session returns success, then PrefetchRecordsPublisher would call drainQueue.

Copy link

@yasemin-amzn yasemin-amzn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leaving minor feedback to address before pushing out. thank you!

}

@VisibleForTesting
RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() {
RecordsRetrieved evictPublishedEvent() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seem to be used only in tests after the refactoring. Let's have the tests use session, or have this helper use session directly, without any other actions.

}

@Override
public void cancel() {
requestedResponses.set(0);
publisherSession.requestedResponses().set(0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it should be resetting, instead of setting demand to 0. We already discussed this offline, will be good to leave a comment.

@micah-jaffe micah-jaffe merged commit 3fead19 into awslabs:master Sep 19, 2019
@ashwing ashwing mentioned this pull request Sep 23, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants