-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Run multiple instance of scheduler on one JVM #395
Changes from 2 commits
ea999c5
ab29cd2
9fa3fe9
b598d44
474209c
0a6a333
300e480
a305e34
227c96f
4b60aab
5073d25
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,22 @@ public class ShardSyncTaskManager { | |
private final ExecutorService executorService; | ||
@NonNull | ||
private final MetricsFactory metricsFactory; | ||
@NonNull | ||
private final ShardSyncer shardSyncer; | ||
|
||
public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, InitialPositionInStreamExtended initialPositionInStream, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you change the order of MetricsFactory and ShardSyncer. We have maintained MetricsFactory as the last parameter of the constructors. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Modified There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you create the deprecated constructor too, with the default being HierarchicalShardSyncer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, | ||
ExecutorService executorService, MetricsFactory metricsFactory, ShardSyncer shardSyncer) { | ||
this.shardDetector = shardDetector; | ||
this.leaseRefresher = leaseRefresher; | ||
this.initialPositionInStream = initialPositionInStream; | ||
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion; | ||
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards; | ||
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis; | ||
this.executorService = executorService; | ||
this.metricsFactory = metricsFactory; | ||
this.shardSyncer = shardSyncer; | ||
} | ||
|
||
private ConsumerTask currentTask; | ||
private Future<TaskResult> future; | ||
|
@@ -82,7 +98,8 @@ private synchronized boolean checkAndSubmitNextTask() { | |
cleanupLeasesUponShardCompletion, | ||
ignoreUnexpectedChildShards, | ||
shardSyncIdleTimeMillis, | ||
metricsFactory), | ||
metricsFactory, | ||
shardSyncer), | ||
metricsFactory); | ||
future = executorService.submit(currentTask); | ||
submittedNewTask = true; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,8 +30,6 @@ | |
|
||
import org.apache.commons.lang.StringUtils; | ||
|
||
import lombok.AccessLevel; | ||
import lombok.NoArgsConstructor; | ||
import lombok.NonNull; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
@@ -49,12 +47,11 @@ | |
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; | ||
|
||
/** | ||
* Helper class to sync leases with shards of the Kinesis stream. | ||
* Helper class per kcl scheduler instance to sync leases with shards of the Kinesis stream. | ||
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). | ||
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it | ||
* and begun processing it's child shards. | ||
*/ | ||
@NoArgsConstructor(access = AccessLevel.PRIVATE) | ||
@Slf4j | ||
public class ShardSyncer { | ||
/** | ||
|
@@ -71,7 +68,7 @@ public class ShardSyncer { | |
* @throws KinesisClientLibIOException | ||
*/ | ||
// CHECKSTYLE:OFF CyclomaticComplexity | ||
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector, | ||
public synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like a breaking change. A better way would be to deprecate the existing method and creating a new non-static method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right, modified. |
||
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, | ||
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, | ||
final MetricsScope scope) throws DependencyException, InvalidStateException, | ||
|
@@ -152,7 +149,7 @@ private static Set<String> findInconsistentShardIds(final Map<String, Set<String | |
* @return ShardIds of child shards (children of the expectedClosedShard) | ||
* @throws KinesisClientLibIOException | ||
*/ | ||
static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap, | ||
synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap, | ||
final Map<String, Set<String>> shardIdToChildShardIdsMap, final Set<String> shardIdsOfClosedShards) | ||
throws KinesisClientLibIOException { | ||
final String exceptionMessageSuffix = "This can happen if we constructed the list of shards " | ||
|
@@ -181,7 +178,7 @@ static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, | |
} | ||
} | ||
|
||
private static synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard, | ||
private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard, | ||
final Map<String, Shard> shardIdToShardMap, final Set<String> childShardIds) | ||
throws KinesisClientLibIOException { | ||
BigInteger minStartingHashKeyOfChildren = null; | ||
|
@@ -583,7 +580,7 @@ static boolean isCandidateForCleanup(final Lease lease, final Set<String> curren | |
* @throws ProvisionedThroughputException | ||
* @throws KinesisClientLibIOException | ||
*/ | ||
private static synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases, | ||
private synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases, | ||
final Map<String, Shard> shardIdToShardMap, final Map<String, Set<String>> shardIdToChildShardIdsMap, | ||
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException, | ||
InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { | ||
|
@@ -625,7 +622,7 @@ private static synchronized void cleanupLeasesOfFinishedShards(final Collection< | |
* @throws InvalidStateException | ||
* @throws DependencyException | ||
*/ | ||
static synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds, | ||
synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds, | ||
final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher) | ||
throws DependencyException, InvalidStateException, ProvisionedThroughputException { | ||
final Lease leaseForClosedShard = trackedLeases.get(closedShardId); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ | |
import software.amazon.kinesis.leases.LeaseManagementFactory; | ||
import software.amazon.kinesis.leases.ShardDetector; | ||
import software.amazon.kinesis.leases.ShardSyncTaskManager; | ||
import software.amazon.kinesis.leases.ShardSyncer; | ||
import software.amazon.kinesis.metrics.MetricsFactory; | ||
|
||
/** | ||
|
@@ -50,6 +51,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { | |
private final ExecutorService executorService; | ||
@NonNull | ||
private final InitialPositionInStreamExtended initialPositionInStream; | ||
@NonNull | ||
private final ShardSyncer shardSyncer; | ||
|
||
private final long failoverTimeMillis; | ||
private final long epsilonMillis; | ||
private final int maxLeasesForWorker; | ||
|
@@ -116,6 +120,8 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi | |
/** | ||
* Constructor. | ||
* | ||
* <p>NOTE: This constructor is deprecated and will be removed in a future release.</p> | ||
* | ||
* @param kinesisClient | ||
* @param streamName | ||
* @param dynamoDBClient | ||
|
@@ -140,6 +146,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi | |
* @param initialLeaseTableReadCapacity | ||
* @param initialLeaseTableWriteCapacity | ||
*/ | ||
@Deprecated | ||
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, | ||
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, | ||
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, | ||
|
@@ -150,6 +157,54 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi | |
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, | ||
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, | ||
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) { | ||
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, | ||
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, | ||
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, | ||
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, | ||
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, | ||
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, | ||
new ShardSyncer()); | ||
} | ||
|
||
/** | ||
* Constructor. | ||
* | ||
* @param kinesisClient | ||
* @param streamName | ||
* @param dynamoDBClient | ||
* @param tableName | ||
* @param workerIdentifier | ||
* @param executorService | ||
* @param initialPositionInStream | ||
* @param failoverTimeMillis | ||
* @param epsilonMillis | ||
* @param maxLeasesForWorker | ||
* @param maxLeasesToStealAtOneTime | ||
* @param maxLeaseRenewalThreads | ||
* @param cleanupLeasesUponShardCompletion | ||
* @param ignoreUnexpectedChildShards | ||
* @param shardSyncIntervalMillis | ||
* @param consistentReads | ||
* @param listShardsBackoffTimeMillis | ||
* @param maxListShardsRetryAttempts | ||
* @param maxCacheMissesBeforeReload | ||
* @param listShardsCacheAllowedAgeInSeconds | ||
* @param cacheMissWarningModulus | ||
* @param initialLeaseTableReadCapacity | ||
* @param initialLeaseTableWriteCapacity | ||
* @param shardSyncer | ||
*/ | ||
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you make it use the formatter on this change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, | ||
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, | ||
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, | ||
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, | ||
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, | ||
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, | ||
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, | ||
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, | ||
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, | ||
final ShardSyncer shardSyncer) { | ||
this.kinesisClient = kinesisClient; | ||
this.streamName = streamName; | ||
this.dynamoDBClient = dynamoDBClient; | ||
|
@@ -173,6 +228,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi | |
this.cacheMissWarningModulus = cacheMissWarningModulus; | ||
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity; | ||
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity; | ||
this.shardSyncer = shardSyncer; | ||
} | ||
|
||
@Override | ||
|
@@ -198,7 +254,8 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac | |
ignoreUnexpectedChildShards, | ||
shardSyncIntervalMillis, | ||
executorService, | ||
metricsFactory); | ||
metricsFactory, | ||
shardSyncer); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to create the getter
This should work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes you are right