Skip to content

Commit

Permalink
Adding capability to set KinesisProxy using Worker.Builder (#274)
Browse files Browse the repository at this point in the history
* Added IKinesisProxy injector in Worker.Builder to allow injecting custom proxy implementations

* Added unit tests for IKinesisProxy injection in Worker Builder

* Revert "Added unit tests for IKinesisProxy injection in Worker Builder"

This reverts commit aa944c1.
Reverting to undo changes to import ordering.

* Added unit tests for IKinesisProxy injection in Worker Builder

Re-added unit tests after reverting changes to import ordering.

* Revert "Added unit tests for IKinesisProxy injection in Worker Builder"

This reverts commit 91e4457.
Reverting to refactor unit tests.

* Added unit tests for Worker Builder IKinesisProxy injection validation

Refactored unit tests as per comments in the pull request.

* Added debug logs in KinesisLocalFileDataCreator

* Revert "Added debug logs in KinesisLocalFileDataCreator"

This reverts commit 1ff00d0.

* Edited JavaDoc for Worker Builder kinesisProxy
  • Loading branch information
parijatsinha authored and sahilpalvia committed Jan 15, 2018
1 parent 31fd0b5 commit 222bcda
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -996,6 +997,11 @@ public Worker(
metricsFactory, execService);
}

@VisibleForTesting
StreamConfig getStreamConfig() {
return streamConfig;
}

/**
* Given configuration, returns appropriate metrics factory.
*
Expand Down Expand Up @@ -1073,6 +1079,7 @@ public static class Builder {
private IMetricsFactory metricsFactory;
private ExecutorService execService;
private ShardPrioritization shardPrioritization;
private IKinesisProxy kinesisProxy;

/**
* Default constructor.
Expand Down Expand Up @@ -1192,6 +1199,19 @@ public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
return this;
}

/**
* Set KinesisProxy for the worker.
*
* @param kinesisProxy
* Sets an implementation of IKinesisProxy.
*
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder kinesisProxy(IKinesisProxy kinesisProxy) {
this.kinesisProxy = kinesisProxy;
return this;
}

/**
* Build the Worker instance.
*
Expand Down Expand Up @@ -1257,13 +1277,15 @@ public Worker build() {
if (shardPrioritization == null) {
shardPrioritization = new ParentsFirstShardPrioritization(1);
}

if (kinesisProxy == null) {
kinesisProxy = new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient)
.getProxy(config.getStreamName());
}

return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
kinesisClient).getProxy(config.getStreamName()),
new StreamConfig(kinesisProxy,
config.getMaxRecords(),
config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
Expand Down Expand Up @@ -1474,6 +1475,31 @@ public List<ShardInfo> answer(InvocationOnMock invocation) throws Throwable {

}

@Test
public void testBuilderWithDefaultKinesisProxy() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.build();
Assert.assertNotNull(worker.getStreamConfig().getStreamProxy());
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisProxy);
}

@Test
public void testBuilderWhenKinesisProxyIsSet() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
// Create an instance of KinesisLocalFileProxy for injection and validation
IKinesisProxy kinesisProxy = mock(KinesisLocalFileProxy.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.kinesisProxy(kinesisProxy)
.build();
Assert.assertNotNull(worker.getStreamConfig().getStreamProxy());
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy);
}

private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, StreamConfig streamConfig,
Expand Down

0 comments on commit 222bcda

Please sign in to comment.