Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run multiple instances of the scheduler on one JVM #395

Merged
merged 11 commits into from
Oct 10, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTask;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.ShardSyncer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
Expand Down Expand Up @@ -113,6 +114,7 @@ public class Scheduler implements Runnable {
private final ShardDetector shardDetector;
private final boolean ignoreUnexpetedChildShards;
private final AggregatorUtil aggregatorUtil;
private final ShardSyncer shardSyncer;

// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
Expand Down Expand Up @@ -184,6 +186,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig,
this.shardDetector = this.shardSyncTaskManager.shardDetector();
this.ignoreUnexpetedChildShards = this.leaseManagementConfig.ignoreUnexpectedChildShards();
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.shardSyncer = leaseManagementConfig.shardSyncer();
}

/**
Expand Down Expand Up @@ -226,7 +229,7 @@ private void initialize() {
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
log.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory);
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory, shardSyncer);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
Expand Down Expand Up @@ -560,7 +563,8 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
ignoreUnexpetedChildShards,
shardDetector,
metricsFactory,
aggregatorUtil);
aggregatorUtil,
shardSyncer);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,15 @@ static class LeaseManagementThreadPool extends ThreadPoolExecutor {
}
};

private ShardSyncer shardSyncer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to create the getter

private ShardSyncer shardSyncer = new ShardSyncer();

This should work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right.


/**
 * Returns the {@code ShardSyncer} held in the {@code shardSyncer} field.
 * If the field is still {@code null}, a new {@code ShardSyncer} is created,
 * stored in the field, and returned; later calls return that stored instance.
 *
 * @return the stored (or newly created) {@code ShardSyncer}
 */
public ShardSyncer shardSyncer() {
    if (shardSyncer != null) {
        return shardSyncer;
    }
    // First access with nothing stored yet: create the default instance and keep it.
    shardSyncer = new ShardSyncer();
    return shardSyncer;
}

private LeaseManagementFactory leaseManagementFactory;

public LeaseManagementFactory leaseManagementFactory() {
Expand All @@ -246,7 +255,8 @@ public LeaseManagementFactory leaseManagementFactory() {
listShardsCacheAllowedAgeInSeconds(),
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity());
initialLeaseTableWriteCapacity(),
shardSyncer());
}
return leaseManagementFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class ShardSyncTask implements ConsumerTask {
private final long shardSyncTaskIdleTimeMillis;
@NonNull
private final MetricsFactory metricsFactory;
@NonNull
private final ShardSyncer shardSyncer;

private final TaskType taskType = TaskType.SHARDSYNC;

Expand All @@ -62,7 +64,7 @@ public TaskResult call() {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION);

try {
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPosition,
shardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ public class ShardSyncTaskManager {
private final ExecutorService executorService;
@NonNull
private final MetricsFactory metricsFactory;
@NonNull
private final ShardSyncer shardSyncer;

public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, InitialPositionInStreamExtended initialPositionInStream,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you swap the order of MetricsFactory and ShardSyncer? We have kept MetricsFactory as the last parameter of the constructors.

Copy link
Contributor Author

@xiaoyu-meng-mxy xiaoyu-meng-mxy Sep 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you create the deprecated constructor too, with the default being HierarchicalShardSyncer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis,
ExecutorService executorService, MetricsFactory metricsFactory, ShardSyncer shardSyncer) {
this.shardDetector = shardDetector;
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.metricsFactory = metricsFactory;
this.shardSyncer = shardSyncer;
}

private ConsumerTask currentTask;
private Future<TaskResult> future;
Expand Down Expand Up @@ -82,7 +98,8 @@ private synchronized boolean checkAndSubmitNextTask() {
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
metricsFactory),
metricsFactory,
shardSyncer),
metricsFactory);
future = executorService.submit(currentTask);
submittedNewTask = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@

import org.apache.commons.lang.StringUtils;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -49,12 +47,11 @@
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

/**
* Helper class to sync leases with shards of the Kinesis stream.
* Helper class per kcl scheduler instance to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
* It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
* and begun processing its child shards.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public class ShardSyncer {
/**
Expand All @@ -71,7 +68,7 @@ public class ShardSyncer {
* @throws KinesisClientLibIOException
*/
// CHECKSTYLE:OFF CyclomaticComplexity
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
public synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a breaking change. A better way would be to deprecate the existing method and create a new non-static method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, modified.

final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException,
Expand Down Expand Up @@ -152,7 +149,7 @@ private static Set<String> findInconsistentShardIds(final Map<String, Set<String
* @return ShardIds of child shards (children of the expectedClosedShard)
* @throws KinesisClientLibIOException
*/
static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String, Shard> shardIdToShardMap,
final Map<String, Set<String>> shardIdToChildShardIdsMap, final Set<String> shardIdsOfClosedShards)
throws KinesisClientLibIOException {
final String exceptionMessageSuffix = "This can happen if we constructed the list of shards "
Expand Down Expand Up @@ -181,7 +178,7 @@ static synchronized void assertClosedShardsAreCoveredOrAbsent(final Map<String,
}
}

private static synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
private synchronized void assertHashRangeOfClosedShardIsCovered(final Shard closedShard,
final Map<String, Shard> shardIdToShardMap, final Set<String> childShardIds)
throws KinesisClientLibIOException {
BigInteger minStartingHashKeyOfChildren = null;
Expand Down Expand Up @@ -583,7 +580,7 @@ static boolean isCandidateForCleanup(final Lease lease, final Set<String> curren
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
private static synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
private synchronized void cleanupLeasesOfFinishedShards(final Collection<Lease> currentLeases,
final Map<String, Shard> shardIdToShardMap, final Map<String, Set<String>> shardIdToChildShardIdsMap,
final List<Lease> trackedLeases, final LeaseRefresher leaseRefresher) throws DependencyException,
InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
Expand Down Expand Up @@ -625,7 +622,7 @@ private static synchronized void cleanupLeasesOfFinishedShards(final Collection<
* @throws InvalidStateException
* @throws DependencyException
*/
static synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
synchronized void cleanupLeaseForClosedShard(final String closedShardId, final Set<String> childShardIds,
final Map<String, Lease> trackedLeases, final LeaseRefresher leaseRefresher)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease leaseForClosedShard = trackedLeases.get(closedShardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import software.amazon.kinesis.leases.LeaseManagementFactory;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.ShardSyncer;
import software.amazon.kinesis.metrics.MetricsFactory;

/**
Expand All @@ -50,6 +51,9 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final ExecutorService executorService;
@NonNull
private final InitialPositionInStreamExtended initialPositionInStream;
@NonNull
private final ShardSyncer shardSyncer;

private final long failoverTimeMillis;
private final long epsilonMillis;
private final int maxLeasesForWorker;
Expand Down Expand Up @@ -116,6 +120,8 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
/**
* Constructor.
*
* <p>NOTE: This constructor is deprecated and will be removed in a future release.</p>
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
Expand All @@ -140,6 +146,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
*/
@Deprecated
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
Expand All @@ -150,6 +157,54 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
new ShardSyncer());
}

/**
* Constructor.
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
* @param tableName
* @param workerIdentifier
* @param executorService
* @param initialPositionInStream
* @param failoverTimeMillis
* @param epsilonMillis
* @param maxLeasesForWorker
* @param maxLeasesToStealAtOneTime
* @param maxLeaseRenewalThreads
* @param cleanupLeasesUponShardCompletion
* @param ignoreUnexpectedChildShards
* @param shardSyncIntervalMillis
* @param consistentReads
* @param listShardsBackoffTimeMillis
* @param maxListShardsRetryAttempts
* @param maxCacheMissesBeforeReload
* @param listShardsCacheAllowedAgeInSeconds
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param shardSyncer
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
Copy link
Contributor

@sahilpalvia sahilpalvia Sep 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you run the formatter on this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final ShardSyncer shardSyncer) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.dynamoDBClient = dynamoDBClient;
Expand All @@ -173,6 +228,7 @@ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, fi
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.initialLeaseTableReadCapacity = initialLeaseTableReadCapacity;
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.shardSyncer = shardSyncer;
}

@Override
Expand All @@ -198,7 +254,8 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
metricsFactory);
metricsFactory,
shardSyncer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con
argument.leaseRefresher(),
argument.taskBackoffTimeMillis(),
argument.recordsPublisher(),
argument.metricsFactory());
argument.metricsFactory(),
argument.shardSyncer());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncer;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
Expand Down Expand Up @@ -68,4 +69,5 @@ public class ShardConsumerArgument {
@NonNull
private final MetricsFactory metricsFactory;
private final AggregatorUtil aggregatorUtil;
private final ShardSyncer shardSyncer;
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class ShutdownTask implements ConsumerTask {
private final RecordsPublisher recordsPublisher;
@NonNull
private final MetricsFactory metricsFactory;
@NonNull
private final ShardSyncer shardSyncer;

private final TaskType taskType = TaskType.SHUTDOWN;

Expand Down Expand Up @@ -123,7 +125,7 @@ public TaskResult call() {
if (reason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards
ShardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPositionInStream,
shardSyncer.checkAndCreateLeasesForNewShards(shardDetector, leaseRefresher, initialPositionInStream,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
}
Expand Down
Loading