Fix to prevent data loss and stuck shards in the event of failed records delivery in Polling readers #603
Changes from 1 commit: PrefetchRecordsPublisher.java. In the diff below, removed lines appear immediately above the lines that replace them.
@@ -18,14 +18,16 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@@ -48,6 +50,7 @@
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.ThreadSafeMetricsDelegatingFactory;
import software.amazon.kinesis.retrieval.BatchUniqueIdentifier;
import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
@@ -97,8 +100,11 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
private boolean wasReset = false;

private final Semaphore eventDeliveryLock = new Semaphore(1);
private Instant eventDeliveryLockAcquireTime = Instant.EPOCH;
private Instant lastEventDeliveryTime = Instant.EPOCH;
// This flag controls who should drain the next request in the prefetch queue.
// When set to false, the publisher and demand-notifier thread would have the control.
// When set to true, the event-notifier thread would have the control.
private AtomicBoolean shouldDrainEventOnAck = new AtomicBoolean(false);
Reviewer: shouldDrainEventOnlyOnAck?

Reviewer: It looks like the reason shards got stuck in the previous semaphore solution was that we were not releasing the semaphore when the health checker calls restartFrom. If we did that, shouldn't it solve the entire problem of shards getting stuck forever? This solution fixes that problem by handing control over to the publisher thread when restartFrom is called, but what do we gain by letting the ack-notifier thread drain the queue?

Author: Considering these factors among others, we decided it would be better to validate the ack before publishing the next event, which prevents the durability issue. Also, to ensure that an event is scheduled as soon as possible when there are prefetched events, and to avoid any blocking calls, we introduced this concept of scheduling from different threads and exchanging control based on the queue state and demand.

Reviewer: Aha, yes, that makes sense. Thanks for the details. But I still think we can achieve this without letting the ack thread drain the queue. We can discuss this later. :)

Author: The publisher thread has this logic of spinning in a while loop to offer the prefetched element to the queue. That's why the ack thread (now) and the demand thread (earlier) call drainQueue directly. Let's discuss this offline.
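To make the handoff the author describes concrete, here is a minimal, self-contained Java sketch of the idea. It is illustrative only: AckGatedPublisher and its method names are invented, the event's own value stands in for the BatchUniqueIdentifier, and the real publisher keeps the delivered batch at the head of getRecordsResultQueue until the ack arrives.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of the ack-gated drain handoff: nothing new is published until the
// previous event is acknowledged, and drain control moves between the producer/demand
// threads (flag == false) and the ack thread (flag == true).
public class AckGatedPublisher {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicLong demand = new AtomicLong(0);
    private final AtomicBoolean drainOnAck = new AtomicBoolean(false);

    public synchronized void enqueue(String event) {         // producer thread
        queue.offer(event);
        drainIfAllowed();
    }

    public synchronized void request(long n) {                // demand-notifier thread
        demand.addAndGet(n);
        drainIfAllowed();
    }

    public synchronized void onAck(String ackedEvent) {       // ack-notifier thread
        if (ackedEvent.equals(queue.peek())) {
            queue.poll();                                     // evict only the acknowledged head
            demand.decrementAndGet();
            if (queue.isEmpty()) {
                drainOnAck.set(false);                        // nothing left: producer takes over
            } else {
                drain();                                      // schedule the next event right away
            }
        }                                                     // stale acks are simply ignored
    }

    private void drainIfAllowed() {
        if (!drainOnAck.get()) {                              // only when the ack thread does not own the drain
            drain();
        }
    }

    private void drain() {
        String head = queue.peek();
        if (demand.get() > 0 && head != null) {
            System.out.println("delivering " + head);         // stands in for subscriber.onNext(head)
            drainOnAck.set(true);                             // wait for the ack before the next publish
        } else {
            drainOnAck.set(false);                            // nothing delivered: hand control back
        }
    }
}

In the actual change, the comparison uses RecordsRetrieved.batchUniqueIdentifier() against the ack's identifier rather than the payload itself, and the delivered batch stays at the head of getRecordsResultQueue until notify() evicts it.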

/**
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a

@@ -151,30 +157,38 @@ public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPosition
dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);

if (!started) {
log.info("Starting prefetching thread.");
log.info("{} : Starting prefetching thread.", shardId);
executorService.execute(defaultGetRecordsCacheDaemon);
}
started = true;
}

RecordsRetrieved getNextResult() {
private void throwOnIllegalState() {
if (executorService.isShutdown()) {
throw new IllegalStateException("Shutdown has been called on the cache, can't accept new requests.");
}

if (!started) {
throw new IllegalStateException("Cache has not been initialized, make sure to call start.");
}
PrefetchRecordsRetrieved result = null;
try {
result = getRecordsResultQueue.take().prepareForPublish();
}

RecordsRetrieved peekNextResult() {
throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek();
return result == null ? result : result.prepareForPublish();
}

RecordsRetrieved evictNextResult() {
Reviewer: nit: pollNextResult, just to be in sync with the Queue API?

Author: Hmm, no strong preference. Will change.

Author: Updated to pollNextResultAndUpdatePrefetchCounters.
throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.poll();
if (result != null) {
prefetchCounters.removed(result.processRecordsInput);
requestedResponses.decrementAndGet();
} catch (InterruptedException e) {
log.error("Interrupted while getting records from the cache", e);
} else {
log.info("{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
+ "was reset.", shardId);
}

return result;
}

@@ -195,6 +209,12 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) {
resetLock.writeLock().lock();
try {
getRecordsResultQueue.clear();

// Give the drain control to publisher/demand-notifier thread.
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnAck.set(false);
Reviewer: Before this change we had the eventDeliveryLock semaphore, which was only released upon receiving the ack, so if the ack got lost somewhere the shard was stuck forever. Was there any reason we didn't release it within restartFrom here? It looks like we are fixing that issue in this change. Wouldn't adding that step to the semaphore solution fix the issue? What do we gain on top of that with this new solution?

Author: Answered above.

Reviewer: Yep, sorry about the duplicate. :)
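The failure mode the reviewer describes is easy to model in isolation. The condensed sketch below is hypothetical (LostAckDemo is not KCL code, and the timeout only exists so the demo terminates); it shows why a permit that is taken before every delivery but only returned by the ack path wedges the drain loop once a single ack goes missing.

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Condensed model of the pre-fix flow: the permit is acquired before every delivery and
// only released by the ack handler, so one lost ack leaves the next delivery blocked.
public class LostAckDemo {
    private static final Semaphore eventDeliveryLock = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        // First delivery: take the permit, publish, but the ack never arrives.
        eventDeliveryLock.acquireUninterruptibly();
        System.out.println("delivered batch 1, waiting for an ack that was lost...");

        // The old restartFrom() cleared the queue but never released the permit,
        // so the next delivery attempt stays blocked and the shard appears stuck.
        boolean acquired = eventDeliveryLock.tryAcquire(2, TimeUnit.SECONDS);
        System.out.println("able to deliver batch 2? " + acquired);   // prints false
    }
}

The new flag-based design removes the permit entirely: restartFrom() and an empty queue both hand drain control back to the publisher thread, so a lost ack can no longer park the shard indefinitely.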
prefetchCounters.reset();

highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();

@@ -213,7 +233,7 @@ public void subscribe(Subscriber<? super RecordsRetrieved> s) {
@Override
public void request(long n) {
requestedResponses.addAndGet(n);
drainQueueForRequests();
initiateDrainQueueForRequests();
}

@Override
@@ -224,12 +244,34 @@
}

@Override
public void notify(RecordsDeliveryAck ack) {
eventDeliveryLock.release();
public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
final RecordsRetrieved recordsToCheck = peekNextResult();
// Verify if the ack matches the head of the queue and evict it.
if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier()
.equals(recordsDeliveryAck.batchUniqueIdentifier())) {
evictNextResult();
// Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread.
if(getRecordsResultQueue.isEmpty()) {
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnAck.set(false);
} else {
// Else attempt to drain the queue.
drainQueueForRequests();
Reviewer: In the happy case, where we keep receiving acks for each event delivered, the ack-notifier thread will always be the one draining the queue and the publisher thread will only do the fetching. I think this conflicts a bit with the design of this class and the purpose of the publisher thread.

Author: The publisher thread acts as a kick-starter in the event of paused deliveries. In the happy case, yes, the ack-notifier does the scheduling, for minimum delay between deliveries and for simplicity.

Reviewer: I agree it helps a bit with the delay, but not with the simplicity. :) We have three different threads draining the queue. I think it would be better and simpler to have only one thread publishing, with the other two just updating the demand and registering the ack. Anyway, that's a bigger change and we can consider it in the future. :)

Author: Addressed this in https://github.com/awslabs/amazon-kinesis-client/pull/606/files
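For comparison, one way to realize the reviewer's "single publishing thread" idea is to funnel every delivery through one executor, with the demand and ack threads only recording state and scheduling a drain pass. The sketch below is hypothetical (SingleDrainerPublisher is an invented name) and is not necessarily how PR #606 implements it.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-drainer variant: only the drain executor ever touches the queue head
// or calls the subscriber; other threads just record demand/acks and schedule a drain pass.
public class SingleDrainerPublisher {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicLong demand = new AtomicLong(0);
    private final ExecutorService drainThread = Executors.newSingleThreadExecutor();
    private boolean awaitingAck = false;                  // only read and written on drainThread

    public void enqueue(String event) {                   // publisher (prefetch) thread
        queue.offer(event);
        drainThread.execute(this::drainOnce);
    }

    public void request(long n) {                         // demand-notifier thread
        demand.addAndGet(n);
        drainThread.execute(this::drainOnce);
    }

    public void onAck(String ackedEvent) {                // ack-notifier thread
        drainThread.execute(() -> {
            if (ackedEvent.equals(queue.peek())) {        // evict the acknowledged head
                queue.poll();
                demand.decrementAndGet();
                awaitingAck = false;
            }
            drainOnce();
        });
    }

    private void drainOnce() {                            // runs only on drainThread
        String head = queue.peek();
        if (!awaitingAck && head != null && demand.get() > 0) {
            awaitingAck = true;
            System.out.println("delivering " + head);     // stands in for subscriber.onNext(head)
        }
    }
}

This trades a little extra latency (every ack hops through the executor) for having exactly one drain owner, which is the simplicity the reviewer is after.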
}
} else {
// Log and ignore any other ack received. As long as an ack is received for head of the queue
// we are good. Any stale or future ack received can be ignored, though the latter is not feasible
// to happen.
Reviewer: Another reason to end up here is when the queue is reset before the ack comes back. Should we add that to the comment too?

Author: That would be considered a stale ack.
final BatchUniqueIdentifier peekedBatchUniqueIdentifier = recordsToCheck == null ? null : recordsToCheck.batchUniqueIdentifier();
log.info("{} : Received a stale notification with id {} instead of expected id {} at {}. Will ignore.", shardId,
recordsDeliveryAck.batchUniqueIdentifier(), peekedBatchUniqueIdentifier, Instant.now());
}
// Take action based on the time spent by the event in queue.
takeDelayedDeliveryActionIfRequired(shardId, eventDeliveryLockAcquireTime, log);
takeDelayedDeliveryActionIfRequired(shardId, lastEventDeliveryTime, log);
}

// Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue.
private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException {
wasReset = false;
while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) {

@@ -248,11 +290,39 @@ private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) t
prefetchCounters.added(recordsRetrieved.processRecordsInput);
}

/**
* Method that will be called by the 'publisher thread' and the 'demand notifying thread',
* to drain the events if the 'event notifying thread' do not have the control.
*/
private synchronized void initiateDrainQueueForRequests() {
Reviewer: "initiate" may not be the best word here. Maybe drainQueueForRequestsIfAllowed?

Author: Yep.
if(!shouldDrainEventOnAck.get()) {
drainQueueForRequests();
}
}

/**
* Method to drain the queue based on the demand and the events availability in the queue.
*/
private synchronized void drainQueueForRequests() {
while (requestedResponses.get() > 0 && !getRecordsResultQueue.isEmpty()) {
eventDeliveryLock.acquireUninterruptibly();
eventDeliveryLockAcquireTime = Instant.now();
subscriber.onNext(getNextResult());
final RecordsRetrieved recordsToDeliver = peekNextResult();
// If there is an event available to drain and if there is at least one demand,
// then schedule it for delivery
if (requestedResponses.get() > 0 && recordsToDeliver != null) {
lastEventDeliveryTime = Instant.now();
subscriber.onNext(recordsToDeliver);
if(!shouldDrainEventOnAck.get()) {
Reviewer: nit: missing space between if and (.

Author: Will address.
log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnAck.set(true);
}
} else {
// Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier
// thread.
if(shouldDrainEventOnAck.get()){
Reviewer: nit: missing space between if and (. Check all such places.
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnAck.set(false);
}
}
}

@@ -263,12 +333,26 @@ static class PrefetchRecordsRetrieved implements RecordsRetrieved {
final ProcessRecordsInput processRecordsInput;
final String lastBatchSequenceNumber;
final String shardIterator;
final BatchUniqueIdentifier batchUniqueIdentifier;

PrefetchRecordsRetrieved prepareForPublish() {
return new PrefetchRecordsRetrieved(processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(),
lastBatchSequenceNumber, shardIterator);
lastBatchSequenceNumber, shardIterator, batchUniqueIdentifier);
}

@Override
public BatchUniqueIdentifier batchUniqueIdentifier() {
return batchUniqueIdentifier;
}

/**
* Generate batch unique identifier for PrefetchRecordsRetrieved, where flow will be empty.
* @return BatchUniqueIdentifier
*/
public static BatchUniqueIdentifier generateBatchUniqueIdentifier() {
return new BatchUniqueIdentifier(UUID.randomUUID().toString(),
StringUtils.EMPTY);
}
}

private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) {
@@ -291,15 +375,15 @@ private class DefaultGetRecordsCacheDaemon implements Runnable {
public void run() {
while (!isShutdown) {
if (Thread.currentThread().isInterrupted()) {
log.warn("Prefetch thread was interrupted.");
log.warn("{} : Prefetch thread was interrupted.", shardId);
break;
}

resetLock.readLock().lock();
try {
makeRetrievalAttempt();
} catch(PositionResetException pre) {
log.debug("Position was reset while attempting to add item to queue.");
log.debug("{} : Position was reset while attempting to add item to queue.", shardId);
} finally {
resetLock.readLock().unlock();
}

@@ -328,30 +412,31 @@ private void makeRetrievalAttempt() {
highestSequenceNumber = calculateHighestSequenceNumber(processRecordsInput);
PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput,
highestSequenceNumber, getRecordsResult.nextShardIterator());
highestSequenceNumber, getRecordsResult.nextShardIterator(),
PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
highestSequenceNumber = recordsRetrieved.lastBatchSequenceNumber;
addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequests();
initiateDrainQueueForRequests();
} catch (PositionResetException pse) {
throw pse;
} catch (RetryableRetrievalException rre) {
log.info("Timeout occurred while waiting for response from Kinesis. Will retry the request.");
log.info("{} : Timeout occurred while waiting for response from Kinesis. Will retry the request.", shardId);
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache.");
log.info("{} : Thread was interrupted, indicating shutdown was called on the cache.", shardId);
} catch (ExpiredIteratorException e) {
log.info("ShardId {}: records threw ExpiredIteratorException - restarting"
log.info("{} : records threw ExpiredIteratorException - restarting"
+ " after greatest seqNum passed to customer", shardId, e);

scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);

dataFetcher.restartIterator();
} catch (SdkException e) {
log.error("Exception thrown while fetching records from Kinesis", e);
log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e);
} catch (Throwable e) {
log.error("Unexpected exception was thrown. This could probably be an issue or a bug." +
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
"https://github.com/awslabs/amazon-kinesis-client.", e);
"https://github.com/awslabs/amazon-kinesis-client.", shardId, e);
} finally {
MetricsUtil.endScope(scope);
}
@@ -362,8 +447,8 @@ private void makeRetrievalAttempt() {
try {
prefetchCounters.waitForConsumer();
} catch (InterruptedException ie) {
log.info("Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started");
log.info("{} : Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started", shardId);
}
}
}

@@ -410,14 +495,14 @@ private long getByteSize(final ProcessRecordsInput result) {
public synchronized void waitForConsumer() throws InterruptedException {
if (!shouldGetNewRecords()) {
log.debug("Queue is full waiting for consumer for {} ms", idleMillisBetweenCalls);
log.debug("{} : Queue is full waiting for consumer for {} ms", shardId, idleMillisBetweenCalls);
this.wait(idleMillisBetweenCalls);
}
}

public synchronized boolean shouldGetNewRecords() {
if (log.isDebugEnabled()) {
log.debug("Current Prefetch Counter States: {}", this.toString());
log.debug("{} : Current Prefetch Counter States: {}", shardId, this.toString());
}
return size < maxRecordsCount && byteSize < maxByteSize;
}
Reviewer: It would be better if we define what each thread is doing. Maybe we can add a class comment for that.

Author: Okay.
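For reference, a class-level comment along the lines the reviewer is asking for could look roughly like the following. The wording is illustrative, based only on the roles visible in this diff, and is not the text that eventually landed in the repository.

/**
 * PrefetchRecordsPublisher is driven by three kinds of threads:
 *
 * 1. Publisher (prefetch) thread - DefaultGetRecordsCacheDaemon: fetches records, offers the
 *    batch to getRecordsResultQueue and calls initiateDrainQueueForRequests(), acting as the
 *    kick-starter when deliveries have been paused.
 * 2. Demand-notifier thread - the subscription's request(n): bumps requestedResponses and
 *    calls initiateDrainQueueForRequests().
 * 3. Event (ack) notifier thread - notify(RecordsDeliveryAck): validates the ack against the
 *    head of the queue, evicts it, and either drains the next event itself or hands drain
 *    control back to the publisher/demand threads via shouldDrainEventOnAck.
 */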