Fix to prevent data loss and stuck shards in the event of failed records delivery in Polling readers #603

Merged
3 commits merged on Sep 3, 2019
Changes from 1 commit
@@ -18,14 +18,16 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -48,6 +50,7 @@
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
@@ -97,8 +100,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
private boolean wasReset = false;

private final Semaphore eventDeliveryLock = new Semaphore(1);
private Instant eventDeliveryLockAcquireTime = Instant.EPOCH;
private Instant lastEventDeliveryTime = Instant.EPOCH;
// This flag controls who should drain the next request in the prefetch queue.
// When set to false, the publisher and demand-notifier thread would have the control.
// When set to true, the event-notifier thread would have the control.
Better if we can define what each thread is doing. Maybe we can add a class comment for that.

Contributor Author
Okay
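
For context, a class-level comment along the lines the reviewer suggests could read roughly as follows; this is a sketch of the thread roles as described in this review, not the wording that was eventually committed:

    /**
     * Threads involved in draining this publisher (sketch):
     *  - Publisher (prefetch) thread: the DefaultGetRecordsCacheDaemon that fetches records from
     *    Kinesis and offers them to getRecordsResultQueue.
     *  - Demand-notifier thread: the subscriber thread that calls request(n) to register demand.
     *  - Event/ack-notifier thread: the thread that calls notify(RecordsDeliveryAck) once a
     *    previously published batch has been delivered.
     * The shouldDrainEventOnAck flag decides which of these threads is allowed to drain the next
     * event from the prefetch queue.
     */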

private AtomicBoolean shouldDrainEventOnAck = new AtomicBoolean(false);

shouldDrainEventOnlyOnAck?


It looks like the reason for shards getting stuck in the previous semaphore solution was that we were not releasing the semaphore when the health checker calls restartFrom. If we do that, shouldn't it solve the entire problem of shards getting stuck forever?

This solution fixes the above problem by handing control over to the publisher thread when restartFrom is called. But what do we gain by letting the ack-notifier thread drain the queue?

Contributor Author

  1. The publisher thread blocking on the semaphore would have acquired the read lock. In order to release the semaphore, the reset thread would have to acquire the read lock as well. Even assuming the reset thread acquires the read lock and releases the semaphore, we should prevent the publisher thread from publishing further until the reset thread acquires the write lock.
  2. Also, if a stale notification arrives after the queue is cleared in reset (say after 60 seconds), it would go ahead and release the semaphore, which means the publisher could publish two events without receiving an ack for the first one, which would cause durability issues.

Considering these factors among others, we decided it would be better to validate the ack before publishing the next event, which prevents the durability issue. Also, to ensure that an event is scheduled as soon as possible when there are prefetched events, and to avoid any blocking calls, we introduced this concept of scheduling from different threads and exchanging control based on the queue state and demand.

Aha.. yes. Makes sense. Thanks for the details.

But I still think we can achieve this without letting the ack thread drain the queue. We can discuss this later. :)

Contributor Author

The publisher thread has this logic of spinning in a while loop to offer the prefetched element to the queue. That's the reason why the ack-thread (now) and demand-thread (earlier) are calling the drainQueue directly. Let's discuss this offline.
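
To summarize the handoff agreed on in this thread, here is a condensed sketch of the drain logic in this commit (simplified from the diff below; delayed-delivery checks and logging omitted):

    // Ack path (event/ack-notifier thread): validate the ack against the head of the queue
    // before anything else is published.
    public synchronized void notify(RecordsDeliveryAck ack) {
        RecordsRetrieved head = peekNextResult();
        if (head != null && head.batchUniqueIdentifier().equals(ack.batchUniqueIdentifier())) {
            evictNextResult();                        // ack matches the head -> evict the delivered batch
            if (getRecordsResultQueue.isEmpty()) {
                shouldDrainEventOnAck.set(false);     // nothing buffered -> publisher/demand threads take over
            } else {
                drainQueueForRequests();              // more prefetched events -> schedule the next delivery
            }
        }                                             // stale or unknown acks are ignored
    }

    // Publisher and demand-notifier threads only drain while the ack thread does not hold control.
    private synchronized void initiateDrainQueueForRequests() {
        if (!shouldDrainEventOnAck.get()) {
            drainQueueForRequests();
        }
    }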


/**
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
@@ -151,30 +157,38 @@ public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPosition
dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);

if (!started) {
log.info("Starting prefetching thread.");
log.info("{} : Starting prefetching thread.", shardId);
executorService.execute(defaultGetRecordsCacheDaemon);
}
started = true;
}

RecordsRetrieved getNextResult() {
private void throwOnIllegalState() {
if (executorService.isShutdown()) {
throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
}

if (!started) {
throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
}
PrefetchRecordsRetrieved result = null;
try {
result = getRecordsResultQueue.take().prepareForPublish();
}

RecordsRetrieved peekNextResult() {
throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek();
return result == null ? result : result.prepareForPublish();
}

RecordsRetrieved evictNextResult() {

nit: pollNextResult? just to be in sync with Queue API?

Contributor Author

Hmmm no strong preference. Will change

Contributor Author

Updated to pollNextResultAndUpdatePrefetchCounters

throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.poll();
if (result != null) {
prefetchCounters.removed(result.processRecordsInput);
requestedResponses.decrementAndGet();

} catch (InterruptedException e) {
log.error("Interrupted while getting records from the cache", e);
} else {
log.info("{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
+ "was reset.", shardId);
}

return result;
}

@@ -195,6 +209,12 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) {
resetLock.writeLock().lock();
try {
getRecordsResultQueue.clear();

// Give the drain control to publisher/demand-notifier thread.
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnAck.set(false);

Before this change, we had the eventDeliveryLock semaphore, which was only released upon receiving the ack. So if the ack is lost somewhere, the shard gets stuck forever. Was there any reason why we didn't release it within restartFrom here?

Looks like we are fixing that issue in this change. Shouldn't adding that step to the semaphore solution fix the issue? What do we gain on top of that with this new solution?

Contributor Author

answered above


Yep, sorry about the duplicate. :)


prefetchCounters.reset();

highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();
@@ -213,7 +233,7 @@ public void subscribe(Subscriber<? super RecordsRetrieved> s) {
@Override
public void request(long n) {
requestedResponses.addAndGet(n);
drainQueueForRequests();
initiateDrainQueueForRequests();
}

@Override
@@ -224,12 +244,34 @@ public void cancel() {
}

@Override
public void notify(RecordsDeliveryAck ack) {
eventDeliveryLock.release();
public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
final RecordsRetrieved recordsToCheck = peekNextResult();
// Verify if the ack matches the head of the queue and evict it.
if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier()
.equals(recordsDeliveryAck.batchUniqueIdentifier())) {
evictNextResult();
// Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread.
if(getRecordsResultQueue.isEmpty()) {
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnAck.set(false);
} else {
// Else attempt to drain the queue.
drainQueueForRequests();

In the happy case, where we continue to receive acks for each event delivered, the ack-notifier thread will always be the one draining the queue, and the publisher thread will only do the fetching. I think this conflicts a bit with the design of this class and the purpose of the publisher thread.

Contributor Author

The publisher thread acts as a kick-starter in the event of paused deliveries. In the happy case, yes, the ack-notifier does the scheduling, both for minimum delay between deliveries and for simplicity.


I agree that it'll help a bit with the delay, but not with the simplicity. :) We have three different threads draining the queue. I think it would be better and simpler if we had only one thread publishing and the other two just updating the demand and registering the ack. Anyway, that's a bigger change and we can consider it in the future. :)


}
} else {
// Log and ignore any other ack received. As long as an ack is received for head of the queue
// we are good. Any stale or future ack received can be ignored, though the latter is not feasible
// to happen.

Another reason to come here is when the queue is reset before the ack comes back. Should we add that to the comment too?

Contributor Author

That would be considered a stale ack.

final BatchUniqueIdentifier peekedBatchUniqueIdentifier = recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier();
log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.", shardId,
recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now());
}
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(shardId, eventDeliveryLockAcquireTime, log);
takeDelayedDeliveryActionIfRequired(shardId, lastEventDeliveryTime, log);
}

// Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue.
private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException {
wasReset = false;
while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) {
Expand All @@ -248,11 +290,39 @@ private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) t
prefetchCounters.added(recordsRetrieved.processRecordsInput);
}

/**
* Method that will be called by the 'publisher thread' and the 'demand notifying thread',
* to drain the events if the 'event notifying thread' do not have the control.
*/
private synchronized void initiateDrainQueueForRequests() {

initiate may not be the best word here. maybe drainQueueForRequestsIfAllowed?

Contributor Author

Yep

if(!shouldDrainEventOnAck.get()) {
drainQueueForRequests();
}
}

/**
* Method to drain the queue based on the demand and the events availability in the queue.
*/
private synchronized void drainQueueForRequests() {
while (requestedResponses.get() > 0 && !getRecordsResultQueue.isEmpty()) {
eventDeliveryLock.acquireUninterruptibly();
eventDeliveryLockAcquireTime = Instant.now();
subscriber.onNext(getNextResult());
final RecordsRetrieved recordsToDeliver = peekNextResult();
// If there is an event available to drain and if there is at least one demand,
// then schedule it for delivery
if (requestedResponses.get() > 0 && recordsToDeliver != null) {
lastEventDeliveryTime = Instant.now();
subscriber.onNext(recordsToDeliver);
if(!shouldDrainEventOnAck.get()) {

nit: missing space between if and (

Contributor Author

will address

log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnAck.set(true);
}
} else {
// Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier
// thread.
if(shouldDrainEventOnAck.get()){

nit: missing space between if and (. check all such places.

log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnAck.set(false);
}
}
}

@@ -263,12 +333,26 @@ static class PrefetchRecordsRetrieved implements RecordsRetrieved {
final ProcessRecordsInput processRecordsInput;
final String lastBatchSequenceNumber;
final String shardIterator;
final BatchUniqueIdentifier batchUniqueIdentifier;

PrefetchRecordsRetrieved prepareForPublish() {
return new PrefetchRecordsRetrieved(processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(),
lastBatchSequenceNumber, shardIterator);
lastBatchSequenceNumber, shardIterator, batchUniqueIdentifier);
}

@Override
public BatchUniqueIdentifier batchUniqueIdentifier() {
return batchUniqueIdentifier;
}

/**
* Generate batch unique identifier for PrefetchRecordsRetrieved, where flow will be empty.
* @return BatchUniqueIdentifier
*/
public static BatchUniqueIdentifier generateBatchUniqueIdentifier() {
return new BatchUniqueIdentifier(UUID.randomUUID().toString(),
StringUtils.EMPTY);
}
}
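
BatchUniqueIdentifier itself is not part of this diff. Judging from generateBatchUniqueIdentifier() above, which passes a random UUID plus an empty "flow" string, it is a small value type whose equality drives the ack matching in notify(); a minimal sketch under that assumption (field names are guesses):

    // Sketch only; the real class lives in software.amazon.kinesis.retrieval.
    public final class BatchUniqueIdentifier {
        private final String recordBatchIdentifier;   // e.g. a random UUID per batch
        private final String flowIdentifier;          // empty for the polling (prefetch) publisher

        public BatchUniqueIdentifier(String recordBatchIdentifier, String flowIdentifier) {
            this.recordBatchIdentifier = recordBatchIdentifier;
            this.flowIdentifier = flowIdentifier;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof BatchUniqueIdentifier)) return false;
            BatchUniqueIdentifier that = (BatchUniqueIdentifier) o;
            return recordBatchIdentifier.equals(that.recordBatchIdentifier)
                    && flowIdentifier.equals(that.flowIdentifier);
        }

        @Override
        public int hashCode() {
            return java.util.Objects.hash(recordBatchIdentifier, flowIdentifier);
        }
    }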

private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) {
@@ -291,15 +375,15 @@ private class DefaultGetRecordsCacheDaemon implements Runnable {
public void run() {
while (!isShutdown) {
if (Thread.currentThread().isInterrupted()) {
log.warn("Prefetch thread was interrupted.");
log.warn("{} : Prefetch thread was interrupted.", shardId);
break;
}

resetLock.readLock().lock();
try {
makeRetrievalAttempt();
} catch(PositionResetException pre) {
log.debug("Position was reset while attempting to add item to queue.");
log.debug("{} : Position was reset while attempting to add item to queue.", shardId);
} finally {
resetLock.readLock().unlock();
}
@@ -328,30 +412,31 @@ private void makeRetrievalAttempt() {

highestSequenceNumber = calculateHighestSequenceNumber(processRecordsInput);
PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput,
highestSequenceNumber, getRecordsResult.nextShardIterator());
highestSequenceNumber, getRecordsResult.nextShardIterator(),
PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
highestSequenceNumber = recordsRetrieved.lastBatchSequenceNumber;
addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequests();
initiateDrainQueueForRequests();
} catch (PositionResetException pse) {
throw pse;
} catch (RetryableRetrievalException rre) {
log.info("Timeout occurred while waiting for response from Kinesis. Will retry the request.");
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", shardId);
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache.");
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", shardId);
} catch (ExpiredIteratorException e) {
log.info("ShardId {}: records threw ExpiredIteratorException - restarting"
log.info("{} : records threw ExpiredIteratorException - restarting"
+ " after greatest seqNum passed to customer", shardId, e);

scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);

dataFetcher.restartIterator();
} catch (SdkException e) {
log.error("Exception thrown while fetching records from Kinesis", e);
log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e);
} catch (Throwable e) {
log.error("Unexpected exception was thrown. This could probably be an issue or a bug." +
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
"https://github.com/awslabs/amazon-kinesis-client.", e);
"https://github.com/awslabs/amazon-kinesis-client.", shardId, e);
} finally {
MetricsUtil.endScope(scope);
}
@@ -362,8 +447,8 @@
try {
prefetchCounters.waitForConsumer();
} catch (InterruptedException ie) {
log.info("Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started");
log.info("{} : Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started", shardId);
}
}
}
@@ -410,14 +495,14 @@ private long getByteSize(final ProcessRecordsInput result) {

public synchronized void waitForConsumer() throws InterruptedException {
if (!shouldGetNewRecords()) {
log.debug("Queue is full waiting for consumer for {} ms", idleMillisBetweenCalls);
log.debug("{} : Queue is full waiting for consumer for {} ms", shardId, idleMillisBetweenCalls);
this.wait(idleMillisBetweenCalls);
}
}

public synchronized boolean shouldGetNewRecords() {
if (log.isDebugEnabled()) {
log.debug("Current Prefetch Counter States: {}", this.toString());
log.debug("{} : Current Prefetch Counter States: {}", shardId, this.toString());
}
return size < maxRecordsCount && byteSize < maxByteSize;
}
@@ -27,12 +27,11 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -123,13 +122,15 @@ public void testRollingCache() {
getRecordsCache.start(extendedSequenceNumber, initialPosition);
sleep(IDLE_MILLIS_BETWEEN_CALLS);

ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult().processRecordsInput();
ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::evictNextResult, 1000L)
.processRecordsInput();

assertTrue(processRecordsInput1.records().isEmpty());
assertEquals(processRecordsInput1.millisBehindLatest(), new Long(1000));
assertNotNull(processRecordsInput1.cacheEntryTime());

ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult().processRecordsInput();
ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::evictNextResult, 1000L)
.processRecordsInput();

assertNotEquals(processRecordsInput1, processRecordsInput2);
}
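
BlockingUtils.blockUntilRecordsAvailable is referenced by these tests but its implementation is not part of this diff; a helper with the shape used here (supplier plus timeout in millis) might look roughly like this, which is an assumption rather than the actual utility:

    // Polls the supplier until it returns a non-null batch or the timeout elapses.
    public static <T> T blockUntilRecordsAvailable(java.util.function.Supplier<T> recordsSupplier, long timeoutMillis) {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        T records;
        while ((records = recordsSupplier.get()) == null) {
            if (System.currentTimeMillis() > deadline) {
                throw new RuntimeException("No records were available within " + timeoutMillis + " ms");
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return records;
    }
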
@@ -141,8 +142,10 @@ public void testFullCache() {

assertEquals(getRecordsCache.getRecordsResultQueue.size(), MAX_SIZE);

ProcessRecordsInput processRecordsInput1 = getRecordsCache.getNextResult().processRecordsInput();
ProcessRecordsInput processRecordsInput2 = getRecordsCache.getNextResult().processRecordsInput();
ProcessRecordsInput processRecordsInput1 = blockUntilRecordsAvailable(getRecordsCache::evictNextResult, 1000L)
.processRecordsInput();
ProcessRecordsInput processRecordsInput2 = blockUntilRecordsAvailable(getRecordsCache::evictNextResult, 1000L)
.processRecordsInput();

assertNotEquals(processRecordsInput1, processRecordsInput2);
}
@@ -181,9 +184,9 @@ public void testDifferentShardCaches() {

sleep(IDLE_MILLIS_BETWEEN_CALLS);

ProcessRecordsInput p1 = getRecordsCache.getNextResult().processRecordsInput();
ProcessRecordsInput p1 = getRecordsCache.evictNextResult().processRecordsInput();

ProcessRecordsInput p2 = recordsPublisher2.getNextResult().processRecordsInput();
ProcessRecordsInput p2 = recordsPublisher2.evictNextResult().processRecordsInput();

assertNotEquals(p1, p2);
assertTrue(p1.records().isEmpty());
@@ -209,7 +212,8 @@ public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws
getRecordsCache.start(extendedSequenceNumber, initialPosition);
sleep(IDLE_MILLIS_BETWEEN_CALLS);

ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult().processRecordsInput();
ProcessRecordsInput processRecordsInput = blockUntilRecordsAvailable(getRecordsCache::evictNextResult, 1000L)
.processRecordsInput();

assertNotNull(processRecordsInput);
assertTrue(processRecordsInput.records().isEmpty());