From 222bcdaf3f4c5873506d45a2809d5fc88db57f3e Mon Sep 17 00:00:00 2001 From: parijatsinha Date: Mon, 15 Jan 2018 14:31:04 -0800 Subject: [PATCH] Adding capability to set KinesisProxy using Worker.Builder (#274) * Added IKinesisProxy injector in Worker.Builder to allow injecting custom proxy implementations * Added unit tests for IKinesisProxy injection in Worker Builder * Revert "Added unit tests for IKinesisProxy injection in Worker Builder" This reverts commit aa944c17061b1506c5c55cf3932857b6f6086049. Reverting to undo changes to import ordering. * Added unit tests for IKinesisProxy injection in Worker Builder Re-added unit tests after reverting changes to import ordering. * Revert "Added unit tests for IKinesisProxy injection in Worker Builder" This reverts commit 91e445774beda2097788da7ffc09d04c03314a43. Reverting to refactor unit tests. * Added unit tests for Worker Builder IKinesisProxy injection validation Refactored unit tests as per comments in the pull request. * Added debug logs in KinesisLocalFileDataCreator * Revert "Added debug logs in KinesisLocalFileDataCreator" This reverts commit 1ff00d0b01d6d95a02b7ae67e542977a75b6e307. * Edited JavaDoc for Worker Builder kinesisProxy --- .../clientlibrary/lib/worker/Worker.java | 28 +++++++++++++++++-- .../clientlibrary/lib/worker/WorkerTest.java | 26 +++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index c0f413be9..87d5cdb15 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -996,6 +997,11 @@ public Worker( metricsFactory, execService); } + @VisibleForTesting + StreamConfig getStreamConfig() { + return streamConfig; + } + /** * Given configuration, returns appropriate metrics factory. * @@ -1073,6 +1079,7 @@ public static class Builder { private IMetricsFactory metricsFactory; private ExecutorService execService; private ShardPrioritization shardPrioritization; + private IKinesisProxy kinesisProxy; /** * Default constructor. @@ -1192,6 +1199,19 @@ public Builder shardPrioritization(ShardPrioritization shardPrioritization) { return this; } + /** + * Set KinesisProxy for the worker. + * + * @param kinesisProxy + * Sets an implementation of IKinesisProxy. + * + * @return A reference to this updated object so that method calls can be chained together. + */ + public Builder kinesisProxy(IKinesisProxy kinesisProxy) { + this.kinesisProxy = kinesisProxy; + return this; + } + /** * Build the Worker instance. * @@ -1257,13 +1277,15 @@ public Worker build() { if (shardPrioritization == null) { shardPrioritization = new ParentsFirstShardPrioritization(1); } - + if (kinesisProxy == null) { + kinesisProxy = new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) + .getProxy(config.getStreamName()); + } return new Worker(config.getApplicationName(), recordProcessorFactory, config, - new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(), - kinesisClient).getProxy(config.getStreamName()), + new StreamConfig(kinesisProxy, config.getMaxRecords(), config.getIdleTimeBetweenReadsInMillis(), config.shouldCallProcessRecordsEvenForEmptyRecordList(), diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index ce406dce1..6cc7ef087 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -90,6 +90,7 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; @@ -1474,6 +1475,31 @@ public List answer(InvocationOnMock invocation) throws Throwable { } + @Test + public void testBuilderWithDefaultKinesisProxy() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .build(); + Assert.assertNotNull(worker.getStreamConfig().getStreamProxy()); + Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisProxy); + } + + @Test + public void testBuilderWhenKinesisProxyIsSet() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + // Create an instance of KinesisLocalFileProxy for injection and validation + IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .kinesisProxy(kinesisProxy) + .build(); + Assert.assertNotNull(worker.getStreamConfig().getStreamProxy()); + Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy); + } + private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, StreamConfig streamConfig,