-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Run multiple instance of scheduler on one JVM #395
Changes from 5 commits
ea999c5
ab29cd2
9fa3fe9
b598d44
474209c
0a6a333
300e480
a305e34
227c96f
4b60aab
5073d25
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,8 +50,24 @@ public class ShardSyncTaskManager { | |
@NonNull | ||
private final ExecutorService executorService; | ||
@NonNull | ||
private final HierarchichalShardSyncer hierarchichalShardSyncer; | ||
@NonNull | ||
private final MetricsFactory metricsFactory; | ||
|
||
public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, InitialPositionInStreamExtended initialPositionInStream, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you change the order of MetricsFactory and ShardSyncer. We have maintained MetricsFactory as the last parameter of the constructors. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Modified There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you create the deprecated constructor too, with the default being HierarchicalShardSyncer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, | ||
ExecutorService executorService, HierarchichalShardSyncer hierarchichalShardSyncer, MetricsFactory metricsFactory) { | ||
this.shardDetector = shardDetector; | ||
this.leaseRefresher = leaseRefresher; | ||
this.initialPositionInStream = initialPositionInStream; | ||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; | ||
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; | ||
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; | ||
this.executorService = executorService; | ||
this.hierarchichalShardSyncer = hierarchichalShardSyncer; | ||
this.metricsFactory = metricsFactory; | ||
} | ||
|
||
private ConsumerTask currentTask; | ||
private Future<TaskResult> future; | ||
|
||
|
@@ -81,7 +97,7 @@ private synchronized boolean checkAndSubmitNextTask() { | |
initialPositionInStream, | ||
cleanupLeasesUponShardCompletion, | ||
ignoreUnexpectedChildShards, | ||
shardSyncIdleTimeMillis, | ||
shardSyncIdleTimeMillis, hierarchichalShardSyncer, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you follow the formatting style for the file and include the parameter on a new line. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
metricsFactory), | ||
metricsFactory); | ||
future = executorService.submit(currentTask); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; | ||
import software.amazon.kinesis.annotations.KinesisClientInternalApi; | ||
import software.amazon.kinesis.common.InitialPositionInStreamExtended; | ||
import software.amazon.kinesis.leases.HierarchichalShardSyncer; | ||
import software.amazon.kinesis.leases.KinesisShardDetector; | ||
import software.amazon.kinesis.leases.LeaseCoordinator; | ||
import software.amazon.kinesis.leases.LeaseManagementFactory; | ||
|
@@ -50,6 +51,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { | |
private final ExecutorService executorService; | ||
@NonNull | ||
private final InitialPositionInStreamExtended initialPositionInStream; | ||
@NonNull | ||
private final HierarchichalShardSyncer hierarchichalShardSyncer; | ||
|
||
private final long failoverTimeMillis; | ||
private final long epsilonMillis; | ||
private final int maxLeasesForWorker; | ||
|
@@ -116,6 +120,8 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi | |
/** | ||
* Constructor. | ||
* | ||
* <p>NOTE: This constructor is deprecated and will be removed in a future release.</p> | ||
* | ||
* @param kinesisClient | ||
* @param streamName | ||
* @param dynamoDBClient | ||
|
@@ -140,6 +146,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi | |
* @param initialLeaseTableReadCapacity | ||
* @param initialLeaseTableWriteCapacity | ||
*/ | ||
@Deprecated | ||
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, | ||
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, | ||
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, | ||
|
@@ -150,6 +157,54 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi | |
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, | ||
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, | ||
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) { | ||
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, | ||
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, | ||
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, | ||
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, | ||
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, | ||
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, | ||
new HierarchichalShardSyncer()); | ||
} | ||
|
||
/** | ||
* Constructor. | ||
* | ||
* @param kinesisClient | ||
* @param streamName | ||
* @param dynamoDBClient | ||
* @param tableName | ||
* @param workerIdentifier | ||
* @param executorService | ||
* @param initialPositionInStream | ||
* @param failoverTimeMillis | ||
* @param epsilonMillis | ||
* @param maxLeasesForWorker | ||
* @param maxLeasesToStealAtOneTime | ||
* @param maxLeaseRenewalThreads | ||
* @param cleanupLeasesUponShardCompletion | ||
* @param ignoreUnexpectedChildShards | ||
* @param shardSyncIntervalMillis | ||
* @param consistentReads | ||
* @param listShardsBackoffTimeMillis | ||
* @param maxListShardsRetryAttempts | ||
* @param maxCacheMissesBeforeReload | ||
* @param listShardsCacheAllowedAgeInSeconds | ||
* @param cacheMissWarningModulus | ||
* @param initialLeaseTableReadCapacity | ||
* @param initialLeaseTableWriteCapacity | ||
* @param hierarchichalShardSyncer | ||
*/ | ||
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you make it use the formatter on this change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, | ||
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, | ||
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, | ||
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, | ||
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, | ||
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, | ||
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, | ||
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, | ||
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, | ||
final HierarchichalShardSyncer hierarchichalShardSyncer) { | ||
this.kinesisClient = kinesisClient; | ||
this.streamName = streamName; | ||
this.dynamoDBClient = dynamoDBClient; | ||
|
@@ -173,6 +228,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi | |
this.cacheMissWarningModulus = cacheMissWarningModulus; | ||
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; | ||
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; | ||
this.hierarchichalShardSyncer = hierarchichalShardSyncer; | ||
} | ||
|
||
@Override | ||
|
@@ -198,6 +254,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac | |
ignoreUnexpectedChildShards, | ||
shardSyncIntervalMillis, | ||
executorService, | ||
hierarchichalShardSyncer, | ||
metricsFactory); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package software.amazon.kinesis.leases.exceptions; | ||
|
||
import lombok.NonNull; | ||
import software.amazon.kinesis.common.InitialPositionInStreamExtended; | ||
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; | ||
import software.amazon.kinesis.leases.HierarchichalShardSyncer; | ||
import software.amazon.kinesis.leases.LeaseRefresher; | ||
import software.amazon.kinesis.leases.ShardDetector; | ||
import software.amazon.kinesis.metrics.MetricsScope; | ||
|
||
/** | ||
* This class is deprecated | ||
* Helper class to sync leases with shards of the Kinesis stream. | ||
*/ | ||
@Deprecated | ||
public class ShardSyncer { | ||
private static final HierarchichalShardSyncer HIERARCHICHAL_SHARD_SYNCER = new HierarchichalShardSyncer(); | ||
|
||
/** | ||
* This class is deprecated | ||
* Class level synchronization | ||
* | ||
* @param shardDetector | ||
* @param leaseRefresher | ||
* @param initialPosition | ||
* @param cleanupLeasesOfCompletedShards | ||
* @param ignoreUnexpectedChildShards | ||
* @param scope | ||
* @throws DependencyException | ||
* @throws InvalidStateException | ||
* @throws ProvisionedThroughputException | ||
* @throws KinesisClientLibIOException | ||
*/ | ||
@Deprecated | ||
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector, | ||
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, | ||
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, | ||
final MetricsScope scope) throws DependencyException, InvalidStateException, | ||
ProvisionedThroughputException, KinesisClientLibIOException { | ||
HIERARCHICHAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards( | ||
shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Misspelled Hierarchical.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ye.. Fixed