-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix to prevent the onNext event going to stale subscription when restart happens in poller #606
Conversation
…art happens in poller
@@ -216,12 +216,15 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) { | |||
PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) recordsRetrieved; | |||
resetLock.writeLock().lock(); | |||
try { | |||
// Reset the demand from ShardConsumer, to prevent this publisher from delivering events to stale RX-Java | |||
// Subscriber. Publishing will be unblocked when the demand is communicated by the new Rx-Java subscriber. | |||
requestedResponses.set(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this class keep track of so much state, and they all need to be reset at restartFrom call. I'm wondering if pulling out the state tracking variable in one common place, and call reset on that one to start with clean state helps longer term maintainability of this class. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. Updated.
…l shifting logic for publishing with monitor based control
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot Ashwin. I think this approach did end up simplifying the original implementation. Leaving minor comments to build more onto the new construct.
} | ||
|
||
// Take action on successful event delivery. | ||
RecordsRetrieved eventDeliveredAction(String shardId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: handleEventDeliveryNotification()
@@ -181,23 +223,13 @@ private void throwOnIllegalState() { | |||
|
|||
private RecordsRetrieved peekNextResult() { | |||
throwOnIllegalState(); | |||
final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek(); | |||
return result == null ? result : result.prepareForPublish(); | |||
return publisherSession.prefetchRecordsQueue().peek(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be nice to abstract away the queue within the session. i.e. publisherSession.peekNext()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -257,16 +277,9 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { | |||
// Verify if the ack matches the head of the queue and evict it. | |||
if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this if/else logic moved into session? i.e. handleRecordsDeliveryAck(RecordsDeliveryAck ack)
I see we're checking for illegalState in evictPublishedEvent, which we can do before calling handleRecordsDeliveryAck()
This will simplify the code a bit, will be no need to have methods/access to the queue directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java
Outdated
Show resolved
Hide resolved
...client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java
Outdated
Show resolved
Hide resolved
...client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java
Show resolved
Hide resolved
@@ -404,7 +404,7 @@ public void run() { | |||
|
|||
private void makeRetrievalAttempt() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
something to consider: does this method fit better into session now -- there will be direct access to counters, and waiting etc.
Only when retrievalAttempt call to session returns success, then PrefetchRecordsPublisher would call drainQueue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leaving minor feedback to address before pushing out. thank you!
} | ||
|
||
@VisibleForTesting | ||
RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() { | ||
RecordsRetrieved evictPublishedEvent() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seem to be used only in tests after the refactoring. Let's have the tests use session, or have this helper use session directly, without any other actions.
} | ||
|
||
@Override | ||
public void cancel() { | ||
requestedResponses.set(0); | ||
publisherSession.requestedResponses().set(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels like it should be resetting, instead of setting demand to 0. We already discussed this offline, will be good to leave a comment.
Fix to prevent the onNext event going to stale subscription when restart happens in poller
Issue #, if available:
Description of changes:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.