Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for regression cause due to sleep in BlockingGetRecordsCache #256

Merged
merged 1 commit into from
Oct 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,11 @@
public class BlockingGetRecordsCache implements GetRecordsCache {
private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;

public BlockingGetRecordsCache(final int maxRecordsPerCall,
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
final long idleMillisBetweenCalls) {
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
this.maxRecordsPerCall = maxRecordsPerCall;
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
}

@Override
Expand All @@ -51,33 +47,12 @@ public void start() {

@Override
public ProcessRecordsInput getNextResult() {
sleepBeforeNextCall();
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now();
ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
return new ProcessRecordsInput()
.withRecords(getRecordsResult.getRecords())
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
return processRecordsInput;
}

private void sleepBeforeNextCall() {
if (!Thread.interrupted()) {
if (lastSuccessfulCall == null) {
return;
}
long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
if (timeSinceLastCall < idleMillisBetweenCalls) {
try {
Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating that shutdown was called.");
}
}
} else {
log.info("Thread has been interrupted, indicating that it is in the shutdown phase.");
}
}

@Override
public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() {
return getRecordsRetrievalStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private int maxRecordsCount = 30000;
private long idleMillisBetweenCalls = 1500L;
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
private IMetricsFactory metricsFactory;

public SimpleRecordsFetcherFactory(int maxRecords) {
this.maxRecords = maxRecords;
Expand All @@ -37,7 +36,7 @@ public SimpleRecordsFetcherFactory(int maxRecords) {
@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls);
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
} else {
return new PrefetchGetRecordsCache(maxPendingProcessRecordsInput, maxByteSize, maxRecordsCount, maxRecords,
getRecordsRetrievalStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
@RunWith(MockitoJUnitRunner.class)
public class BlockingGetRecordsCacheTest {
private static final int MAX_RECORDS_PER_COUNT = 10_000;
private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L;

@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
Expand All @@ -53,7 +52,7 @@ public class BlockingGetRecordsCacheTest {
@Before
public void setup() {
records = new ArrayList<>();
blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy, IDLE_MILLIS_BETWEEN_CALLS);
blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy);

when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult);
when(getRecordsResult.getRecords()).thenReturn(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,7 @@ public final void testConsumeShard() throws Exception {
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);

getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
0L));
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);

ShardConsumer consumer =
Expand Down Expand Up @@ -469,8 +468,7 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);

getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
0L));
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);

ShardConsumer consumer =
Expand Down