Fix to prevent the onNext event going to stale subscription when restart happens in poller #606

Merged (5 commits, Sep 19, 2019)
Changes from 1 commit
@@ -216,12 +216,15 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) {
PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) recordsRetrieved;
resetLock.writeLock().lock();
try {
// Reset the demand from ShardConsumer, to prevent this publisher from delivering events to a stale RxJava
// subscriber. Publishing will be unblocked when the demand is communicated by the new RxJava subscriber.
requestedResponses.set(0);

This class keeps track of so much state, and all of it needs to be reset at the restartFrom call. I'm wondering if pulling the state-tracking variables out into one common place, and calling reset on that to start with a clean state, would help the longer-term maintainability of this class. What do you think?

Contributor Author

Yep. Updated.
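
For illustration only, a minimal sketch of the kind of consolidation the reviewer describes, assuming a hypothetical PublisherState holder; the class name, field types, and queue element type are illustrative and are not the actual change in this PR:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder that gathers the publisher's mutable state so that
// restartFrom() can return everything to a clean slate with a single call.
final class PublisherState<T> {
    final AtomicInteger requestedResponses = new AtomicInteger(0);
    final AtomicBoolean shouldDrainEventOnlyOnAck = new AtomicBoolean(false);
    final BlockingQueue<T> getRecordsResultQueue = new LinkedBlockingQueue<>();

    // Reset demand, queued results, and draining control before the new subscriber attaches.
    void reset() {
        requestedResponses.set(0);
        getRecordsResultQueue.clear();
        shouldDrainEventOnlyOnAck.set(false);
    }
}

Under that assumption, restartFrom could then call reset() on the holder while holding the write lock, instead of resetting each field individually.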


// Clear the queue, so that the publisher repopulates the queue based on sequence number from subscriber.
getRecordsResultQueue.clear();

// Give the drain control to publisher/demand-notifier thread.
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);
giveDrainingControlToPublisherOrDemandNotifier();

prefetchCounters.reset();

@@ -260,9 +263,7 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
pollNextResultAndUpdatePrefetchCounters();
// Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread.
if (getRecordsResultQueue.isEmpty()) {
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}",
shardId, getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);
giveDrainingControlToPublisherOrDemandNotifier();
} else {
// Else attempt to drain the queue.
drainQueueForRequests();
@@ -317,21 +318,29 @@ private synchronized void drainQueueForRequests() {
// If there is an event available to drain and if there is at least one demand,
// then schedule it for delivery
if (requestedResponses.get() > 0 && recordsToDeliver != null) {
lastEventDeliveryTime = Instant.now();
subscriber.onNext(recordsToDeliver);
if (!shouldDrainEventOnlyOnAck.get()) {
log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(true);
}
lastEventDeliveryTime = Instant.now();
giveDrainingControlToEventNotifier();
} else {
// Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier
// thread.
if (shouldDrainEventOnlyOnAck.get()) {
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}",
shardId, getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);
}
giveDrainingControlToPublisherOrDemandNotifier();
}
}

private void giveDrainingControlToEventNotifier() {
if (!shouldDrainEventOnlyOnAck.get()) {
log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(true);
}
}

private void giveDrainingControlToPublisherOrDemandNotifier() {
if (shouldDrainEventOnlyOnAck.get()) {
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}",
shardId, getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);
}
}

@@ -276,7 +276,7 @@ public void testRetryableRetrievalExceptionContinues() {
assertThat(records.processRecordsInput().millisBehindLatest(), equalTo(response.millisBehindLatest()));
}

@Test(timeout = 20000L)
@Test(timeout = 10000L)
public void testNoDeadlockOnFullQueue() {
//
// Fixes https://github.com/awslabs/amazon-kinesis-client/issues/448
@@ -304,7 +304,7 @@ public void testNoDeadlockOnFullQueue() {

log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size());
AtomicInteger receivedItems = new AtomicInteger(0);
final int expectedItems = MAX_SIZE * 1000;
final int expectedItems = MAX_SIZE * 20;

Object lock = new Object();

@@ -359,7 +359,7 @@ public void onComplete() {
assertThat(receivedItems.get(), equalTo(expectedItems));
}

@Test(timeout = 20000L)
@Test(timeout = 10000L)
public void testNoDeadlockOnFullQueueAndLossOfNotification() {
//
// Fixes https://github.com/awslabs/amazon-kinesis-client/issues/602
Expand All @@ -383,7 +383,7 @@ public void testNoDeadlockOnFullQueueAndLossOfNotification() {

log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size());
AtomicInteger receivedItems = new AtomicInteger(0);
final int expectedItems = MAX_SIZE * 100;
final int expectedItems = MAX_SIZE * 50;

Object lock = new Object();

@@ -521,7 +521,7 @@ public GetRecordsResponse answer(InvocationOnMock invocation) throws Throwable {

private static class LossyNotificationSubscriber extends ShardConsumerNotifyingSubscriber {

private static final int LOSS_EVERY_NTH_RECORD = 100;
private static final int LOSS_EVERY_NTH_RECORD = 50;
private static int recordCounter = 0;
private static final ScheduledExecutorService consumerHealthChecker = Executors.newScheduledThreadPool(1);
