Skip to content

Commit

Permalink
Isolating session variables into a new class. Replacing thread contro…
Browse files Browse the repository at this point in the history
…l shifting logic for publishing with monitor based control
  • Loading branch information
girida-amazon committed Sep 12, 2019
1 parent 99e21e5 commit f008f5c
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -79,8 +81,6 @@
@KinesisClientInternalApi
public class PrefetchRecordsPublisher implements RecordsPublisher {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
@VisibleForTesting
LinkedBlockingQueue<PrefetchRecordsRetrieved> getRecordsResultQueue;
private int maxPendingProcessRecordsInput;
private int maxByteSize;
private int maxRecordsCount;
Expand All @@ -91,26 +91,71 @@ public class PrefetchRecordsPublisher implements RecordsPublisher {
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;
private PrefetchCounters prefetchCounters;
private boolean started = false;
private final String operation;
private final KinesisDataFetcher dataFetcher;
private final String shardId;

private Subscriber<? super RecordsRetrieved> subscriber;
private final AtomicLong requestedResponses = new AtomicLong(0);

private String highestSequenceNumber;
private InitialPositionInStreamExtended initialPositionInStreamExtended;

@VisibleForTesting @Getter
private final PublisherSession publisherSession;
private final ReentrantReadWriteLock resetLock = new ReentrantReadWriteLock();
private boolean wasReset = false;

private Instant lastEventDeliveryTime = Instant.EPOCH;
// This flag controls who should drain the next request in the prefetch queue.
// When set to false, the publisher and demand-notifier thread would have the control.
// When set to true, the event-notifier thread would have the control.
private AtomicBoolean shouldDrainEventOnlyOnAck = new AtomicBoolean(false);

@Data
@Accessors(fluent = true)
static final class PublisherSession {
private final AtomicLong requestedResponses = new AtomicLong(0);
@VisibleForTesting @Getter
private final LinkedBlockingQueue<PrefetchRecordsRetrieved> prefetchRecordsQueue;
private final PrefetchCounters prefetchCounters;
private final KinesisDataFetcher dataFetcher;
private InitialPositionInStreamExtended initialPositionInStreamExtended;
private String highestSequenceNumber;

/**
 * Initializes the session when the publisher starts.
 *
 * Records the initial stream position and the starting sequence number, then
 * initializes the data fetcher with the same two values.
 *
 * @param extendedSequenceNumber sequence number to start from
 * @param initialPositionInStreamExtended initial position in the stream
 */
void init(ExtendedSequenceNumber extendedSequenceNumber,
        InitialPositionInStreamExtended initialPositionInStreamExtended) {
    this.initialPositionInStreamExtended = initialPositionInStreamExtended;
    final String startingSequenceNumber = extendedSequenceNumber.sequenceNumber();
    highestSequenceNumber = startingSequenceNumber;
    dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
}

// Reset the session when publisher restarts.
void reset(PrefetchRecordsRetrieved prefetchRecordsRetrieved) {
// Reset the demand from ShardConsumer, to prevent this publisher from delivering events to stale RX-Java
// Subscriber. Publishing will be unblocked when the demand is communicated by the new Rx-Java subscriber.
requestedResponses.set(0);
// Clear the queue, so that the publisher repopulates the queue based on sequence number from subscriber.
prefetchRecordsQueue.clear();
prefetchCounters.reset();
highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();
dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), highestSequenceNumber,

This comment has been minimized.

Copy link
@yasemin-amzn

yasemin-amzn Sep 13, 2019

Is there any critical state stored in the dataFetcher instance? Should we initialize a new PublisherSession instance in the PrefetchRecordsPublisher instead of resetting the same instance?

initialPositionInStreamExtended);
}

/**
 * Takes action on a successful event delivery by evicting the head of the prefetch queue.
 *
 * If a batch was present, the demand trackers are updated for it. If the queue was
 * empty, an informational message is logged and nothing else changes.
 *
 * @param shardId shard identifier, used only in the log message
 * @return the evicted batch, or {@code null} if the prefetch queue was empty
 */
RecordsRetrieved eventDeliveredAction(String shardId) {
    final PrefetchRecordsRetrieved result = prefetchRecordsQueue.poll();
    if (result != null) {
        updateDemandTrackersOnPublish(result);
    } else {
        // The second literal starts with a space so the concatenated message reads
        // "... prefetch buffer was reset." rather than "... prefetch bufferwas reset."
        log.info(
                "{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
                        + " was reset.", shardId);
    }
    return result;
}

/**
 * Reports whether there is outstanding demand to publish against.
 *
 * @return {@code true} if the requested-responses counter is greater than zero
 */
boolean hasDemandToPublish() {
    final long outstandingDemand = requestedResponses.get();
    return outstandingDemand > 0;
}

// Updates the session's trackers for a batch that has just been evicted from the prefetch queue.
// Called from eventDeliveredAction only when a batch was actually polled (result is non-null).
private void updateDemandTrackersOnPublish(PrefetchRecordsRetrieved result) {
// Report the evicted batch's ProcessRecordsInput to the prefetch counters.
prefetchCounters.removed(result.processRecordsInput);
// One unit of requested demand has been satisfied by this batch.
requestedResponses.decrementAndGet();
}
}

/**
* Constructor for the PrefetchRecordsPublisher. This cache prefetches records from Kinesis and stores them in a
Expand Down Expand Up @@ -140,15 +185,14 @@ public PrefetchRecordsPublisher(final int maxPendingProcessRecordsInput, final i
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
this.maxByteSize = maxByteSize;
this.maxRecordsCount = maxRecordsCount;
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput);
this.prefetchCounters = new PrefetchCounters();
this.publisherSession = new PublisherSession(new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput),
new PrefetchCounters(), this.getRecordsRetrievalStrategy.getDataFetcher());
this.executorService = executorService;
this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
Validate.notEmpty(operation, "Operation cannot be empty");
this.operation = operation;
this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher();
this.shardId = shardId;
}

Expand All @@ -158,9 +202,7 @@ public void start(ExtendedSequenceNumber extendedSequenceNumber, InitialPosition
throw new IllegalStateException("ExecutorService has been shutdown.");
}

this.initialPositionInStreamExtended = initialPositionInStreamExtended;
highestSequenceNumber = extendedSequenceNumber.sequenceNumber();
dataFetcher.initialize(extendedSequenceNumber, initialPositionInStreamExtended);
publisherSession.init(extendedSequenceNumber, initialPositionInStreamExtended);

if (!started) {
log.info("{} : Starting prefetching thread.", shardId);
Expand All @@ -181,23 +223,13 @@ private void throwOnIllegalState() {

private RecordsRetrieved peekNextResult() {
throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.peek();
return result == null ? result : result.prepareForPublish();
return publisherSession.prefetchRecordsQueue().peek();
}

@VisibleForTesting
RecordsRetrieved pollNextResultAndUpdatePrefetchCounters() {
RecordsRetrieved evictPublishedEvent() {
throwOnIllegalState();
final PrefetchRecordsRetrieved result = getRecordsResultQueue.poll();
if (result != null) {
prefetchCounters.removed(result.processRecordsInput);
requestedResponses.decrementAndGet();
} else {
log.info(
"{}: No record batch found while evicting from the prefetch queue. This indicates the prefetch buffer"
+ "was reset.", shardId);
}
return result;
return publisherSession.eventDeliveredAction(shardId);
}

@Override
Expand All @@ -213,24 +245,9 @@ public void restartFrom(RecordsRetrieved recordsRetrieved) {
throw new IllegalArgumentException(
"Provided RecordsRetrieved was not produced by the PrefetchRecordsPublisher");
}
PrefetchRecordsRetrieved prefetchRecordsRetrieved = (PrefetchRecordsRetrieved) recordsRetrieved;
resetLock.writeLock().lock();
try {
// Reset the demand from ShardConsumer, to prevent this publisher from delivering events to stale RX-Java
// Subscriber. Publishing will be unblocked when the demand is communicated by the new Rx-Java subscriber.
requestedResponses.set(0);

// Clear the queue, so that the publisher repopulates the queue based on sequence number from subscriber.
getRecordsResultQueue.clear();

// Give the drain control to publisher/demand-notifier thread.
giveDrainingControlToPublisherOrDemandNotifier();

prefetchCounters.reset();

highestSequenceNumber = prefetchRecordsRetrieved.lastBatchSequenceNumber();
dataFetcher.resetIterator(prefetchRecordsRetrieved.shardIterator(), highestSequenceNumber,
initialPositionInStreamExtended);
publisherSession.reset((PrefetchRecordsRetrieved)recordsRetrieved);
wasReset = true;
} finally {
resetLock.writeLock().unlock();
Expand All @@ -243,13 +260,13 @@ public void subscribe(Subscriber<? super RecordsRetrieved> s) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
requestedResponses.addAndGet(n);
drainQueueForRequestsIfAllowed();
publisherSession.requestedResponses().addAndGet(n);
drainQueueForRequests();
}

@Override
public void cancel() {
requestedResponses.set(0);
publisherSession.requestedResponses().set(0);
}
});
}
Expand All @@ -260,14 +277,9 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
// Verify if the ack matches the head of the queue and evict it.
if (recordsToCheck != null && recordsToCheck.batchUniqueIdentifier()
.equals(recordsDeliveryAck.batchUniqueIdentifier())) {
pollNextResultAndUpdatePrefetchCounters();
evictPublishedEvent();
// Upon evicting, check if queue is empty. if yes, then give the drain control back to publisher thread.
if (getRecordsResultQueue.isEmpty()) {
giveDrainingControlToPublisherOrDemandNotifier();
} else {
// Else attempt to drain the queue.
drainQueueForRequests();
}
drainQueueForRequests();
} else {
// Log and ignore any other ack received. As long as an ack is received for head of the queue
// we are good. Any stale or future ack received can be ignored, though the latter is not feasible
Expand All @@ -284,7 +296,7 @@ public synchronized void notify(RecordsDeliveryAck recordsDeliveryAck) {
// Note : Do not make this method synchronized, as notify() would then not be able to evict any entry from the queue.
private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) throws InterruptedException {
wasReset = false;
while (!getRecordsResultQueue.offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) {
while (!publisherSession.prefetchRecordsQueue().offer(recordsRetrieved, idleMillisBetweenCalls, TimeUnit.MILLISECONDS)) {
//
// Unlocking the read lock, and then reacquiring the read lock, should allow any waiters on the write lock a
// chance to run. If the write lock is acquired by restartFrom, then the readLock will now block until
Expand All @@ -297,51 +309,25 @@ private void addArrivedRecordsInput(PrefetchRecordsRetrieved recordsRetrieved) t
throw new PositionResetException();
}
}
prefetchCounters.added(recordsRetrieved.processRecordsInput);
}

/**
* Method that will be called by the 'publisher thread' and the 'demand notifying thread',
* to drain the events if the 'event notifying thread' do not have the control.
*/
private synchronized void drainQueueForRequestsIfAllowed() {
if (!shouldDrainEventOnlyOnAck.get()) {
drainQueueForRequests();
}
publisherSession.prefetchCounters().added(recordsRetrieved.processRecordsInput);
}

/**
* Method to drain the queue based on the demand and the events availability in the queue.
*/
private synchronized void drainQueueForRequests() {
final RecordsRetrieved recordsToDeliver = peekNextResult();
final PrefetchRecordsRetrieved recordsToDeliver = (PrefetchRecordsRetrieved) peekNextResult();
// If there is an event available to drain and if there is at least one demand,
// then schedule it for delivery
if (requestedResponses.get() > 0 && recordsToDeliver != null) {
subscriber.onNext(recordsToDeliver);
if (publisherSession.hasDemandToPublish() && canDispatchRecord(recordsToDeliver)) {
subscriber.onNext(recordsToDeliver.prepareForPublish());
recordsToDeliver.dispatched();
lastEventDeliveryTime = Instant.now();
giveDrainingControlToEventNotifier();
} else {
// Since we haven't scheduled the event delivery, give the drain control back to publisher/demand-notifier
// thread.
giveDrainingControlToPublisherOrDemandNotifier();
}
}

private void giveDrainingControlToEventNotifier() {
if (!shouldDrainEventOnlyOnAck.get()) {
log.debug("{} : Notifier thread takes over the draining control. Queue Size : {}, Demand : {}", shardId,
getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(true);
}
}

private void giveDrainingControlToPublisherOrDemandNotifier() {
if (shouldDrainEventOnlyOnAck.get()) {
log.debug("{} : Publisher thread takes over the draining control. Queue Size : {}, Demand : {}",
shardId, getRecordsResultQueue.size(), requestedResponses.get());
shouldDrainEventOnlyOnAck.set(false);
}
/**
 * Decides whether the given batch may be handed to the subscriber.
 *
 * @param recordsToDeliver candidate batch; may be {@code null}
 * @return {@code true} only if the batch is non-null and has not already been dispatched
 */
private static boolean canDispatchRecord(PrefetchRecordsRetrieved recordsToDeliver) {
    if (recordsToDeliver == null) {
        return false;
    }
    final boolean alreadyDispatched = recordsToDeliver.isDispatched();
    return !alreadyDispatched;
}

@Accessors(fluent = true)
Expand All @@ -352,6 +338,7 @@ static class PrefetchRecordsRetrieved implements RecordsRetrieved {
final String lastBatchSequenceNumber;
final String shardIterator;
final BatchUniqueIdentifier batchUniqueIdentifier;
@Accessors() @Setter(AccessLevel.NONE) boolean dispatched = false;

PrefetchRecordsRetrieved prepareForPublish() {
return new PrefetchRecordsRetrieved(processRecordsInput.toBuilder().cacheExitTime(Instant.now()).build(),
Expand All @@ -363,6 +350,9 @@ public BatchUniqueIdentifier batchUniqueIdentifier() {
return batchUniqueIdentifier;
}

// Marks this record batch as dispatched for delivery by setting the dispatched flag to true.
// This method only sets the flag; it never clears it.
void dispatched() { dispatched = true; }

/**
* Generate batch unique identifier for PrefetchRecordsRetrieved, where flow will be empty.
* @return BatchUniqueIdentifier
Expand All @@ -371,10 +361,11 @@ public static BatchUniqueIdentifier generateBatchUniqueIdentifier() {
return new BatchUniqueIdentifier(UUID.randomUUID().toString(),
StringUtils.EMPTY);
}

}

private String calculateHighestSequenceNumber(ProcessRecordsInput processRecordsInput) {
String result = this.highestSequenceNumber;
String result = publisherSession.highestSequenceNumber();
if (processRecordsInput.records() != null && !processRecordsInput.records().isEmpty()) {
result = processRecordsInput.records().get(processRecordsInput.records().size() - 1).sequenceNumber();
}
Expand Down Expand Up @@ -413,7 +404,7 @@ public void run() {

private void makeRetrievalAttempt() {
MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, operation);
if (prefetchCounters.shouldGetNewRecords()) {
if (publisherSession.prefetchCounters().shouldGetNewRecords()) {
try {
sleepBeforeNextCall();
GetRecordsResponse getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
Expand All @@ -428,13 +419,12 @@ private void makeRetrievalAttempt() {
.isAtShardEnd(getRecordsRetrievalStrategy.getDataFetcher().isShardEndReached())
.build();

highestSequenceNumber = calculateHighestSequenceNumber(processRecordsInput);
PrefetchRecordsRetrieved recordsRetrieved = new PrefetchRecordsRetrieved(processRecordsInput,
highestSequenceNumber, getRecordsResult.nextShardIterator(),
calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(),
PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
highestSequenceNumber = recordsRetrieved.lastBatchSequenceNumber;
publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequestsIfAllowed();
drainQueueForRequests();
} catch (PositionResetException pse) {
throw pse;
} catch (RetryableRetrievalException rre) {
Expand All @@ -447,7 +437,7 @@ private void makeRetrievalAttempt() {

scope.addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.COUNT, MetricsLevel.SUMMARY);

dataFetcher.restartIterator();
publisherSession.dataFetcher().restartIterator();
} catch (SdkException e) {
log.error("{} : Exception thrown while fetching records from Kinesis", shardId, e);
} catch (Throwable e) {
Expand All @@ -463,7 +453,7 @@ private void makeRetrievalAttempt() {
// Consumer isn't ready to receive new records will allow prefetch counters to pause
//
try {
prefetchCounters.waitForConsumer();
publisherSession.prefetchCounters().waitForConsumer();
} catch (InterruptedException ie) {
log.info("{} : Thread was interrupted while waiting for the consumer. " +
"Shutdown has probably been started", shardId);
Expand Down Expand Up @@ -532,7 +522,7 @@ void reset() {

@Override
public String toString() {
return String.format("{ Requests: %d, Records: %d, Bytes: %d }", getRecordsResultQueue.size(), size,
return String.format("{ Requests: %d, Records: %d, Bytes: %d }", publisherSession.prefetchRecordsQueue().size(), size,
byteSize);
}
}
Expand Down
Loading

0 comments on commit f008f5c

Please sign in to comment.