Skip to content

Commit

Permalink
Handle Expired Iterators Correctly
Browse files Browse the repository at this point in the history
Fix for the lease losses in the PrefetchCache and AsyncGetRecordsStrategy caused due to ExpiredIteratorException. (#263)
  • Loading branch information
sahilpalvia authored and pfifer committed Nov 8, 2017
1 parent 3de901e commit 5c3ff2b
Show file tree
Hide file tree
Showing 15 changed files with 248 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
Expand All @@ -31,6 +30,7 @@

import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

Expand Down Expand Up @@ -81,33 +81,39 @@ public GetRecordsResult getRecords(final int maxRecords) {
CompletionService<DataFetcherResult> completionService = completionServiceSupplier.get();
Set<Future<DataFetcherResult>> futures = new HashSet<>();
Callable<DataFetcherResult> retrieverCall = createRetrieverCallable(maxRecords);
while (true) {
try {
futures.add(completionService.submit(retrieverCall));
} catch (RejectedExecutionException e) {
log.warn("Out of resources, unable to start additional requests.");
}
try {
while (true) {
try {
futures.add(completionService.submit(retrieverCall));
} catch (RejectedExecutionException e) {
log.warn("Out of resources, unable to start additional requests.");
}

try {
Future<DataFetcherResult> resultFuture = completionService.poll(retryGetRecordsInSeconds,
TimeUnit.SECONDS);
if (resultFuture != null) {
//
// Fix to ensure that we only let the shard iterator advance when we intend to return the result
// to the caller. This ensures that the shard iterator is consistently advance in step with
// what the caller sees.
//
result = resultFuture.get().accept();
try {
Future<DataFetcherResult> resultFuture = completionService.poll(retryGetRecordsInSeconds,
TimeUnit.SECONDS);
if (resultFuture != null) {
//
// Fix to ensure that we only let the shard iterator advance when we intend to return the result
// to the caller. This ensures that the shard iterator is consistently advance in step with
// what the caller sees.
//
result = resultFuture.get().accept();
break;
}
} catch (ExecutionException e) {
if (e.getCause() instanceof ExpiredIteratorException) {
throw (ExpiredIteratorException) e.getCause();
}
log.error("ExecutionException thrown while trying to get records", e);
} catch (InterruptedException e) {
log.error("Thread was interrupted", e);
break;
}
} catch (ExecutionException e) {
log.error("ExecutionException thrown while trying to get records", e);
} catch (InterruptedException e) {
log.error("Thread was interrupted", e);
break;
}
} finally {
futures.forEach(f -> f.cancel(true));
}
futures.forEach(f -> f.cancel(true));
return result;
}

Expand Down Expand Up @@ -140,4 +146,9 @@ private static ExecutorService buildExector(int maxGetRecordsThreadPool, String
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadNameFormat).build(),
new ThreadPoolExecutor.AbortPolicy());
}

@Override
public KinesisDataFetcher getDataFetcher() {
return dataFetcher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,11 @@ public interface GetRecordsRetrievalStrategy {
* @return true if the strategy has been shutdown, false otherwise.
*/
boolean isShutdown();

/**
* Returns the KinesisDataFetcher used to getRecords from Kinesis.
*
* @return KinesisDataFetcher
*/
KinesisDataFetcher getDataFetcher();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Collections;
import java.util.Date;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand All @@ -27,6 +28,8 @@
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.util.CollectionUtils;
import com.google.common.collect.Iterables;

import lombok.Data;

Expand All @@ -42,6 +45,8 @@ class KinesisDataFetcher {
private final String shardId;
private boolean isShardEndReached;
private boolean isInitialized;
private String lastKnownSequenceNumber;
private InitialPositionInStreamExtended initialPositionInStream;

/**
*
Expand Down Expand Up @@ -108,6 +113,9 @@ public GetRecordsResult getResult() {
@Override
public GetRecordsResult accept() {
nextIterator = result.getNextShardIterator();
if (!CollectionUtils.isNullOrEmpty(result.getRecords())) {
lastKnownSequenceNumber = Iterables.getLast(result.getRecords()).getSequenceNumber();
}
if (nextIterator == null) {
isShardEndReached = true;
}
Expand Down Expand Up @@ -161,6 +169,8 @@ void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended in
if (nextIterator == null) {
isShardEndReached = true;
}
this.lastKnownSequenceNumber = sequenceNumber;
this.initialPositionInStream = initialPositionInStream;
}

/**
Expand Down Expand Up @@ -217,6 +227,17 @@ private String getIterator(Date timestamp) {
return iterator;
}

/**
* Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last
* getRecords call.
*/
public void restartIterator() {
if (StringUtils.isEmpty(lastKnownSequenceNumber) || initialPositionInStream == null) {
throw new IllegalStateException("Make sure to initialize the KinesisDataFetcher before restarting the iterator.");
}
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream);
}

/**
* @return the shardEndReached
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import org.apache.commons.lang.Validate;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;

import lombok.NonNull;
Expand All @@ -42,6 +45,7 @@
*/
@CommonsLog
public class PrefetchGetRecordsCache implements GetRecordsCache {
private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
LinkedBlockingQueue<ProcessRecordsInput> getRecordsResultQueue;
private int maxPendingProcessRecordsInput;
private int maxByteSize;
Expand All @@ -56,6 +60,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private PrefetchCounters prefetchCounters;
private boolean started = false;
private final String operation;
private final KinesisDataFetcher dataFetcher;
private final String shardId;

/**
* Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a
Expand All @@ -76,9 +82,10 @@ public PrefetchGetRecordsCache(final int maxPendingProcessRecordsInput, final in
final int maxRecordsPerCall,
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
@NonNull final ExecutorService executorService,
long idleMillisBetweenCalls,
final long idleMillisBetweenCalls,
@NonNull final IMetricsFactory metricsFactory,
@NonNull String operation) {
@NonNull final String operation,
@NonNull final String shardId) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall;
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
Expand All @@ -92,6 +99,8 @@ public PrefetchGetRecordsCache(final int maxPendingProcessRecordsInput, final in
this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
Validate.notEmpty(operation, "Operation cannot be empty");
this.operation = operation;
this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher();
this.shardId = shardId;
}

@Override
Expand Down Expand Up @@ -162,6 +171,14 @@ public void run() {
prefetchCounters.added(processRecordsInput);
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache.");
} catch (ExpiredIteratorException e) {
log.info(String.format("ShardId %s: getRecords threw ExpiredIteratorException - restarting"
+ " after greatest seqNum passed to customer", shardId), e);

MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count,
MetricsLevel.SUMMARY);

dataFetcher.restartIterator();
} catch (SdkClientException e) {
log.error("Exception thrown while fetching records from Kinesis", e);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public interface RecordsFetcherFactory {
*
* @return GetRecordsCache used to get records from Kinesis.
*/
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory);
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId,
IMetricsFactory metricsFactory);

/**
* Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import lombok.extern.apachecommons.CommonsLog;

@CommonsLog
Expand All @@ -34,7 +35,8 @@ public SimpleRecordsFetcherFactory(int maxRecords) {
}

@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) {
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId,
IMetricsFactory metricsFactory) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
} else {
Expand All @@ -46,7 +48,8 @@ public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecor
.build()),
idleMillisBetweenCalls,
metricsFactory,
"ProcessTask");
"ProcessTask",
shardId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ public void shutdown() {
public boolean isShutdown() {
return false;
}

@Override
public KinesisDataFetcher getDataFetcher() {
return dataFetcher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,21 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;

import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.mockito.stubbing.Answer;

@RunWith(MockitoJUnitRunner.class)
public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
Expand Down Expand Up @@ -133,6 +137,24 @@ public void testInterrupted() throws InterruptedException, ExecutionException {
verify(mockFuture).get();
assertNull(getRecordsResult);
}

@Test (expected = ExpiredIteratorException.class)
public void testExpiredIteratorExcpetion() throws InterruptedException {
when(dataFetcher.getRecords(eq(numberOfRecords))).thenAnswer(new Answer<DataFetcherResult>() {
@Override
public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws Throwable {
Thread.sleep(SLEEP_GET_RECORDS_IN_SECONDS * 1000);
throw new ExpiredIteratorException("ExpiredIterator");
}
});

try {
getRecordsRetrivalStrategy.getRecords(numberOfRecords);
} finally {
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(eq(numberOfRecords));
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
}
}

private int getLeastNumberOfCalls() {
int leastNumberOfCalls = 0;
Expand Down Expand Up @@ -163,6 +185,7 @@ public DataFetcherResult getRecords(final int maxRecords) {
} catch (InterruptedException e) {
// Do nothing
}

return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,25 @@
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
Expand Down Expand Up @@ -153,5 +158,27 @@ public void testPoolOutOfResources() throws Exception {

assertThat(actualResult, equalTo(expectedResults));
}

@Test (expected = ExpiredIteratorException.class)
public void testExpiredIteratorExceptionCase() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
Future<DataFetcherResult> successfulFuture2 = mock(Future.class);

when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(successfulFuture, successfulFuture2);
when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(successfulFuture);
when(successfulFuture.get()).thenThrow(new ExecutionException(new ExpiredIteratorException("ExpiredException")));

try {
strategy.getRecords(10);
} finally {
verify(executorService).isShutdown();
verify(completionService, times(2)).submit(any());
verify(completionService, times(2)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).cancel(eq(true));
verify(successfulFuture2).cancel(eq(true));
}
}

}
Loading

0 comments on commit 5c3ff2b

Please sign in to comment.