Fix to prevent data loss and stuck shards in the event of failed records delivery in Polling readers (#603)

* Fix to prevent data loss and stuck shards in the event of failed records delivery.

* Review comment fixes

* Access specifiers fix
ashwing authored and micah-jaffe committed Sep 3, 2019
1 parent 85d31c9 commit f6dec3e
Showing 4 changed files with 309 additions and 59 deletions.
@@ -18,14 +18,16 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -48,6 +50,7 @@
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
@@ -65,6 +68,12 @@
* i.e. the byte size of the records stored in the cache and maxRecordsCount i.e. the max number of records that should
* be present in the cache across multiple GetRecordsResult object. If no data is available in the cache, the call from
* the record processor is blocked till records are retrieved from Kinesis.
*
* There are three threads, namely the publisher, demand-notifier and ack-notifier, which contend to drain events
* to the Subscriber (ShardConsumer in KCL). The publisher/demand-notifier thread gains the control to drain only
* when there is no pending event in the prefetch queue waiting for an ack. Otherwise, it is the ack-notifier thread
* that drains an event on receipt of an ack.
*
*/
@Slf4j
@KinesisClientInternalApi
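
The hand-off described in the new class javadoc can be pictured with a small, self-contained sketch. None of this is KCL code; the names (DrainControlSketch, drainIfAllowed, onAck) are hypothetical, and String stands in for a record batch:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    class DrainControlSketch {
        private final Queue<String> queue = new ConcurrentLinkedQueue<>();
        // false: publisher/demand-notifier threads may drain; true: only the ack path may.
        private final AtomicBoolean drainOnlyOnAck = new AtomicBoolean(false);

        // Called by the publisher and demand-notifier threads.
        synchronized void drainIfAllowed() {
            if (!drainOnlyOnAck.get()) {
                drain();
            }
        }

        // Called by the ack-notifier thread once the subscriber acks the head event.
        synchronized void onAck() {
            queue.poll();                  // evict the event that was just delivered
            if (queue.isEmpty()) {
                drainOnlyOnAck.set(false); // hand control back to the publisher side
            } else {
                drain();                   // keep draining on the ack path
            }
        }

        private void drain() {
            String next = queue.peek();    // peek only; eviction waits for the ack
            if (next != null) {
                deliver(next);
                drainOnlyOnAck.set(true);  // an event is in flight; wait for its ack
            } else {
                drainOnlyOnAck.set(false);
            }
        }

        private void deliver(String event) { /* subscriber.onNext(event) */ }
    }

The point of the flag is that at most one event is ever in flight, and nothing leaves the queue until that event is acknowledged.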
@@ -97,8 +106,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
private boolean wasReset = false;

private final Semaphore eventDeliveryLock = new Semaphore(1);
private Instant eventDeliveryLockAcquireTime = Instant.EPOCH;
private Instant lastEventDeliveryTime = Instant.EPOCH;
// This flag controls which thread has the control to drain the next event from the prefetch queue.
// When set to false, the publisher and demand-notifier threads have the control.
// When set to true, the ack-notifier thread has the control.
private AtomicBoolean shouldDrainEventOnlyOnAck = new AtomicBoolean(false);

/**
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
@@ -151,30 +163,40 @@ public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPosition
dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);

if (!started) {
log.info("Starting prefetching thread.");
log.info("{} : Starting prefetching thread.", shardId);
executorService.execute(defaultGetRecordsCacheDaemon);
}
started = true;
}

RecordsRetrieved getNextResult() {
private void throwOnIllegalState() {
if (executorService.isShutdown()) {
throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
}

if (!started) {
throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
}
PrefetchRecordsRetrieved result = null;
try {
result = getRecordsResultQueue.take().prepareForPublish();
}

private RecordsRetrieved peekNextResult() {
throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek();
return result == null ? result : result.prepareForPublish();
}

@VisibleForTesting
RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() {
throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.poll();
if (result != null) {
prefetchCounters.removed(result.processRecordsInput);
requestedResponses.decrementAndGet();

} catch (InterruptedException e) {
log.error("Interrupted while getting records from the cache", e);
} else {
log.info(
"{} : No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
+ " was reset.", shardId);
}

return result;
}

@@ -195,6 +217,12 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) {
resetLock.writeLock().lock();
try {
getRecordsResultQueue.clear();

// Give the drain control to publisher/demand-notifier thread.
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);

prefetchCounters.reset();

highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();
@@ -213,7 +241,7 @@ public void subscribe(Subscriber<? super RecordsRetrieved> s) {
@Override
public void request(long n) {
requestedResponses.addAndGet(n);
drainQueueForRequests();
drainQueueForRequestsIfAllowed();
}

@Override
@@ -224,12 +252,35 @@ public void cancel() {
}

@Override
public void notify(RecordsDeliveryAck ack) {
eventDeliveryLock.release();
public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
final RecordsRetrieved recordsToCheck = peekNextResult();
// Verify if the ack matches the head of the queue and evict it.
if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier()
.equals(recordsDeliveryAck.batchUniqueIdentifier())) {
pollNextResultAndUpdatePrefetchCounters();
// Upon evicting, check if the queue is empty. If so, give the drain control back to the publisher thread.
if (getRecordsResultQueue.isEmpty()) {
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}",
shardId, getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);
} else {
// Else attempt to drain the queue.
drainQueueForRequests();
}
} else {
// Log and ignore any other ack received. As long as an ack is received for the head of the queue,
// we are good. Any stale or future ack can safely be ignored, though the latter should not be
// possible.
final BatchUniqueIdentifier peekedBatchUniqueIdentifier =
recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier();
log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.",
shardId, recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now());
}
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(shardId, eventDeliveryLockAcquireTime, log);
takeDelayedDeliveryActionIfRequired(shardId, lastEventDeliveryTime, log);
}

// Note : Do not make this method synchronized, as notify() would then be unable to evict any entry from the queue.
private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException {
wasReset = false;
while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) {
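
A hedged sketch of the head-of-queue check that notify() performs above, with plain String ids standing in for BatchUniqueIdentifier (the class and method names here are illustrative, not KCL API):

    import java.util.ArrayDeque;
    import java.util.Deque;

    class AckMatchSketch {
        private final Deque<String> inFlight = new ArrayDeque<>();

        synchronized void onAck(String ackedBatchId) {
            final String head = inFlight.peek();
            if (head != null && head.equals(ackedBatchId)) {
                inFlight.poll(); // delivery confirmed; safe to evict the head
            } else {
                // A stale or unexpected ack: ignore it. Only an ack matching the
                // current head may evict, which keeps redelivery after a reset safe.
                System.out.printf("ignoring ack %s; head is %s%n", ackedBatchId, head);
            }
        }
    }

Only an ack for the current head evicts; anything else is logged and dropped.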
@@ -248,11 +299,39 @@ private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) t
prefetchCounters.added(recordsRetrieved.processRecordsInput);
}

/**
* Method that will be called by the 'publisher thread' and the 'demand-notifier thread',
* to drain the events if the 'ack-notifier thread' does not have the control.
*/
private synchronized void drainQueueForRequestsIfAllowed() {
if (!shouldDrainEventOnlyOnAck.get()) {
drainQueueForRequests();
}
}

/**
* Method to drain the queue based on the demand and the availability of events in the queue.
*/
private synchronized void drainQueueForRequests() {
while (requestedResponses.get() > 0 && !getRecordsResultQueue.isEmpty()) {
eventDeliveryLock.acquireUninterruptibly();
eventDeliveryLockAcquireTime = Instant.now();
subscriber.onNext(getNextResult());
final RecordsRetrieved recordsToDeliver = peekNextResult();
// If there is an event available to drain and if there is at least one demand,
// then schedule it for delivery
if (requestedResponses.get() > 0 && recordsToDeliver != null) {
lastEventDeliveryTime = Instant.now();
subscriber.onNext(recordsToDeliver);
if (!shouldDrainEventOnlyOnAck.get()) {
log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(true);
}
} else {
// Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier
// thread.
if (shouldDrainEventOnlyOnAck.get()) {
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}",
shardId, getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);
}
}
}
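
The behavioral change driving this loop is the fix itself. Previously the publisher removed a batch from the queue with take() before calling onNext(), so a failed delivery lost the batch and could leave the shard stuck; the rewritten loop peeks instead, and eviction happens only in notify() once the ack arrives. A minimal sketch under hypothetical names:

    import java.util.concurrent.LinkedBlockingQueue;

    class DeliverySketch {
        private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Old path: take() removed the batch before delivery, so a failed
        // onNext() dropped it permanently and stalled the shard consumer.
        void deliverOld() throws InterruptedException {
            String batch = queue.take(); // already gone from the queue here
            onNext(batch);               // lost if this delivery fails
        }

        // New path: peek() leaves the batch at the head; it is removed only
        // after the subscriber acks, so a failed delivery can be retried.
        void deliverNew() {
            String batch = queue.peek();
            if (batch != null) {
                onNext(batch);           // the queue still holds the batch
            }
        }

        void onAck() { queue.poll(); }   // evict once delivery is confirmed

        private void onNext(String batch) { /* subscriber.onNext(batch) */ }
    }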

@@ -263,12 +342,26 @@ static class PrefetchRecordsRetrieved implements RecordsRetrieved {
final ProcessRecordsInput processRecordsInput;
final String lastBatchSequenceNumber;
final String shardIterator;
final BatchUniqueIdentifier batchUniqueIdentifier;

PrefetchRecordsRetrieved prepareForPublish() {
return new PrefetchRecordsRetrieved(processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(),
lastBatchSequenceNumber, shardIterator);
lastBatchSequenceNumber, shardIterator, batchUniqueIdentifier);
}

@Override
public BatchUniqueIdentifier batchUniqueIdentifier() {
return batchUniqueIdentifier;
}

/**
* Generate a batch unique identifier for PrefetchRecordsRetrieved, where the flow identifier will be empty.
* @return BatchUniqueIdentifier
*/
public static BatchUniqueIdentifier generateBatchUniqueIdentifier() {
return new BatchUniqueIdentifier(UUID.randomUUID().toString(),
StringUtils.EMPTY);
}
}

private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) {
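
BatchUniqueIdentifier itself is not shown in this diff. A minimal value-type sketch of what the equality-based ack matching assumes, namely two-field identity with an empty flow identifier on the polling path, might look like the following; the class name and factory are hypothetical:

    import java.util.Objects;
    import java.util.UUID;

    final class BatchIdSketch {
        private final String recordBatchIdentifier;
        private final String flowIdentifier;

        BatchIdSketch(String recordBatchIdentifier, String flowIdentifier) {
            this.recordBatchIdentifier = recordBatchIdentifier;
            this.flowIdentifier = flowIdentifier;
        }

        static BatchIdSketch forPrefetch() {
            // Polling readers have no subscribe-to-shard flow, hence the empty flow id.
            return new BatchIdSketch(UUID.randomUUID().toString(), "");
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof BatchIdSketch)) {
                return false;
            }
            final BatchIdSketch other = (BatchIdSketch) o;
            return recordBatchIdentifier.equals(other.recordBatchIdentifier)
                    && flowIdentifier.equals(other.flowIdentifier);
        }

        @Override
        public int hashCode() {
            return Objects.hash(recordBatchIdentifier, flowIdentifier);
        }
    }

Because each prefetched batch gets a fresh random UUID, an ack can only ever match the batch it was issued for.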
@@ -291,15 +384,15 @@ private class DefaultGetRecordsCacheDaemon implements Runnable {
public void run() {
while (!isShutdown) {
if (Thread.currentThread().isInterrupted()) {
log.warn("Prefetch thread was interrupted.");
log.warn("{} : Prefetch thread was interrupted.", shardId);
break;
}

resetLock.readLock().lock();
try {
makeRetrievalAttempt();
} catch(PositionResetException pre) {
log.debug("Position was reset while attempting to add item to queue.");
log.debug("{} : Position was reset while attempting to add item to queue.", shardId);
} finally {
resetLock.readLock().unlock();
}
@@ -328,30 +421,31 @@ private void makeRetrievalAttempt() {

highestSequenceNumber = calculateHighestSequenceNumber(processRecordsInput);
PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput,
highestSequenceNumber, getRecordsResult.nextShardIterator());
highestSequenceNumber, getRecordsResult.nextShardIterator(),
PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
highestSequenceNumber = recordsRetrieved.lastBatchSequenceNumber;
addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequests();
drainQueueForRequestsIfAllowed();
} catch (PositionResetException pse) {
throw pse;
} catch (RetryableRetrievalException rre) {
log.info("Timeout occurred while waiting for response from Kinesis. Will retry the request.");
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", shardId);
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache.");
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", shardId);
} catch (ExpiredIteratorException e) {
log.info("ShardId {}: records threw ExpiredIteratorException - restarting"
log.info("{} : records threw ExpiredIteratorException - restarting"
+ " after greatest seqNum passed to customer", shardId, e);

scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);

dataFetcher.restartIterator();
} catch (SdkException e) {
log.error("Exception thrown while fetching records from Kinesis", e);
log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e);
} catch (Throwable e) {
log.error("Unexpected exception was thrown. This could probably be an issue or a bug." +
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
"https://github.com/awslabs/amazon-kinesis-client.", e);
"https://github.com/awslabs/amazon-kinesis-client.", shardId, e);
} finally {
MetricsUtil.endScope(scope);
}
Expand All @@ -362,8 +456,8 @@ private void makeRetrievalAttempt() {
try {
prefetchCounters.waitForConsumer();
} catch (InterruptedException ie) {
log.info("Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started");
log.info("{} : Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started", shardId);
}
}
}
@@ -410,14 +504,14 @@ private long getByteSize(final ProcessRecordsInput result) {

public synchronized void waitForConsumer() throws InterruptedException {
if (!shouldGetNewRecords()) {
log.debug("Queue is full waiting for consumer for {} ms", idleMillisBetweenCalls);
log.debug("{} : Queue is full waiting for consumer for {} ms", shardId, idleMillisBetweenCalls);
this.wait(idleMillisBetweenCalls);
}
}

public synchronized boolean shouldGetNewRecords() {
if (log.isDebugEnabled()) {
log.debug("Current Prefetch Counter States: {}", this.toString());
log.debug("{} : Current Prefetch Counter States: {}", shardId, this.toString());
}
return size < maxRecordsCount && byteSize < maxByteSize;
}