-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix to prevent the onNext event going to stale subscription when restart happens in poller #606
Changes from 2 commits
99e21e5
f008f5c
f03e407
e788f10
6257d02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,11 +22,13 @@ | |
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
import java.util.stream.Collectors; | ||
|
||
import lombok.AccessLevel; | ||
import lombok.Getter; | ||
import lombok.Setter; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.commons.lang3.Validate; | ||
import org.reactivestreams.Subscriber; | ||
|
@@ -79,8 +81,6 @@ | |
@KinesisClientInternalApi | ||
public class PrefetchRecordsPublisher implements RecordsPublisher { | ||
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; | ||
@VisibleForTesting | ||
LinkedBlockingQueue<PrefetchRecordsRetrieved> getRecordsResultQueue; | ||
private int maxPendingProcessRecordsInput; | ||
private int maxByteSize; | ||
private int maxRecordsCount; | ||
|
@@ -91,26 +91,71 @@ public class PrefetchRecordsPublisher implements RecordsPublisher { | |
private final long idleMillisBetweenCalls; | ||
private Instant lastSuccessfulCall; | ||
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon; | ||
private PrefetchCounters prefetchCounters; | ||
private boolean started = false; | ||
private final String operation; | ||
private final KinesisDataFetcher dataFetcher; | ||
private final String shardId; | ||
|
||
private Subscriber<? super RecordsRetrieved> subscriber; | ||
private final AtomicLong requestedResponses = new AtomicLong(0); | ||
|
||
private String highestSequenceNumber; | ||
private InitialPositionInStreamExtended initialPositionInStreamExtended; | ||
|
||
@VisibleForTesting @Getter | ||
private final PublisherSession publisherSession; | ||
private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock(); | ||
private boolean wasReset = false; | ||
|
||
private Instant lastEventDeliveryTime = Instant.EPOCH; | ||
// This flag controls who should drain the next request in the prefetch queue. | ||
// When set to false, the publisher and demand-notifier thread would have the control. | ||
// When set to true, the event-notifier thread would have the control. | ||
private AtomicBoolean shouldDrainEventOnlyOnAck = new AtomicBoolean(false); | ||
|
||
@Data | ||
@Accessors(fluent = true) | ||
static final class PublisherSession { | ||
private final AtomicLong requestedResponses = new AtomicLong(0); | ||
@VisibleForTesting @Getter | ||
private final LinkedBlockingQueue<PrefetchRecordsRetrieved> prefetchRecordsQueue; | ||
private final PrefetchCounters prefetchCounters; | ||
private final KinesisDataFetcher dataFetcher; | ||
private InitialPositionInStreamExtended initialPositionInStreamExtended; | ||
private String highestSequenceNumber; | ||
|
||
// Initialize the session on publisher start. | ||
void init(ExtendedSequenceNumber extendedSequenceNumber, | ||
InitialPositionInStreamExtended initialPositionInStreamExtended) { | ||
this.initialPositionInStreamExtended = initialPositionInStreamExtended; | ||
this.highestSequenceNumber = extendedSequenceNumber.sequenceNumber(); | ||
this.dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended); | ||
} | ||
|
||
// Reset the session when publisher restarts. | ||
void reset(PrefetchRecordsRetrieved prefetchRecordsRetrieved) { | ||
// Reset the demand from ShardConsumer, to prevent this publisher from delivering events to stale RX-Java | ||
// Subscriber. Publishing will be unblocked when the demand is communicated by the new Rx-Java subscriber. | ||
requestedResponses.set(0); | ||
// Clear the queue, so that the publisher repopulates the queue based on sequence number from subscriber. | ||
prefetchRecordsQueue.clear(); | ||
prefetchCounters.reset(); | ||
highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber(); | ||
dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), highestSequenceNumber, | ||
initialPositionInStreamExtended); | ||
} | ||
|
||
// Take action on successful event delivery. | ||
RecordsRetrieved eventDeliveredAction(String shardId) { | ||
final PrefetchRecordsRetrieved result = prefetchRecordsQueue.poll(); | ||
if (result != null) { | ||
updateDemandTrackersOnPublish(result); | ||
} else { | ||
log.info( | ||
"{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer" | ||
+ "was reset.", shardId); | ||
} | ||
return result; | ||
} | ||
|
||
boolean hasDemandToPublish() { | ||
return requestedResponses.get() > 0; | ||
} | ||
|
||
private void updateDemandTrackersOnPublish(PrefetchRecordsRetrieved result) { | ||
prefetchCounters.removed(result.processRecordsInput); | ||
requestedResponses.decrementAndGet(); | ||
} | ||
} | ||
|
||
/** | ||
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a | ||
|
@@ -140,15 +185,14 @@ public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final i | |
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; | ||
this.maxByteSize = maxByteSize; | ||
this.maxRecordsCount = maxRecordsCount; | ||
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput); | ||
this.prefetchCounters = new PrefetchCounters(); | ||
this.publisherSession = new PublisherSession(new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput), | ||
new PrefetchCounters(), this.getRecordsRetrievalStrategy.getDataFetcher()); | ||
this.executorService = executorService; | ||
this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory); | ||
this.idleMillisBetweenCalls = idleMillisBetweenCalls; | ||
this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); | ||
Validate.notEmpty(operation, "Operation cannot be empty"); | ||
this.operation = operation; | ||
this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher(); | ||
this.shardId = shardId; | ||
} | ||
|
||
|
@@ -158,9 +202,7 @@ public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPosition | |
throw new IllegalStateException("ExecutorService has been shutdown."); | ||
} | ||
|
||
this.initialPositionInStreamExtended = initialPositionInStreamExtended; | ||
highestSequenceNumber = extendedSequenceNumber.sequenceNumber(); | ||
dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended); | ||
publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended); | ||
|
||
if (!started) { | ||
log.info("{} : Starting prefetching thread.", shardId); | ||
|
@@ -181,23 +223,13 @@ private void throwOnIllegalState() { | |
|
||
private RecordsRetrieved peekNextResult() { | ||
throwOnIllegalState(); | ||
final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek(); | ||
return result == null ? result : result.prepareForPublish(); | ||
return publisherSession.prefetchRecordsQueue().peek(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be nice to abstract away the queue within the session. i.e. publisherSession.peekNext() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
|
||
@VisibleForTesting | ||
RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() { | ||
RecordsRetrieved evictPublishedEvent() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to be used only in tests after the refactoring. Let's have the tests use session, or have this helper use session directly, without any other actions. |
||
throwOnIllegalState(); | ||
final PrefetchRecordsRetrieved result = getRecordsResultQueue.poll(); | ||
if (result != null) { | ||
prefetchCounters.removed(result.processRecordsInput); | ||
requestedResponses.decrementAndGet(); | ||
} else { | ||
log.info( | ||
"{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer" | ||
+ "was reset.", shardId); | ||
} | ||
return result; | ||
return publisherSession.eventDeliveredAction(shardId); | ||
} | ||
|
||
@Override | ||
|
@@ -213,21 +245,9 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) { | |
throw new IllegalArgumentException( | ||
"Provided RecordsRetrieved was not produced by the PrefetchRecordsPublisher"); | ||
} | ||
PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) recordsRetrieved; | ||
resetLock.writeLock().lock(); | ||
try { | ||
getRecordsResultQueue.clear(); | ||
|
||
// Give the drain control to publisher/demand-notifier thread. | ||
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId, | ||
getRecordsResultQueue.size(), requestedResponses.get()); | ||
shouldDrainEventOnlyOnAck.set(false); | ||
|
||
prefetchCounters.reset(); | ||
|
||
highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber(); | ||
dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), highestSequenceNumber, | ||
initialPositionInStreamExtended); | ||
publisherSession.reset((PrefetchRecordsRetrieved)recordsRetrieved); | ||
wasReset = true; | ||
} finally { | ||
resetLock.writeLock().unlock(); | ||
|
@@ -240,13 +260,13 @@ public void subscribe(Subscriber<? super RecordsRetrieved> s) { | |
subscriber.onSubscribe(new Subscription() { | ||
@Override | ||
public void request(long n) { | ||
requestedResponses.addAndGet(n); | ||
drainQueueForRequestsIfAllowed(); | ||
publisherSession.requestedResponses().addAndGet(n); | ||
drainQueueForRequests(); | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
requestedResponses.set(0); | ||
publisherSession.requestedResponses().set(0); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feels like it should be resetting, instead of setting demand to 0. We already discussed this offline; it will be good to leave a comment. |
||
} | ||
}); | ||
} | ||
|
@@ -257,16 +277,9 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { | |
// Verify if the ack matches the head of the queue and evict it. | ||
if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this if/else logic be moved into the session? i.e. handleRecordsDeliveryAck(RecordsDeliveryAck ack) I see we're checking for illegalState in evictPublishedEvent, which we can do before calling handleRecordsDeliveryAck() This will simplify the code a bit; there will be no need to have methods/access to the queue directly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
.equals(recordsDeliveryAck.batchUniqueIdentifier())) { | ||
pollNextResultAndUpdatePrefetchCounters(); | ||
evictPublishedEvent(); | ||
// Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread. | ||
if (getRecordsResultQueue.isEmpty()) { | ||
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", | ||
shardId, getRecordsResultQueue.size(), requestedResponses.get()); | ||
shouldDrainEventOnlyOnAck.set(false); | ||
} else { | ||
// Else attempt to drain the queue. | ||
drainQueueForRequests(); | ||
} | ||
drainQueueForRequests(); | ||
} else { | ||
// Log and ignore any other ack received. As long as an ack is received for head of the queue | ||
// we are good. Any stale or future ack received can be ignored, though the latter is not feasible | ||
|
@@ -283,7 +296,7 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) { | |
// Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue. | ||
private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException { | ||
wasReset = false; | ||
while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) { | ||
while (!publisherSession.prefetchRecordsQueue().offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) { | ||
ashwing marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// | ||
// Unlocking the read lock, and then reacquiring the read lock, should allow any waiters on the write lock a | ||
// chance to run. If the write lock is acquired by restartFrom than the readLock will now block until | ||
|
@@ -296,45 +309,27 @@ private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) t | |
throw new PositionResetException(); | ||
} | ||
} | ||
prefetchCounters.added(recordsRetrieved.processRecordsInput); | ||
} | ||
|
||
/** | ||
* Method that will be called by the 'publisher thread' and the 'demand notifying thread', | ||
* to drain the events if the 'event notifying thread' do not have the control. | ||
*/ | ||
private synchronized void drainQueueForRequestsIfAllowed() { | ||
if (!shouldDrainEventOnlyOnAck.get()) { | ||
drainQueueForRequests(); | ||
} | ||
publisherSession.prefetchCounters().added(recordsRetrieved.processRecordsInput); | ||
} | ||
|
||
/** | ||
* Method to drain the queue based on the demand and the events availability in the queue. | ||
*/ | ||
private synchronized void drainQueueForRequests() { | ||
final RecordsRetrieved recordsToDeliver = peekNextResult(); | ||
final PrefetchRecordsRetrieved recordsToDeliver = (PrefetchRecordsRetrieved) peekNextResult(); | ||
ashwing marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// If there is an event available to drain and if there is at least one demand, | ||
// then schedule it for delivery | ||
if (requestedResponses.get() > 0 && recordsToDeliver != null) { | ||
if (publisherSession.hasDemandToPublish() && canDispatchRecord(recordsToDeliver)) { | ||
subscriber.onNext(recordsToDeliver.prepareForPublish()); | ||
recordsToDeliver.dispatched(); | ||
lastEventDeliveryTime = Instant.now(); | ||
subscriber.onNext(recordsToDeliver); | ||
if (!shouldDrainEventOnlyOnAck.get()) { | ||
log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId, | ||
getRecordsResultQueue.size(), requestedResponses.get()); | ||
shouldDrainEventOnlyOnAck.set(true); | ||
} | ||
} else { | ||
// Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier | ||
// thread. | ||
if (shouldDrainEventOnlyOnAck.get()) { | ||
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", | ||
shardId, getRecordsResultQueue.size(), requestedResponses.get()); | ||
shouldDrainEventOnlyOnAck.set(false); | ||
} | ||
} | ||
} | ||
|
||
private static boolean canDispatchRecord(PrefetchRecordsRetrieved recordsToDeliver) { | ||
ashwing marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return recordsToDeliver != null && !recordsToDeliver.isDispatched(); | ||
} | ||
|
||
@Accessors(fluent = true) | ||
@Data | ||
static class PrefetchRecordsRetrieved implements RecordsRetrieved { | ||
|
@@ -343,6 +338,7 @@ static class PrefetchRecordsRetrieved implements RecordsRetrieved { | |
final String lastBatchSequenceNumber; | ||
final String shardIterator; | ||
final BatchUniqueIdentifier batchUniqueIdentifier; | ||
@Accessors() @Setter(AccessLevel.NONE) boolean dispatched = false; | ||
|
||
PrefetchRecordsRetrieved prepareForPublish() { | ||
return new PrefetchRecordsRetrieved(processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(), | ||
|
@@ -354,6 +350,9 @@ public BatchUniqueIdentifier batchUniqueIdentifier() { | |
return batchUniqueIdentifier; | ||
} | ||
|
||
// Indicates if this record batch was already dispatched for delivery. | ||
void dispatched() { dispatched = true; } | ||
|
||
/** | ||
* Generate batch unique identifier for PrefetchRecordsRetrieved, where flow will be empty. | ||
* @return BatchUniqueIdentifier | ||
|
@@ -362,10 +361,11 @@ public static BatchUniqueIdentifier generateBatchUniqueIdentifier() { | |
return new BatchUniqueIdentifier(UUID.randomUUID().toString(), | ||
StringUtils.EMPTY); | ||
} | ||
|
||
} | ||
|
||
private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) { | ||
String result = this.highestSequenceNumber; | ||
String result = publisherSession.highestSequenceNumber(); | ||
if (processRecordsInput.records() != null && !processRecordsInput.records().isEmpty()) { | ||
result = processRecordsInput.records().get(processRecordsInput.records().size() - 1).sequenceNumber(); | ||
} | ||
|
@@ -404,7 +404,7 @@ public void run() { | |
|
||
private void makeRetrievalAttempt() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. something to consider: does this method fit better into session now -- there will be direct access to counters, and waiting etc. |
||
MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation); | ||
if (prefetchCounters.shouldGetNewRecords()) { | ||
if (publisherSession.prefetchCounters().shouldGetNewRecords()) { | ||
try { | ||
sleepBeforeNextCall(); | ||
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); | ||
|
@@ -419,13 +419,12 @@ private void makeRetrievalAttempt() { | |
.isAtShardEnd(getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached()) | ||
.build(); | ||
|
||
highestSequenceNumber = calculateHighestSequenceNumber(processRecordsInput); | ||
PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput, | ||
highestSequenceNumber, getRecordsResult.nextShardIterator(), | ||
calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(), | ||
PrefetchRecordsRetrieved.generateBatchUniqueIdentifier()); | ||
highestSequenceNumber = recordsRetrieved.lastBatchSequenceNumber; | ||
publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber); | ||
addArrivedRecordsInput(recordsRetrieved); | ||
drainQueueForRequestsIfAllowed(); | ||
drainQueueForRequests(); | ||
} catch (PositionResetException pse) { | ||
throw pse; | ||
} catch (RetryableRetrievalException rre) { | ||
|
@@ -438,7 +437,7 @@ private void makeRetrievalAttempt() { | |
|
||
scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY); | ||
|
||
dataFetcher.restartIterator(); | ||
publisherSession.dataFetcher().restartIterator(); | ||
} catch (SdkException e) { | ||
log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e); | ||
} catch (Throwable e) { | ||
|
@@ -454,7 +453,7 @@ private void makeRetrievalAttempt() { | |
// Consumer isn't ready to receive new records will allow prefetch counters to pause | ||
// | ||
try { | ||
prefetchCounters.waitForConsumer(); | ||
publisherSession.prefetchCounters().waitForConsumer(); | ||
} catch (InterruptedException ie) { | ||
log.info("{} : Thread was interrupted while waiting for the consumer. " + | ||
"Shutdown has probably been started", shardId); | ||
|
@@ -523,7 +522,7 @@ void reset() { | |
|
||
@Override | ||
public String toString() { | ||
return String.format("{ Requests: %d, Records: %d, Bytes: %d }", getRecordsResultQueue.size(), size, | ||
return String.format("{ Requests: %d, Records: %d, Bytes: %d }", publisherSession.prefetchRecordsQueue().size(), size, | ||
byteSize); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: handleEventDeliveryNotification()