Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to prevent the onNext event going to stale subscription when restart happens in poller #606

Merged
merged 5 commits into from
Sep 19, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -79,8 +81,6 @@
@KinesisClientInternalApi
public class PrefetchRecordsPublisher implements RecordsPublisher {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
@VisibleForTesting
LinkedBlockingQueue<PrefetchRecordsRetrieved> getRecordsResultQueue;
private int maxPendingProcessRecordsInput;
private int maxByteSize;
private int maxRecordsCount;
Expand All @@ -91,26 +91,71 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
private PrefetchCounters prefetchCounters;
private boolean started = false;
private final String operation;
private final KinesisDataFetcher dataFetcher;
private final String shardId;

private Subscriber<? super RecordsRetrieved> subscriber;
private final AtomicLong requestedResponses = new AtomicLong(0);

private String highestSequenceNumber;
private InitialPositionInStreamExtended initialPositionInStreamExtended;

@VisibleForTesting @Getter
private final PublisherSession publisherSession;
private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
private boolean wasReset = false;

private Instant lastEventDeliveryTime = Instant.EPOCH;
// This flag controls who should drain the next request in the prefetch queue.
// When set to false, the publisher and demand-notifier thread would have the control.
// When set to true, the event-notifier thread would have the control.
private AtomicBoolean shouldDrainEventOnlyOnAck = new AtomicBoolean(false);

@Data
@Accessors(fluent = true)
static final class PublisherSession {
private final AtomicLong requestedResponses = new AtomicLong(0);
@VisibleForTesting @Getter
private final LinkedBlockingQueue<PrefetchRecordsRetrieved> prefetchRecordsQueue;
private final PrefetchCounters prefetchCounters;
private final KinesisDataFetcher dataFetcher;
private InitialPositionInStreamExtended initialPositionInStreamExtended;
private String highestSequenceNumber;

/**
 * Prepares the session when the publisher starts.
 *
 * Remembers the initial stream position, seeds the highest sequence number from the supplied
 * extended sequence number, and then initializes the data fetcher with the same two arguments.
 *
 * @param extendedSequenceNumber sequence number whose {@code sequenceNumber()} becomes the
 *        session's highest sequence number
 * @param initialPositionInStreamExtended initial stream position stored on the session and
 *        passed on to the data fetcher
 */
void init(ExtendedSequenceNumber extendedSequenceNumber,
        InitialPositionInStreamExtended initialPositionInStreamExtended) {
    // "this." is required here because the parameter shadows the field.
    this.initialPositionInStreamExtended = initialPositionInStreamExtended;

    final String startingSequenceNumber = extendedSequenceNumber.sequenceNumber();
    highestSequenceNumber = startingSequenceNumber;

    dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
}

/**
 * Resets the session when the publisher restarts from a previously produced batch.
 *
 * The steps run in a fixed order: outstanding demand is zeroed, the prefetch queue is emptied,
 * the prefetch counters are reset, and finally the data fetcher's iterator is repositioned.
 *
 * @param prefetchRecordsRetrieved batch to restart from; its {@code lastBatchSequenceNumber()}
 *        becomes the session's highest sequence number and its {@code shardIterator()} is
 *        handed to the data fetcher
 */
void reset(PrefetchRecordsRetrieved prefetchRecordsRetrieved) {
// Reset the demand from ShardConsumer, to prevent this publisher from delivering events to stale RX-Java
// Subscriber. Publishing will be unblocked when the demand is communicated by the new Rx-Java subscriber.
requestedResponses.set(0);
// Clear the queue, so that the publisher repopulates the queue based on sequence number from subscriber.
prefetchRecordsQueue.clear();
// NOTE(review): presumably clears the size/byte tallies kept for the discarded batches -- confirm
// against PrefetchCounters.reset().
prefetchCounters.reset();
// Resume from the position recorded in the supplied batch; the initial stream position is the one
// stored on this session.
highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();
dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), highestSequenceNumber,
initialPositionInStreamExtended);
}

// Take action on successful event delivery.
RecordsRetrieved eventDeliveredAction(String shardId) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: handleEventDeliveryNotification()

final PrefetchRecordsRetrieved result = prefetchRecordsQueue.poll();
if (result != null) {
updateDemandTrackersOnPublish(result);
} else {
log.info(
"{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
+ "was reset.", shardId);
}
return result;
}

/**
 * Reports whether at least one response is currently requested.
 *
 * @return {@code true} when the requested-responses counter is greater than zero
 */
boolean hasDemandToPublish() {
    final long outstandingDemand = requestedResponses.get();
    return outstandingDemand > 0;
}

/**
 * Updates the session's bookkeeping for a batch that has been evicted from the prefetch queue.
 *
 * @param result the evicted batch; its {@code processRecordsInput} is reported to the prefetch
 *        counters as removed
 */
private void updateDemandTrackersOnPublish(PrefetchRecordsRetrieved result) {
prefetchCounters.removed(result.processRecordsInput);
// One unit of requested demand is consumed per evicted batch.
requestedResponses.decrementAndGet();
}
}

/**
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
Expand Down Expand Up @@ -140,15 +185,14 @@ public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final i
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount;
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput);
this.prefetchCounters = new PrefetchCounters();
this.publisherSession = new PublisherSession(new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput),
new PrefetchCounters(), this.getRecordsRetrievalStrategy.getDataFetcher());
this.executorService = executorService;
this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
Validate.notEmpty(operation, "Operation cannot be empty");
this.operation = operation;
this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher();
this.shardId = shardId;
}

Expand All @@ -158,9 +202,7 @@ public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPosition
throw new IllegalStateException("ExecutorService has been shutdown.");
}

this.initialPositionInStreamExtended = initialPositionInStreamExtended;
highestSequenceNumber = extendedSequenceNumber.sequenceNumber();
dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended);

if (!started) {
log.info("{} : Starting prefetching thread.", shardId);
Expand All @@ -181,23 +223,13 @@ private void throwOnIllegalState() {

private RecordsRetrieved peekNextResult() {
throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek();
return result == null ? result : result.prepareForPublish();
return publisherSession.prefetchRecordsQueue().peek();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to abstract the queue away inside the session, e.g. publisherSession.peekNext().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

@VisibleForTesting
RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() {
RecordsRetrieved evictPublishedEvent() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be used only in tests after the refactoring. Let's either have the tests use the session, or have this helper call the session directly without performing any other actions.

throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.poll();
if (result != null) {
prefetchCounters.removed(result.processRecordsInput);
requestedResponses.decrementAndGet();
} else {
log.info(
"{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
+ "was reset.", shardId);
}
return result;
return publisherSession.eventDeliveredAction(shardId);
}

@Override
Expand All @@ -213,21 +245,9 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) {
throw new IllegalArgumentException(
"Provided RecordsRetrieved was not produced by the PrefetchRecordsPublisher");
}
PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) recordsRetrieved;
resetLock.writeLock().lock();
try {
getRecordsResultQueue.clear();

// Give the drain control to publisher/demand-notifier thread.
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);

prefetchCounters.reset();

highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();
dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), highestSequenceNumber,
initialPositionInStreamExtended);
publisherSession.reset((PrefetchRecordsRetrieved)recordsRetrieved);
wasReset = true;
} finally {
resetLock.writeLock().unlock();
Expand All @@ -240,13 +260,13 @@ public void subscribe(Subscriber<? super RecordsRetrieved> s) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
requestedResponses.addAndGet(n);
drainQueueForRequestsIfAllowed();
publisherSession.requestedResponses().addAndGet(n);
drainQueueForRequests();
}

@Override
public void cancel() {
requestedResponses.set(0);
publisherSession.requestedResponses().set(0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it should be a reset rather than just setting the demand to 0. We already discussed this offline; it would be good to leave a comment explaining the choice.

}
});
}
Expand All @@ -257,16 +277,9 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
// Verify if the ack matches the head of the queue and evict it.
if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this if/else logic be moved into the session, e.g. handleRecordsDeliveryAck(RecordsDeliveryAck ack)?

I see we're checking for illegal state in evictPublishedEvent(); we can do that before calling handleRecordsDeliveryAck().

This will simplify the code a bit: there will be no need for methods that access the queue directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

.equals(recordsDeliveryAck.batchUniqueIdentifier())) {
pollNextResultAndUpdatePrefetchCounters();
evictPublishedEvent();
// Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread.
if (getRecordsResultQueue.isEmpty()) {
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}",
shardId, getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);
} else {
// Else attempt to drain the queue.
drainQueueForRequests();
}
drainQueueForRequests();
} else {
// Log and ignore any other ack received. As long as an ack is received for head of the queue
// we are good. Any stale or future ack received can be ignored, though the latter is not feasible
Expand All @@ -283,7 +296,7 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
// Note : Do not make this method synchronous as notify() will not be able to evict any entry from the queue.
private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException {
wasReset = false;
while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) {
while (!publisherSession.prefetchRecordsQueue().offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) {
ashwing marked this conversation as resolved.
Show resolved Hide resolved
//
// Unlocking the read lock, and then reacquiring the read lock, should allow any waiters on the write lock a
// chance to run. If the write lock is acquired by restartFrom then the readLock will now block until
Expand All @@ -296,45 +309,27 @@ private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) t
throw new PositionResetException();
}
}
prefetchCounters.added(recordsRetrieved.processRecordsInput);
}

/**
* Method that will be called by the 'publisher thread' and the 'demand notifying thread',
* to drain the events if the 'event notifying thread' do not have the control.
*/
private synchronized void drainQueueForRequestsIfAllowed() {
if (!shouldDrainEventOnlyOnAck.get()) {
drainQueueForRequests();
}
publisherSession.prefetchCounters().added(recordsRetrieved.processRecordsInput);
}

/**
* Method to drain the queue based on the demand and the events availability in the queue.
*/
private synchronized void drainQueueForRequests() {
final RecordsRetrieved recordsToDeliver = peekNextResult();
final PrefetchRecordsRetrieved recordsToDeliver = (PrefetchRecordsRetrieved) peekNextResult();
ashwing marked this conversation as resolved.
Show resolved Hide resolved
// If there is an event available to drain and if there is at least one demand,
// then schedule it for delivery
if (requestedResponses.get() > 0 && recordsToDeliver != null) {
if (publisherSession.hasDemandToPublish() && canDispatchRecord(recordsToDeliver)) {
subscriber.onNext(recordsToDeliver.prepareForPublish());
recordsToDeliver.dispatched();
lastEventDeliveryTime = Instant.now();
subscriber.onNext(recordsToDeliver);
if (!shouldDrainEventOnlyOnAck.get()) {
log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(true);
}
} else {
// Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier
// thread.
if (shouldDrainEventOnlyOnAck.get()) {
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}",
shardId, getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);
}
}
}

private static boolean canDispatchRecord(PrefetchRecordsRetrieved recordsToDeliver) {
ashwing marked this conversation as resolved.
Show resolved Hide resolved
return recordsToDeliver != null && !recordsToDeliver.isDispatched();
}

@Accessors(fluent = true)
@Data
static class PrefetchRecordsRetrieved implements RecordsRetrieved {
Expand All @@ -343,6 +338,7 @@ static class PrefetchRecordsRetrieved implements RecordsRetrieved {
final String lastBatchSequenceNumber;
final String shardIterator;
final BatchUniqueIdentifier batchUniqueIdentifier;
@Accessors() @Setter(AccessLevel.NONE) boolean dispatched = false;

PrefetchRecordsRetrieved prepareForPublish() {
return new PrefetchRecordsRetrieved(processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(),
Expand All @@ -354,6 +350,9 @@ public BatchUniqueIdentifier batchUniqueIdentifier() {
return batchUniqueIdentifier;
}

/**
 * Marks this record batch as dispatched for delivery by setting the {@code dispatched} flag to
 * {@code true}. Nothing in this method clears the flag again.
 */
void dispatched() {
    this.dispatched = true;
}

/**
* Generate batch unique identifier for PrefetchRecordsRetrieved, where flow will be empty.
* @return BatchUniqueIdentifier
Expand All @@ -362,10 +361,11 @@ public static BatchUniqueIdentifier generateBatchUniqueIdentifier() {
return new BatchUniqueIdentifier(UUID.randomUUID().toString(),
StringUtils.EMPTY);
}

}

private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) {
String result = this.highestSequenceNumber;
String result = publisherSession.highestSequenceNumber();
if (processRecordsInput.records() != null && !processRecordsInput.records().isEmpty()) {
result = processRecordsInput.records().get(processRecordsInput.records().size() - 1).sequenceNumber();
}
Expand Down Expand Up @@ -404,7 +404,7 @@ public void run() {

private void makeRetrievalAttempt() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something to consider: does this method fit better in the session now? There it would have direct access to the counters, the waiting logic, etc.
PrefetchRecordsPublisher would then call drainQueue only when the session's retrieval attempt returns success.

MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
if (prefetchCounters.shouldGetNewRecords()) {
if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
try {
sleepBeforeNextCall();
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
Expand All @@ -419,13 +419,12 @@ private void makeRetrievalAttempt() {
.isAtShardEnd(getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached())
.build();

highestSequenceNumber = calculateHighestSequenceNumber(processRecordsInput);
PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput,
highestSequenceNumber, getRecordsResult.nextShardIterator(),
calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(),
PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
highestSequenceNumber = recordsRetrieved.lastBatchSequenceNumber;
publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequestsIfAllowed();
drainQueueForRequests();
} catch (PositionResetException pse) {
throw pse;
} catch (RetryableRetrievalException rre) {
Expand All @@ -438,7 +437,7 @@ private void makeRetrievalAttempt() {

scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);

dataFetcher.restartIterator();
publisherSession.dataFetcher().restartIterator();
} catch (SdkException e) {
log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e);
} catch (Throwable e) {
Expand All @@ -454,7 +453,7 @@ private void makeRetrievalAttempt() {
// Consumer isn't ready to receive new records will allow prefetch counters to pause
//
try {
prefetchCounters.waitForConsumer();
publisherSession.prefetchCounters().waitForConsumer();
} catch (InterruptedException ie) {
log.info("{} : Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started", shardId);
Expand Down Expand Up @@ -523,7 +522,7 @@ void reset() {

@Override
public String toString() {
return String.format("{ Requests: %d, Records: %d, Bytes: %d }", getRecordsResultQueue.size(), size,
return String.format("{ Requests: %d, Records: %d, Bytes: %d }", publisherSession.prefetchRecordsQueue().size(), size,
byteSize);
}
}
Expand Down
Loading