Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run multiple instance of scheduler on one JVM #395

Merged
merged 11 commits into from
Oct 10, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.HierarchichalShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
Expand Down Expand Up @@ -113,6 +114,7 @@ public class Scheduler implements Runnable {
private final ShardDetector shardDetector;
private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil;
private final HierarchichalShardSyncer hierarchichalShardSyncer;

// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
Expand Down Expand Up @@ -184,6 +186,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig,
this.shardDetector = this.shardSyncTaskManager.shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.hierarchichalShardSyncer = leaseManagementConfig.hierarchichalShardSyncer();
}

/**
Expand Down Expand Up @@ -226,7 +229,8 @@ private void initialize() {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
log.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory);
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
hierarchichalShardSyncer, metricsFactory);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misspelled Hierarchical.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ye.. Fixed

result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
Expand Down Expand Up @@ -559,8 +563,9 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
cleanupLeasesUponShardCompletion,
ignoreUnexpetedChildShards,
shardDetector,
metricsFactory,
aggregatorUtil);
aggregatorUtil,
hierarchichalShardSyncer,
metricsFactory);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@

import org.apache.commons.lang.StringUtils;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
Expand All @@ -49,29 +48,33 @@
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

/**
* Helper class to sync leases with shards of the Kinesis stream.
* Helper class per kcl scheduler instance to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
* and begun processing it's child shards.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public class ShardSyncer {
@KinesisClientInternalApi
public class HierarchichalShardSyncer {

/**
* Object level synchronization
* Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards
* (e.g. at startup, or when we reach end of a shard).
*
*
* @param shardDetector
* @param leaseRefresher
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param scope
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
// CHECKSTYLE:OFF CyclomaticComplexity
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException,
Expand Down Expand Up @@ -152,7 +155,7 @@ private static Set<String> findInconsistentShardIds(final Map<String, Set<String
* @return ShardIds of child shards (children of the expectedClosedShard)
* @throws KinesisClientLibIOException
*/
static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
final Map<String, Set<String>> shardIdToChildShardIdsMap, final Set<String> shardIdsOfClosedShards)
throws KinesisClientLibIOException {
final String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
Expand Down Expand Up @@ -181,7 +184,7 @@ static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String,
}
}

private static synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
final Map<String, Shard> shardIdToShardMap, final Set<String> childShardIds)
throws KinesisClientLibIOException {
BigInteger minStartingHashKeyOfChildren = null;
Expand Down Expand Up @@ -583,7 +586,7 @@ static boolean isCandidateForCleanup(final Lease lease, final Set<String> curren
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
private static synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
private synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
final Map<String, Shard> shardIdToShardMap, final Map<String, Set<String>> shardIdToChildShardIdsMap,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException,
InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
Expand Down Expand Up @@ -625,7 +628,7 @@ private static synchronized void cleanupLeasesOfFinishedShards(final Collection<
* @throws InvalidStateException
* @throws DependencyException
*/
static synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease leaseForClosedShard = trackedLeases.get(closedShardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ static class LeaseManagementThreadPool extends ThreadPoolExecutor {
}
};

private HierarchichalShardSyncer hierarchichalShardSyncer = new HierarchichalShardSyncer();

private LeaseManagementFactory leaseManagementFactory;

public LeaseManagementFactory leaseManagementFactory() {
Expand All @@ -246,7 +248,8 @@ public LeaseManagementFactory leaseManagementFactory() {
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity());
initialLeaseTableWriteCapacity(),
hierarchichalShardSyncer());
}
return leaseManagementFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class ShardSyncTask implements ConsumerTask {
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis;
@NonNull
private final HierarchichalShardSyncer hierarchichalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;

private final TaskType taskType = TaskType.SHARDSYNC;
Expand All @@ -62,7 +64,8 @@ public TaskResult call() {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION);

try {
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPosition,
hierarchichalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,24 @@ public class ShardSyncTaskManager {
@NonNull
private final ExecutorService executorService;
@NonNull
private final HierarchichalShardSyncer hierarchichalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;

public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, InitialPositionInStreamExtended initialPositionInStream,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you change the order of MetricsFactory and ShardSyncer. We have maintained MetricsFactory as the last parameter of the constructors.

Copy link
Contributor Author

@xiaoyu-meng-mxy xiaoyu-meng-mxy Sep 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you create the deprecated constructor too, with the default being HierarchicalShardSyncer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis,
ExecutorService executorService, HierarchichalShardSyncer hierarchichalShardSyncer, MetricsFactory metricsFactory) {
this.shardDetector = shardDetector;
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.hierarchichalShardSyncer = hierarchichalShardSyncer;
this.metricsFactory = metricsFactory;
}

private ConsumerTask currentTask;
private Future<TaskResult> future;

Expand Down Expand Up @@ -81,7 +97,7 @@ private synchronized boolean checkAndSubmitNextTask() {
initialPositionInStream,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
shardSyncIdleTimeMillis, hierarchichalShardSyncer,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you follow the formatting style for the file and include the parameter on a new line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

metricsFactory),
metricsFactory);
future = executorService.submit(currentTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchichalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementFactory;
Expand Down Expand Up @@ -50,6 +51,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final ExecutorService executorService;
@NonNull
private final InitialPositionInStreamExtended initialPositionInStream;
@NonNull
private final HierarchichalShardSyncer hierarchichalShardSyncer;

private final long failoverTimeMillis;
private final long epsilonMillis;
private final int maxLeasesForWorker;
Expand Down Expand Up @@ -116,6 +120,8 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
/**
* Constructor.
*
* <p>NOTE: This constructor is deprecated and will be removed in a future release.</p>
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
Expand All @@ -140,6 +146,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
*/
@Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
Expand All @@ -150,6 +157,54 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
new HierarchichalShardSyncer());
}

/**
* Constructor.
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param initialPositionInStream
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param hierarchichalShardSyncer
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
Copy link
Contributor

@sahilpalvia sahilpalvia Sep 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make it use the formatter on this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchichalShardSyncer hierarchichalShardSyncer) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.dynamoDBClient = dynamoDBClient;
Expand All @@ -173,6 +228,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.hierarchichalShardSyncer = hierarchichalShardSyncer;
}

@Override
Expand All @@ -198,6 +254,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
hierarchichalShardSyncer,
metricsFactory);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package software.amazon.kinesis.leases.exceptions;

import lombok.NonNull;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchichalShardSyncer;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.metrics.MetricsScope;

/**
* This class is deprecated
* Helper class to sync leases with shards of the Kinesis stream.
*/
@Deprecated
public class ShardSyncer {
private static final HierarchichalShardSyncer HIERARCHICHAL_SHARD_SYNCER = new HierarchichalShardSyncer();

/**
* This class is deprecated
* Class level synchronization
*
* @param shardDetector
* @param leaseRefresher
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param scope
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
@Deprecated
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
HIERARCHICHAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(
shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con
argument.leaseRefresher(),
argument.taskBackoffTimeMillis(),
argument.recordsPublisher(),
argument.hierarchichalShardSyncer(),
argument.metricsFactory());
}

Expand Down
Loading