Skip to content

Commit

Permalink
Handle custom metric scope (#235)
Browse files Browse the repository at this point in the history
* integrated prefetch with shardconsumer

* fixed tests

* added fatory methods

* added tests and fixed broken tests

* Resolved conflicts

* Addressed comments

* Integrated the changes

* Handle Custom Metric Scope

* emit metric

* Addressed comments

* Passed the operation by caller

* Get rid of sysout

* Added set metrics to InitializeTask

* Addressed comments

* Addressed Comments

* Addressed comments

* Addressed comment
  • Loading branch information
BtXin authored and sahilpalvia committed Oct 9, 2017
1 parent 4d35210 commit 3b89b56
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@

import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.GetRecordsResult;

import lombok.NonNull;
import lombok.extern.apachecommons.CommonsLog;
import org.apache.commons.lang.Validate;

/**
* This is the prefetch caching class, this class spins up a thread if prefetching is enabled. That thread fetches the
Expand All @@ -44,13 +48,13 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final ExecutorService executorService;
private final IMetricsFactory metricsFactory;
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
private final DefaultGetRecordsCacheDaemon defaultGetRecordsCacheDaemon;

private PrefetchCounters prefetchCounters;

private boolean started = false;
private final String operation;

/**
* Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a
Expand All @@ -71,7 +75,9 @@ public PrefetchGetRecordsCache(final int maxPendingProcessRecordsInput, final in
final int maxRecordsPerCall,
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
@NonNull final ExecutorService executorService,
long idleMillisBetweenCalls) {
long idleMillisBetweenCalls,
@NonNull final IMetricsFactory metricsFactory,
@NonNull String operation) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall;
this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput;
Expand All @@ -80,8 +86,11 @@ public PrefetchGetRecordsCache(final int maxPendingProcessRecordsInput, final in
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxPendingProcessRecordsInput);
this.prefetchCounters = new PrefetchCounters();
this.executorService = executorService;
this.metricsFactory = new ThreadSafeMetricsDelegatingFactory(metricsFactory);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon();
Validate.notEmpty(operation, "Operation cannot be empty");
this.operation = operation;
}

@Override
Expand Down Expand Up @@ -138,6 +147,7 @@ public void run() {
log.warn("Prefetch thread was interrupted.");
break;
}
MetricsHelper.startScope(metricsFactory, operation);
if (prefetchCounters.shouldGetNewRecords()) {
try {
sleepBeforeNextCall();
Expand All @@ -158,6 +168,8 @@ public void run() {
" Please search for the exception/error online to check what is going on. If the " +
"issue persists or is a recurring problem, feel free to open an issue on, " +
"https://github.com/awslabs/amazon-kinesis-client.", e);
} finally {
MetricsHelper.endScope();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ public TaskResult call() {
scope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, shardInfo.getShardId());
scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.Count, MetricsLevel.SUMMARY);
scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.Bytes, MetricsLevel.SUMMARY);

Exception exception = null;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;

/**
* This factory is used to create the records fetcher to retrieve data from Kinesis for a given shard.
*/
Expand All @@ -23,10 +25,11 @@ public interface RecordsFetcherFactory {
*
* @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache
* @param shardId ShardId of the shard that the fetcher will retrieve records for
* @param metricsFactory MetricsFactory used to create metricScope
*
* @return GetRecordsCache used to get records from Kinesis.
*/
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId);
GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory);

/**
* Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher
this.dataFetcher = kinesisDataFetcher;
this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher(
makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo),
this.getShardInfo().getShardId());
this.getShardInfo().getShardId(), this.metricsFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

import java.util.concurrent.Executors;

import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import lombok.extern.apachecommons.CommonsLog;

@CommonsLog
Expand All @@ -28,13 +28,14 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private int maxRecordsCount = 30000;
private long idleMillisBetweenCalls = 1500L;
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
private IMetricsFactory metricsFactory;

public SimpleRecordsFetcherFactory(int maxRecords) {
this.maxRecords = maxRecords;
}

@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId) {
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls);
} else {
Expand All @@ -44,7 +45,9 @@ public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecor
.setDaemon(true)
.setNameFormat("prefetch-cache-" + shardId + "-%04d")
.build()),
idleMillisBetweenCalls);
idleMillisBetweenCalls,
metricsFactory,
"ProcessTask");
}
}

Expand All @@ -68,7 +71,6 @@ public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){
this.dataFetchingStrategy = dataFetchingStrategy;
}

@Override
public void setIdleMillisBetweenCalls(final long idleMillisBetweenCalls) {
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.model.Record;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -62,6 +66,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
private KinesisDataFetcher dataFetcher;
private ExecutorService executorService;
private List<Record> records;
private String operation = "ProcessTask";

@Mock
private IKinesisProxy proxy;
Expand All @@ -82,7 +87,9 @@ public void setup() {
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy,
executorService,
IDLE_MILLIS_BETWEEN_CALLS);
IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(),
operation);
}

@Test
Expand Down Expand Up @@ -126,7 +133,9 @@ public void testDifferentShardCaches() {
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy2,
executorService2,
IDLE_MILLIS_BETWEEN_CALLS);
IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(),
operation);

getRecordsCache.start();
sleep(IDLE_MILLIS_BETWEEN_CALLS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;

import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class PrefetchGetRecordsCacheTest {
private ExecutorService executorService;
private LinkedBlockingQueue<ProcessRecordsInput> spyQueue;
private PrefetchGetRecordsCache getRecordsCache;
private String operation = "ProcessTask";

@Before
public void setup() {
Expand All @@ -81,7 +83,9 @@ public void setup() {
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy,
executorService,
IDLE_MILLIS_BETWEEN_CALLS);
IDLE_MILLIS_BETWEEN_CALLS,
new NullMetricsFactory(),
operation);
spyQueue = spy(getRecordsCache.getRecordsResultQueue);
records = spy(new ArrayList<>());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;

Expand All @@ -15,6 +17,9 @@ public class RecordsFetcherFactoryTest {
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;

@Mock
private IMetricsFactory metricsFactory;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
Expand All @@ -23,14 +28,16 @@ public void setUp() {

@Test
public void createDefaultRecordsFetcherTest() {
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId);
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId,
metricsFactory);
assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class));
}

@Test
public void createPrefetchRecordsFetcherTest() {
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId);
GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId,
metricsFactory);
assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public final void testConsumeShard() throws Exception {
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
0L));
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache);
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);

ShardConsumer consumer =
new ShardConsumer(shardInfo,
Expand Down Expand Up @@ -471,7 +471,7 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
0L));
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache);
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);

ShardConsumer consumer =
new ShardConsumer(shardInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class);
GetRecordsCache getRecordsCache = mock(GetRecordsCache.class);
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString())).thenReturn(getRecordsCache);
when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache);
when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L));

WorkerThread workerThread = runWorker(shardList,
Expand Down

0 comments on commit 3b89b56

Please sign in to comment.