Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 210] - allow unexpected child shards to be ignored #240

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ public ITask createTask(ShardConsumer consumer) {
consumer.getStreamConfig().getStreamProxy(),
consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(),
consumer.isIgnoreUnexpectedChildShards(),
consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public class KinesisClientLibConfiguration {
private boolean callProcessRecordsEvenForEmptyRecordList;
private long parentShardPollIntervalMillis;
private boolean cleanupLeasesUponShardCompletion;
private boolean ignoreUnexpectedChildShards;
private ClientConfiguration kinesisClientConfig;
private ClientConfiguration dynamoDBClientConfig;
private ClientConfiguration cloudWatchClientConfig;
Expand Down Expand Up @@ -802,6 +803,13 @@ public boolean shouldCleanupLeasesUponShardCompletion() {
return cleanupLeasesUponShardCompletion;
}

/**
* @return true if we should ignore child shards which have open parents
*/
public boolean shouldIgnoreUnexpectedChildShards() {
return ignoreUnexpectedChildShards;
}

/**
* @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before
* checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)}
Expand Down Expand Up @@ -1022,6 +1030,16 @@ public KinesisClientLibConfiguration withCleanupLeasesUponShardCompletion(
return this;
}

/**
* @param ignoreUnexpectedChildShards Ignore child shards with open parents.
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withIgnoreUnexpectedChildShards(
boolean ignoreUnexpectedChildShards) {
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
return this;
}

/**
* @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client
* @return KinesisClientLibConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,10 @@ boolean isCleanupLeasesOfCompletedShards() {
return cleanupLeasesOfCompletedShards;
}

boolean isIgnoreUnexpectedChildShards() {
return config.shouldIgnoreUnexpectedChildShards();
}

long getTaskBackoffTimeMillis() {
return taskBackoffTimeMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ShardSyncTask implements ITask {
private final ILeaseManager<KinesisClientLease> leaseManager;
private InitialPositionInStreamExtended initialPosition;
private final boolean cleanupLeasesUponShardCompletion;
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC;

Expand All @@ -49,11 +50,13 @@ class ShardSyncTask implements ITask {
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards,
long shardSyncTaskIdleTimeMillis) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.initialPosition = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
}

Expand All @@ -68,7 +71,8 @@ public TaskResult call() {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPosition,
cleanupLeasesUponShardCompletion);
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ShardSyncTaskManager {
private final ExecutorService executorService;
private final InitialPositionInStreamExtended initialPositionInStream;
private boolean cleanupLeasesUponShardCompletion;
private boolean ignoreUnexpectedChildShards;
private final long shardSyncIdleTimeMillis;


Expand All @@ -55,6 +56,7 @@ class ShardSyncTaskManager {
* @param initialPositionInStream Initial position in stream
* @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait
* until they expire)
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks
Expand All @@ -63,13 +65,15 @@ class ShardSyncTaskManager {
final ILeaseManager<KinesisClientLease> leaseManager,
final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesUponShardCompletion,
final boolean ignoreUnexpectedChildShards,
final long shardSyncIdleTimeMillis,
final IMetricsFactory metricsFactory,
ExecutorService executorService) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.initialPositionInStream = initialPositionInStream;
Expand Down Expand Up @@ -99,6 +103,7 @@ private synchronized boolean checkAndSubmitNextTask(Set<String> closedShardIds)
leaseManager,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis), metricsFactory);
future = executorService.submit(currentTask);
submittedNewTask = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.StringUtils;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
Expand Down Expand Up @@ -60,9 +61,11 @@ private ShardSyncer() {
static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards)
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards);
}

/**
Expand All @@ -71,33 +74,38 @@ static synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy,
* @param kinesisProxy
* @param leaseManager
* @param initialPositionInStream
* @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis
* shows this shard to be closed (e.g. parent shard must be closed after a reshard operation).
* If it is open, we assume this is an race condition around a reshard event and throw
* a KinesisClientLibIOException so client can backoff and retry later.
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
* @throws KinesisClientLibIOException
*/
static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
}

static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, false);
}

/**
* Sync leases with Kinesis shards (e.g. at startup, or when we reach end of a shard).
*
* @param kinesisProxy
* @param leaseManager
* @param expectedClosedShardId If this is not null, we will assert that the shard list we get from Kinesis
* does not show this shard to be open (e.g. parent shard must be closed after a reshard operation).
* If it is still open, we assume this is a race condition around a reshard event and
* throw a KinesisClientLibIOException so client can backoff and retry later. If the shard doesn't exist in
* Kinesis at all, we assume this is an old/expired shard and continue with the sync operation.
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
Expand All @@ -107,18 +115,23 @@ static synchronized void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisP
private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards)
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
List<Shard> shards = getShardList(kinesisProxy);
LOG.debug("Num shards: " + shards.size());

Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
Map<String, Set<String>> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap(shardIdToShardMap);
assertAllParentShardsAreClosed(shardIdToChildShardIdsMap, shardIdToShardMap);

Set<String> inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap);
if (!ignoreUnexpectedChildShards) {
assertAllParentShardsAreClosed(inconsistentShardIds);
}

List<KinesisClientLease> currentLeases = leaseManager.listLeases();

List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition);

List<KinesisClientLease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
inconsistentShardIds);
LOG.debug("Num new leases to create: " + newLeasesToCreate.size());
for (KinesisClientLease lease : newLeasesToCreate) {
long startTimeMillis = System.currentTimeMillis();
Expand Down Expand Up @@ -149,19 +162,37 @@ private static synchronized void syncShardLeases(IKinesisProxy kinesisProxy,

/** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls
* and a reshard operation.
* @param inconsistentShardIds
* @throws KinesisClientLibIOException
*/
private static void assertAllParentShardsAreClosed(Set<String> inconsistentShardIds)
throws KinesisClientLibIOException {
if (!inconsistentShardIds.isEmpty()) {
String ids = StringUtils.join(inconsistentShardIds, ' ');
throw new KinesisClientLibIOException(String.format("%d open child shards (%s) are inconsistent. "
+ "This can happen due to a race condition between describeStream and a reshard operation.",
inconsistentShardIds.size(), ids));
}
}

/**
* Helper method to construct the list of inconsistent shards, which are open shards with non-closed ancestor
* parent(s).
* @param shardIdToChildShardIdsMap
* @param shardIdToShardMap
* @throws KinesisClientLibIOException
* @return Set of inconsistent open shard ids for shards having open parents.
*/
private static void assertAllParentShardsAreClosed(Map<String, Set<String>> shardIdToChildShardIdsMap,
Map<String, Shard> shardIdToShardMap) throws KinesisClientLibIOException {
private static Set<String> findInconsistentShardIds(Map<String, Set<String>> shardIdToChildShardIdsMap,
Map<String, Shard> shardIdToShardMap) {
Set<String> result = new HashSet<String>();
for (String parentShardId : shardIdToChildShardIdsMap.keySet()) {
Shard parentShard = shardIdToShardMap.get(parentShardId);
if ((parentShardId == null) || (parentShard.getSequenceNumberRange().getEndingSequenceNumber() == null)) {
throw new KinesisClientLibIOException("Parent shardId " + parentShardId + " is not closed. "
+ "This can happen due to a race condition between describeStream and a reshard operation.");
Set<String> childShardIdsMap = shardIdToChildShardIdsMap.get(parentShardId);
result.addAll(childShardIdsMap);
}
}
return result;
}

/**
Expand Down Expand Up @@ -296,8 +327,8 @@ private static List<Shard> getShardList(IKinesisProxy kinesisProxy) throws Kines
/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*
* For each open (no ending sequence number) shard that doesn't already have a lease,
*
* For each open (no ending sequence number) shard without open parents that doesn't already have a lease,
* determine if it is a descendent of any shard which is or will be processed (e.g. for which a lease exists):
* If so, set checkpoint of the shard to TrimHorizon and also create leases for ancestors if needed.
* If not, set checkpoint of the shard to the initial position specified by the client.
Expand All @@ -315,27 +346,35 @@ private static List<Shard> getShardList(IKinesisProxy kinesisProxy) throws Kines
*
* For example:
* Shard structure (each level depicts a stream segment):
* 0 1 2 3 4 5- shards till epoch 102
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* 0 1 2 3 4 5 - shards till epoch 102
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | / \
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
* Current leases: (3, 4, 5)
* New leases to create: (2, 6, 7, 8, 9, 10)
*
* The leases returned are sorted by the starting sequence number - following the same order
* when persisting the leases in DynamoDB will ensure that we recover gracefully if we fail
* before creating all the leases.
*
* If a shard has no existing lease, is open, and is a descendant of a parent which is still open, we ignore it
* here; this happens when the list of shards is inconsistent, which could be due to pagination delay for very
* high shard count streams (i.e., dynamodb streams for tables with thousands of partitions). This can only
* currently happen here if ignoreUnexpectedChildShards was true in syncShardleases.
*
*
* @param shards List of all shards in Kinesis (we'll create new leases based on this set)
* @param currentLeases List of current leases
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. We'll start fetching records from that
* location in the shard (when an application starts up for the first time - and there are no checkpoints).
* @param inconsistentShardIds Set of child shard ids having open parents.
* @return List of new leases to create sorted by starting sequenceNumber of the corresponding shard
*/
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition) {
InitialPositionInStreamExtended initialPosition,
Set<String> inconsistentShardIds) {
Map<String, KinesisClientLease> shardIdToNewLeaseMap = new HashMap<String, KinesisClientLease>();
Map<String, Shard> shardIdToShardMapOfAllKinesisShards = constructShardIdToShardMap(shards);

Expand All @@ -354,6 +393,8 @@ static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
LOG.debug("Evaluating leases for open shard " + shardId + " and its ancestors.");
if (shardIdsOfCurrentLeases.contains(shardId)) {
LOG.debug("Lease for shardId " + shardId + " already exists. Not creating a lease");
} else if (inconsistentShardIds.contains(shardId)) {
LOG.info("shardId " + shardId + " is an inconsistent child. Not creating a lease");
} else {
LOG.debug("Need to create a lease for shardId " + shardId);
KinesisClientLease newLease = newKCLLease(shard);
Expand Down Expand Up @@ -407,6 +448,17 @@ static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
return newLeasesToCreate;
}

/**
* Determine new leases to create and their initial checkpoint.
* Note: Package level access only for testing purposes.
*/
static List<KinesisClientLease> determineNewLeasesToCreate(List<Shard> shards,
List<KinesisClientLease> currentLeases,
InitialPositionInStreamExtended initialPosition) {
Set<String> inconsistentShardIds = new HashSet<String>();
return determineNewLeasesToCreate(shards, currentLeases, initialPosition, inconsistentShardIds);
}

/**
* Note: Package level access for testing purposes only.
* Check if this shard is a descendant of a shard that is (or will be) processed.
Expand Down
Loading