From dc96518ea4fb62395c977e91d1eda24d4dfda703 Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 7 Feb 2022 08:05:50 +0530 Subject: [PATCH 01/14] Optimize kinesis ingestion task assignment by considering only relevant shards --- .../kinesis/KinesisRecordSupplier.java | 113 ++++++++++++++++-- .../seekablestream/common/RecordSupplier.java | 6 +- 2 files changed, 108 insertions(+), 11 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index c6b136298c21..379689a1dbf5 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -72,6 +72,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.BlockingQueue; @@ -414,6 +415,14 @@ private long getPartitionTimeLag() private volatile boolean closed = false; private AtomicBoolean partitionsFetchStarted = new AtomicBoolean(); + /** + * Determining if a shard has data requires a kinesis call for records, and is expensive + * Cache these values for each stream at a RecordSupplier level to avoid redundant calls + * Only streams which are part of at least one assigned PartitionResource are considered + */ + private final TreeMap> closedEmptyShardsInStream = new TreeMap<>(); + private final TreeMap> closedNonemptyShardsInStream = new TreeMap<>(); + public KinesisRecordSupplier( AmazonKinesis amazonKinesis, int recordsPerFetch, @@ -579,6 +588,26 @@ public void assign(Set> collection) entry.getValue().stopBackgroundFetch(); } } + + // Handle shard relevance cache at stream level + Set prevStreams = ImmutableSet.copyOf(closedEmptyShardsInStream.keySet()); + Set currentStreams = collection.stream() + .map(StreamPartition::getStream) + .collect(Collectors.toSet()); + // clear cache for unassigned streams + for (String prevStream : prevStreams) { + if (!currentStreams.contains(prevStream)) { + closedEmptyShardsInStream.remove(prevStream); + closedNonemptyShardsInStream.remove(prevStream); + } + } + // handle newly assigned streams + for (String currentStream : currentStreams) { + if (!prevStreams.contains(currentStream)) { + closedEmptyShardsInStream.put(currentStream, new TreeSet<>()); + closedNonemptyShardsInStream.put(currentStream, new TreeSet<>()); + } + } } @Override @@ -667,25 +696,42 @@ public String getEarliestSequenceNumber(StreamPartition partition) * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream) * * @param stream name of stream - * - * @return Set of Shard ids + * @return Immutable set of ids of shards which are relevant for ingestion */ @Override public Set getPartitionIds(String stream) { return wrapExceptions(() -> { - final Set retVal = new TreeSet<>(); + final Set currentlyClosedShardIds = new TreeSet<>(); + ImmutableSet.Builder relevantShardIds = ImmutableSet.builder(); ListShardsRequest request = new ListShardsRequest().withStreamName(stream); + + boolean useCache = closedEmptyShardsInStream.containsKey(stream); + Set emptyShards = closedEmptyShardsInStream.get(stream); + Set nonemptyShards = closedNonemptyShardsInStream.get(stream); + while (true) { ListShardsResult result = kinesis.listShards(request); - retVal.addAll(result.getShards() - .stream() - .map(Shard::getShardId) - .collect(Collectors.toList()) - ); + for (Shard shard : result.getShards()) { + String shardId = shard.getShardId(); + // Open shards are relevant + if (isShardOpen(shard)) { + relevantShardIds.add(shardId); + } else { + currentlyClosedShardIds.add(shardId); + if (isClosedShardRelevant(stream, shardId, useCache, emptyShards, nonemptyShards)) { + relevantShardIds.add(shardId); + } + } + } String nextToken = result.getNextToken(); if (nextToken == null) { - return retVal; + // clean up expired shards + if (useCache) { + emptyShards.retainAll(currentlyClosedShardIds); + nonemptyShards.retainAll(currentlyClosedShardIds); + } + return relevantShardIds.build(); } request = new ListShardsRequest().withNextToken(nextToken); } @@ -945,4 +991,53 @@ private void filterBufferAndResetBackgroundFetch(Set> pa // restart fetching threads partitionResources.values().forEach(x -> x.stopBackgroundFetch()); } + + /** + * If a closed shard has no data when polled for the first time, it can be ignored for ingestion + * If it does have data, its offsets are periodically published to metadata by ingestion tasks + * + * @param stream Name of the stream + * @param shardId id of a closed string belonging to the stream + * @param useCache indicates if this record supplier maintains shard cache for this stream + * @param emptyShards cached set of closed and empty shards in stream + * @param nonemptyShards cached set of closed and non-empty shards in stream + * @return relevance of shard for ingestion + */ + private boolean isClosedShardRelevant(String stream, String shardId, boolean useCache, + Set emptyShards, Set nonemptyShards) + { + if (useCache) { + if (emptyShards.contains(shardId)) { + return false; + } + if (nonemptyShards.contains(shardId)) { + return true; + } + } + + // Fetch records from a shard at most once, when it is closed for the first time + String shardIterator = kinesis.getShardIterator(stream, + shardId, + ShardIteratorType.TRIM_HORIZON.toString()) + .getShardIterator(); + GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator) + .withLimit(1); + GetRecordsResult shardData = kinesis.getRecords(request); + + boolean isEmpty = shardData.getRecords().isEmpty() + && shardData.getNextShardIterator() == null; + if (useCache) { + if (isEmpty) { + emptyShards.add(shardId); + } else { + nonemptyShards.add(shardId); + } + } + return !isEmpty; + } + + private boolean isShardOpen(Shard shard) + { + return shard.getSequenceNumberRange().getEndingSequenceNumber() == null; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java index 64a5eebbe237..fbc63e835fb8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java @@ -116,11 +116,13 @@ public interface RecordSupplier partition); /** - * returns the set of partitions under the given stream + * returns the set of relevant partitions under the given stream + * For kafka, this returns all the partitions + * For kinesis, only shards which are either currently OPEN or have at least one record are relevant * * @param stream name of stream * - * @return set of partitions + * @return set of partition ids belonging to the stream */ Set getPartitionIds(String stream); From f8dd11e1152c58d508021becc62c6c3778f4ec5b Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 7 Feb 2022 15:28:08 +0530 Subject: [PATCH 02/14] Refactor for better readability --- .../kinesis/KinesisRecordSupplier.java | 69 ++++++++++--------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 379689a1dbf5..8cd868fc5caf 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -420,8 +420,8 @@ private long getPartitionTimeLag() * Cache these values for each stream at a RecordSupplier level to avoid redundant calls * Only streams which are part of at least one assigned PartitionResource are considered */ - private final TreeMap> closedEmptyShardsInStream = new TreeMap<>(); - private final TreeMap> closedNonemptyShardsInStream = new TreeMap<>(); + private final Map> emptyClosedShardsMap = new TreeMap<>(); + private final Map> nonEmptyClosedShardsMap = new TreeMap<>(); public KinesisRecordSupplier( AmazonKinesis amazonKinesis, @@ -590,22 +590,22 @@ public void assign(Set> collection) } // Handle shard relevance cache at stream level - Set prevStreams = ImmutableSet.copyOf(closedEmptyShardsInStream.keySet()); + Set prevStreams = ImmutableSet.copyOf(emptyClosedShardsMap.keySet()); Set currentStreams = collection.stream() .map(StreamPartition::getStream) .collect(Collectors.toSet()); // clear cache for unassigned streams for (String prevStream : prevStreams) { if (!currentStreams.contains(prevStream)) { - closedEmptyShardsInStream.remove(prevStream); - closedNonemptyShardsInStream.remove(prevStream); + emptyClosedShardsMap.remove(prevStream); + nonEmptyClosedShardsMap.remove(prevStream); } } // handle newly assigned streams for (String currentStream : currentStreams) { if (!prevStreams.contains(currentStream)) { - closedEmptyShardsInStream.put(currentStream, new TreeSet<>()); - closedNonemptyShardsInStream.put(currentStream, new TreeSet<>()); + emptyClosedShardsMap.put(currentStream, new TreeSet<>()); + nonEmptyClosedShardsMap.put(currentStream, new TreeSet<>()); } } } @@ -706,9 +706,9 @@ public Set getPartitionIds(String stream) ImmutableSet.Builder relevantShardIds = ImmutableSet.builder(); ListShardsRequest request = new ListShardsRequest().withStreamName(stream); - boolean useCache = closedEmptyShardsInStream.containsKey(stream); - Set emptyShards = closedEmptyShardsInStream.get(stream); - Set nonemptyShards = closedNonemptyShardsInStream.get(stream); + final boolean useCache = nonEmptyClosedShardsMap.containsKey(stream); + final Set emptyClosedShards = emptyClosedShardsMap.get(stream); + final Set nonEmptyClosedShards = nonEmptyClosedShardsMap.get(stream); while (true) { ListShardsResult result = kinesis.listShards(request); @@ -719,7 +719,7 @@ public Set getPartitionIds(String stream) relevantShardIds.add(shardId); } else { currentlyClosedShardIds.add(shardId); - if (isClosedShardRelevant(stream, shardId, useCache, emptyShards, nonemptyShards)) { + if (isClosedShardNonEmpty(stream, shardId, useCache, emptyClosedShards, nonEmptyClosedShards)) { relevantShardIds.add(shardId); } } @@ -728,8 +728,8 @@ public Set getPartitionIds(String stream) if (nextToken == null) { // clean up expired shards if (useCache) { - emptyShards.retainAll(currentlyClosedShardIds); - nonemptyShards.retainAll(currentlyClosedShardIds); + emptyClosedShards.retainAll(currentlyClosedShardIds); + nonEmptyClosedShards.retainAll(currentlyClosedShardIds); } return relevantShardIds.build(); } @@ -999,45 +999,48 @@ private void filterBufferAndResetBackgroundFetch(Set> pa * @param stream Name of the stream * @param shardId id of a closed string belonging to the stream * @param useCache indicates if this record supplier maintains shard cache for this stream - * @param emptyShards cached set of closed and empty shards in stream - * @param nonemptyShards cached set of closed and non-empty shards in stream + * @param emptyClosedShards cached set of closed and empty shards in stream + * @param nonEmptyClosedShards cached set of closed and non-empty shards in stream * @return relevance of shard for ingestion */ - private boolean isClosedShardRelevant(String stream, String shardId, boolean useCache, - Set emptyShards, Set nonemptyShards) + private boolean isClosedShardNonEmpty(String stream, String shardId, boolean useCache, + Set emptyClosedShards, Set nonEmptyClosedShards) { if (useCache) { - if (emptyShards.contains(shardId)) { + if (emptyClosedShards.contains(shardId)) { return false; } - if (nonemptyShards.contains(shardId)) { + if (nonEmptyClosedShards.contains(shardId)) { return true; } } - // Fetch records from a shard at most once, when it is closed for the first time - String shardIterator = kinesis.getShardIterator(stream, - shardId, - ShardIteratorType.TRIM_HORIZON.toString()) - .getShardIterator(); - GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator) - .withLimit(1); - GetRecordsResult shardData = kinesis.getRecords(request); + boolean emptyAndClosed = isShardEmpty(stream, shardId); - boolean isEmpty = shardData.getRecords().isEmpty() - && shardData.getNextShardIterator() == null; if (useCache) { - if (isEmpty) { - emptyShards.add(shardId); + if (emptyAndClosed) { + emptyClosedShards.add(shardId); } else { - nonemptyShards.add(shardId); + nonEmptyClosedShards.add(shardId); } } - return !isEmpty; + return !emptyAndClosed; } private boolean isShardOpen(Shard shard) { return shard.getSequenceNumberRange().getEndingSequenceNumber() == null; } + + private boolean isShardEmpty(String stream, String shardId) { + String shardIterator = kinesis.getShardIterator(stream, + shardId, + ShardIteratorType.TRIM_HORIZON.toString()) + .getShardIterator(); + GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator) + .withLimit(1); + GetRecordsResult shardData = kinesis.getRecords(request); + + return shardData.getRecords().isEmpty() && shardData.getNextShardIterator() == null; + } } From df9a72ee6c7b7b72e75c65ed7fbbd6a92f02b0f4 Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 8 Feb 2022 13:17:30 +0530 Subject: [PATCH 03/14] Move shard cache from KinesisRecordSupplier to KinesisSupervisor --- .../kinesis/KinesisRecordSupplier.java | 155 +++++------------- .../kinesis/supervisor/KinesisSupervisor.java | 84 ++++++++++ .../KinesisSupervisorTuningConfig.java | 13 +- .../KinesisIndexTaskTuningConfigTest.java | 1 + .../supervisor/KinesisSupervisorTest.java | 3 + .../seekablestream/common/RecordSupplier.java | 4 +- .../supervisor/SeekableStreamSupervisor.java | 19 ++- .../testing/utils/KinesisAdminClient.java | 2 +- 8 files changed, 159 insertions(+), 122 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 8cd868fc5caf..182475377419 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -72,8 +72,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -415,14 +413,6 @@ private long getPartitionTimeLag() private volatile boolean closed = false; private AtomicBoolean partitionsFetchStarted = new AtomicBoolean(); - /** - * Determining if a shard has data requires a kinesis call for records, and is expensive - * Cache these values for each stream at a RecordSupplier level to avoid redundant calls - * Only streams which are part of at least one assigned PartitionResource are considered - */ - private final Map> emptyClosedShardsMap = new TreeMap<>(); - private final Map> nonEmptyClosedShardsMap = new TreeMap<>(); - public KinesisRecordSupplier( AmazonKinesis amazonKinesis, int recordsPerFetch, @@ -588,26 +578,6 @@ public void assign(Set> collection) entry.getValue().stopBackgroundFetch(); } } - - // Handle shard relevance cache at stream level - Set prevStreams = ImmutableSet.copyOf(emptyClosedShardsMap.keySet()); - Set currentStreams = collection.stream() - .map(StreamPartition::getStream) - .collect(Collectors.toSet()); - // clear cache for unassigned streams - for (String prevStream : prevStreams) { - if (!currentStreams.contains(prevStream)) { - emptyClosedShardsMap.remove(prevStream); - nonEmptyClosedShardsMap.remove(prevStream); - } - } - // handle newly assigned streams - for (String currentStream : currentStreams) { - if (!prevStreams.contains(currentStream)) { - emptyClosedShardsMap.put(currentStream, new TreeSet<>()); - nonEmptyClosedShardsMap.put(currentStream, new TreeSet<>()); - } - } } @Override @@ -696,45 +666,31 @@ public String getEarliestSequenceNumber(StreamPartition partition) * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream) * * @param stream name of stream - * @return Immutable set of ids of shards which are relevant for ingestion + * @return Immutable set of shards */ + public Set getShards(String stream) + { + ImmutableSet.Builder shards = ImmutableSet.builder(); + ListShardsRequest request = new ListShardsRequest().withStreamName(stream); + while (true) { + ListShardsResult result = kinesis.listShards(request); + shards.addAll(result.getShards()); + String nextToken = result.getNextToken(); + if (nextToken == null) { + return shards.build(); + } + request = new ListShardsRequest().withNextToken(nextToken); + } + } + @Override public Set getPartitionIds(String stream) { return wrapExceptions(() -> { - final Set currentlyClosedShardIds = new TreeSet<>(); - ImmutableSet.Builder relevantShardIds = ImmutableSet.builder(); - ListShardsRequest request = new ListShardsRequest().withStreamName(stream); - - final boolean useCache = nonEmptyClosedShardsMap.containsKey(stream); - final Set emptyClosedShards = emptyClosedShardsMap.get(stream); - final Set nonEmptyClosedShards = nonEmptyClosedShardsMap.get(stream); - - while (true) { - ListShardsResult result = kinesis.listShards(request); - for (Shard shard : result.getShards()) { - String shardId = shard.getShardId(); - // Open shards are relevant - if (isShardOpen(shard)) { - relevantShardIds.add(shardId); - } else { - currentlyClosedShardIds.add(shardId); - if (isClosedShardNonEmpty(stream, shardId, useCache, emptyClosedShards, nonEmptyClosedShards)) { - relevantShardIds.add(shardId); - } - } - } - String nextToken = result.getNextToken(); - if (nextToken == null) { - // clean up expired shards - if (useCache) { - emptyClosedShards.retainAll(currentlyClosedShardIds); - nonEmptyClosedShards.retainAll(currentlyClosedShardIds); - } - return relevantShardIds.build(); - } - request = new ListShardsRequest().withNextToken(nextToken); - } + return ImmutableSet.copyOf(getShards(stream).stream() + .map(shard -> shard.getShardId()) + .collect(Collectors.toList()) + ); }); } @@ -796,6 +752,25 @@ public boolean isAnyFetchActive() .anyMatch(fetch -> (fetch != null && !fetch.isDone())); } + /** + * Is costly and requires polling the shard to determine if it's empty + * @param stream to which shard belongs + * @param shardId of the shard + * @return if the shard is empty + */ + public boolean isShardEmpty(String stream, String shardId) + { + String shardIterator = kinesis.getShardIterator(stream, + shardId, + ShardIteratorType.TRIM_HORIZON.toString()) + .getShardIterator(); + GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator) + .withLimit(1); + GetRecordsResult shardData = kinesis.getRecords(request); + + return shardData.getRecords().isEmpty() && shardData.getNextShardIterator() == null; + } + /** * Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call * {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background @@ -991,56 +966,4 @@ private void filterBufferAndResetBackgroundFetch(Set> pa // restart fetching threads partitionResources.values().forEach(x -> x.stopBackgroundFetch()); } - - /** - * If a closed shard has no data when polled for the first time, it can be ignored for ingestion - * If it does have data, its offsets are periodically published to metadata by ingestion tasks - * - * @param stream Name of the stream - * @param shardId id of a closed string belonging to the stream - * @param useCache indicates if this record supplier maintains shard cache for this stream - * @param emptyClosedShards cached set of closed and empty shards in stream - * @param nonEmptyClosedShards cached set of closed and non-empty shards in stream - * @return relevance of shard for ingestion - */ - private boolean isClosedShardNonEmpty(String stream, String shardId, boolean useCache, - Set emptyClosedShards, Set nonEmptyClosedShards) - { - if (useCache) { - if (emptyClosedShards.contains(shardId)) { - return false; - } - if (nonEmptyClosedShards.contains(shardId)) { - return true; - } - } - - boolean emptyAndClosed = isShardEmpty(stream, shardId); - - if (useCache) { - if (emptyAndClosed) { - emptyClosedShards.add(shardId); - } else { - nonEmptyClosedShards.add(shardId); - } - } - return !emptyAndClosed; - } - - private boolean isShardOpen(Shard shard) - { - return shard.getSequenceNumberRange().getEndingSequenceNumber() == null; - } - - private boolean isShardEmpty(String stream, String shardId) { - String shardIterator = kinesis.getShardIterator(stream, - shardId, - ShardIteratorType.TRIM_HORIZON.toString()) - .getShardIterator(); - GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator) - .withLimit(1); - GetRecordsResult shardData = kinesis.getRecords(request); - - return shardData.getRecords().isEmpty() && shardData.getNextShardIterator() == null; - } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index e33f458a8337..086d8b2acea8 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -19,10 +19,12 @@ package org.apache.druid.indexing.kinesis.supervisor; +import com.amazonaws.services.kinesis.model.Shard; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.impl.ByteEntity; @@ -64,6 +66,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.stream.Collectors; /** @@ -88,6 +91,11 @@ public class KinesisSupervisor extends SeekableStreamSupervisor currentPartitionTimeLag; + // Maintain sets of currently closed shards to find "bad" (closed and empty) shards + // Poll closed shards once and store the result to avoid redundant costly calls to kinesis + private final Set emptyClosedShardIds = new TreeSet<>(); + private final Set nonEmptyClosedShardIds = new TreeSet<>(); + public KinesisSupervisor( final TaskStorage taskStorage, final TaskMaster taskMaster, @@ -416,6 +424,49 @@ protected boolean supportsPartitionExpiration() return true; } + @Override + protected boolean shouldSkipIgnorablePartitions() + { + return spec.getSpec().getTuningConfig().shouldSkipIgnorableShards(); + } + + /** + * Closed and empty shards can be ignored for ingestion, + * Use this method if skipIgnorablePartitions is true in the spec + * + * These partitions can be safely ignored for both ingesetion task assignment and autoscaler limits + * + * @return the set of ignorable shards' ids + */ + @Override + protected Set getIgnorablePartitionIds() + { + updateClosedShardCache(); + return ImmutableSet.copyOf(emptyClosedShardIds); + } + + private void updateClosedShardCache() + { + String stream = spec.getSource(); + Set allActiveShards = ((KinesisRecordSupplier) recordSupplier).getShards(stream); + Set activeClosedShards = allActiveShards.stream() + .filter(shard -> isShardOpen(shard)) + .map(Shard::getShardId).collect(Collectors.toSet()); + + // clear stale shards + emptyClosedShardIds.retainAll(activeClosedShards); + nonEmptyClosedShardIds.retainAll(activeClosedShards); + + // add newly closed shards to cache + for (String closedShardId : activeClosedShards) { + if (isClosedShardEmpty(stream, closedShardId)) { + emptyClosedShardIds.add(closedShardId); + } else { + nonEmptyClosedShardIds.add(closedShardId); + } + } + } + @Override protected SeekableStreamDataSourceMetadata createDataSourceMetadataWithExpiredPartitions( SeekableStreamDataSourceMetadata currentMetadata, Set expiredPartitionIds @@ -481,4 +532,37 @@ private SeekableStreamDataSourceMetadata createDataSourceMetadat return new KinesisDataSourceMetadata(newSequences); } + + /** + * Open shards iff they don't have an ending sequence number + * + * @param shard to be checked + * @return if shard is open + */ + private boolean isShardOpen(Shard shard) + { + return shard.getSequenceNumberRange().getEndingSequenceNumber() == null; + } + + /** + * Checking if a shard is empty requires polling for records which is quite expensive + * Fortunately, the results can be cached for closed shards as no more records can be written to them + * + * @param stream to which the shard belongs + * @param shardId of the shard + * @return if the shard is empty + */ + private boolean isClosedShardEmpty(String stream, String shardId) + { + // utilize cache + if (emptyClosedShardIds.contains(shardId)) { + return true; + } + if (nonEmptyClosedShardIds.contains(shardId)) { + return false; + } + + // Make an expensive call to kinesis + return ((KinesisRecordSupplier) recordSupplier).isShardEmpty(stream, shardId); + } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 6fba06d611b5..d583bc77a490 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -41,6 +41,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig private final Duration shutdownTimeout; private final Duration repartitionTransitionDuration; private final Duration offsetFetchPeriod; + private final boolean skipIgnorableShards; public static KinesisSupervisorTuningConfig defaultConfig() { @@ -77,6 +78,7 @@ public static KinesisSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -114,7 +116,8 @@ public KinesisSupervisorTuningConfig( @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, - @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod + @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, + @JsonProperty("skipIgnorableShards") Boolean skipIgnorableShards ) { super( @@ -162,6 +165,7 @@ public KinesisSupervisorTuningConfig( offsetFetchPeriod, DEFAULT_OFFSET_FETCH_PERIOD ); + this.skipIgnorableShards = (skipIgnorableShards != null ? skipIgnorableShards : false); } @Override @@ -212,6 +216,12 @@ public Duration getOffsetFetchPeriod() return offsetFetchPeriod; } + @JsonProperty + public boolean shouldSkipIgnorableShards() + { + return skipIgnorableShards; + } + @Override public String toString() { @@ -246,6 +256,7 @@ public String toString() ", maxRecordsPerPoll=" + getMaxRecordsPerPoll() + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() + + ", skipIgnorableShards=" + shouldSkipIgnorableShards() + '}'; } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 3bba03e9136c..08a742efa6c3 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -317,6 +317,7 @@ public void testConvert() null, null, null, + null, null ); KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index d1ba49b366c6..924f76fcce92 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -204,6 +204,7 @@ public void setupTest() null, null, null, + null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -3941,6 +3942,7 @@ public void testIsTaskCurrent() 42, // This property is different from tuningConfig null, null, + null, null ); @@ -4995,6 +4997,7 @@ public KinesisIndexTaskClient build( null, null, null, + null, null ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java index fbc63e835fb8..7487892b4936 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java @@ -116,9 +116,7 @@ public interface RecordSupplier partition); /** - * returns the set of relevant partitions under the given stream - * For kafka, this returns all the partitions - * For kinesis, only shards which are either currently OPEN or have at least one record are relevant + * returns the set of all available partitions under the given stream * * @param stream name of stream * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 8a1c898072b8..630df58bfef6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2291,9 +2291,23 @@ protected boolean supportsPartitionExpiration() return false; } + protected boolean shouldSkipIgnorablePartitions() + { + return false; + } + + protected Set getIgnorablePartitionIds() + { + return ImmutableSet.of(); + } + public int getPartitionCount() { - return recordSupplier.getPartitionIds(ioConfig.getStream()).size(); + int partitionCount = recordSupplier.getPartitionIds(ioConfig.getStream()).size(); + if (shouldSkipIgnorablePartitions()) { + partitionCount -= getIgnorablePartitionIds().size(); + } + return partitionCount; } private boolean updatePartitionDataFromStream() @@ -2303,6 +2317,9 @@ private boolean updatePartitionDataFromStream() recordSupplierLock.lock(); try { partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream()); + if (shouldSkipIgnorablePartitions()) { + partitionIdsFromSupplier.removeAll(getIgnorablePartitionIds()); + } } catch (Exception e) { stateManager.recordThrowableEvent(e); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java index 53e328477a08..91f5a2e94cc8 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java @@ -174,7 +174,7 @@ private Set listShards(String streamName) if (nextToken == null) { return shards.build(); } - listShardsRequest = new ListShardsRequest().withNextToken(listShardsResult.getNextToken()); + listShardsRequest = new ListShardsRequest().withNextToken(nextToken); } } From 7c708dd668d6a1327ef4a7ebf3d0faa34c8be1c4 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 9 Feb 2022 09:04:17 +0530 Subject: [PATCH 04/14] Add KinesisRecordSupplier test --- .../kinesis/KinesisRecordSupplier.java | 6 +- .../kinesis/supervisor/KinesisSupervisor.java | 14 ++--- .../kinesis/KinesisRecordSupplierTest.java | 58 +++++++++++++++++++ 3 files changed, 68 insertions(+), 10 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 182475377419..9c88a2af5778 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -755,10 +755,10 @@ public boolean isAnyFetchActive() /** * Is costly and requires polling the shard to determine if it's empty * @param stream to which shard belongs - * @param shardId of the shard - * @return if the shard is empty + * @param shardId of the closed shard + * @return if the closed shard is empty */ - public boolean isShardEmpty(String stream, String shardId) + public boolean isClosedShardEmpty(String stream, String shardId) { String shardIterator = kinesis.getShardIterator(stream, shardId, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 086d8b2acea8..ec72ce9f02df 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -434,7 +434,7 @@ protected boolean shouldSkipIgnorablePartitions() * Closed and empty shards can be ignored for ingestion, * Use this method if skipIgnorablePartitions is true in the spec * - * These partitions can be safely ignored for both ingesetion task assignment and autoscaler limits + * These partitions can be safely ignored for both ingestion task assignment and autoscaler limits * * @return the set of ignorable shards' ids */ @@ -450,7 +450,7 @@ private void updateClosedShardCache() String stream = spec.getSource(); Set allActiveShards = ((KinesisRecordSupplier) recordSupplier).getShards(stream); Set activeClosedShards = allActiveShards.stream() - .filter(shard -> isShardOpen(shard)) + .filter(shard -> isShardClosed(shard)) .map(Shard::getShardId).collect(Collectors.toSet()); // clear stale shards @@ -534,14 +534,14 @@ private SeekableStreamDataSourceMetadata createDataSourceMetadat } /** - * Open shards iff they don't have an ending sequence number + * Closed shards iff they have an ending sequence number * * @param shard to be checked - * @return if shard is open + * @return if shard is closed */ - private boolean isShardOpen(Shard shard) + private boolean isShardClosed(Shard shard) { - return shard.getSequenceNumberRange().getEndingSequenceNumber() == null; + return shard.getSequenceNumberRange().getEndingSequenceNumber() != null; } /** @@ -563,6 +563,6 @@ private boolean isClosedShardEmpty(String stream, String shardId) } // Make an expensive call to kinesis - return ((KinesisRecordSupplier) recordSupplier).isShardEmpty(stream, shardId); + return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId); } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index dda0e2079b31..aaf337b3bf64 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -50,6 +50,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws InterruptedException } verifyAll(); } + + @Test + public void isClosedShardEmpty() + { + AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class); + KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis, + recordsPerFetch, + 0, + 2, + false, + 100, + 5000, + 5000, + 60000, + 5, + true + ); + Record record = new Record(); + String shardId; + + // No records and null iterator -> empty + shardId = "0"; + isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null); + Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId)); + + // no records and non-null iterator -> non-empty + shardId = "1"; + isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator"); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId)); + + // non-empty records and null iterator -> non-empty + shardId = "2"; + isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId)); + + // non-empty records and non-null iterator -> non-empty + shardId = "3"; + isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), "nextIterator"); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId)); + } + + private void isClosedShardEmptyHelper(AmazonKinesis kinesis, String shardId, + List expectedRecords, String expectedNextIterator) + { + EasyMock.reset(kinesis); + String shardIteratorType = ShardIteratorType.TRIM_HORIZON.toString(); + String shardIterator = "shardIterator" + shardId; + GetShardIteratorResult shardIteratorResult = EasyMock.mock(GetShardIteratorResult.class); + EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType)).andReturn(shardIteratorResult).once(); + EasyMock.expect(shardIteratorResult.getShardIterator()).andReturn(shardIterator).once(); + GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1); + GetRecordsResult result = EasyMock.mock(GetRecordsResult.class); + EasyMock.expect(kinesis.getRecords(request)).andReturn(result).once(); + EasyMock.expect(result.getNextShardIterator()).andReturn(expectedNextIterator).once(); + EasyMock.expect(result.getRecords()).andReturn(expectedRecords).once(); + EasyMock.replay(shardIteratorResult, result, kinesis); + } } From 53111261a0e5a834aaef996242d10699e3e146ed Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 9 Feb 2022 13:22:49 +0530 Subject: [PATCH 05/14] Add tests --- .../kinesis/supervisor/KinesisSupervisor.java | 18 +- .../kinesis/KinesisRecordSupplierTest.java | 2 +- .../supervisor/KinesisSupervisorTest.java | 340 +++++++++++++----- 3 files changed, 265 insertions(+), 95 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index ec72ce9f02df..c27a57d73a4a 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -442,7 +442,7 @@ protected boolean shouldSkipIgnorablePartitions() protected Set getIgnorablePartitionIds() { updateClosedShardCache(); - return ImmutableSet.copyOf(emptyClosedShardIds); + return getEmptyClosedShardIds(); } private void updateClosedShardCache() @@ -565,4 +565,20 @@ private boolean isClosedShardEmpty(String stream, String shardId) // Make an expensive call to kinesis return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId); } + + /** + * @return immutable copy of cache for empty, closed shards + */ + Set getEmptyClosedShardIds() + { + return ImmutableSet.copyOf(emptyClosedShardIds); + } + + /** + * @return immutable copy of cache for non-empty, closed shards + */ + Set getNonEmptyClosedShardIds() + { + return ImmutableSet.copyOf(nonEmptyClosedShardIds); + } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index aaf337b3bf64..518c874c7f82 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -1026,7 +1026,7 @@ public void getPartitionTimeLag() throws InterruptedException } @Test - public void isClosedShardEmpty() + public void testIsClosedShardEmpty() { AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class); KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 924f76fcce92..cc70cbf21ce5 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -19,6 +19,8 @@ package org.apache.druid.indexing.kinesis.supervisor; +import com.amazonaws.services.kinesis.model.SequenceNumberRange; +import com.amazonaws.services.kinesis.model.Shard; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -106,6 +108,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -307,17 +310,17 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class); supervisor = getTestableSupervisor( - 1, - 1, - true, - "PT1H", - null, - null, - false, - null, - null, - autoScalerConfig - ); + 1, + 1, + true, + "PT1H", + null, + null, + false, + null, + null, + autoScalerConfig + ); KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec(); SupervisorTaskAutoScaler autoscaler = kinesisSupervisorSpec.createAutoscaler(supervisor); @@ -340,9 +343,9 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( - new KinesisDataSourceMetadata( - null - ) + new KinesisDataSourceMetadata( + null + ) ).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); @@ -381,16 +384,16 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class); supervisor = getTestableSupervisor( - 1, - 2, - true, - "PT1H", - null, - null, - false, - null, - null, - autoScalerConfig + 1, + 2, + true, + "PT1H", + null, + null, + false, + null, + null, + autoScalerConfig ); KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec(); @@ -415,9 +418,9 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock - .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) - .andReturn(new KinesisDataSourceMetadata(null)) - .anyTimes(); + .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) + .andReturn(new KinesisDataSourceMetadata(null)) + .anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2); replayAll(); @@ -506,26 +509,26 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation() { // create KinesisSupervisorIOConfig with autoScalerConfig null KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithNullAutoScalerConfig = new KinesisSupervisorIOConfig( - STREAM, - INPUT_FORMAT, - "awsEndpoint", - null, - 1, - 1, - new Period("PT30M"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, - null, - 100, - 1000, - null, - null, - null, - false + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + null, + false ); AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoscalerConfig(); @@ -533,26 +536,26 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation() // create KinesisSupervisorIOConfig with autoScalerConfig Empty KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithEmptyAutoScalerConfig = new KinesisSupervisorIOConfig( - STREAM, - INPUT_FORMAT, - "awsEndpoint", - null, - 1, - 1, - new Period("PT30M"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, - null, - 100, - 1000, - null, - null, - OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), - false + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), + false ); AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoscalerConfig(); @@ -1295,7 +1298,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception .times(1); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) + .andReturn(ImmutableList.of(captured.getValue())) + .anyTimes(); EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId())) .andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId(), "Dummy task status failure err message"))); EasyMock.expect(taskStorage.getStatus(runningTaskId)) @@ -1973,7 +1978,9 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) + .andReturn(ImmutableList.of(id1, id2)) + .anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); @@ -2515,7 +2522,9 @@ public void testStopGracefully() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) + .andReturn(ImmutableList.of(id1, id2, id3)) + .anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2574,11 +2583,11 @@ public void testStopGracefully() throws Exception "1" ))); EasyMock.expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of( - SHARD_ID1, - "12", - SHARD_ID0, - "1" - ), true)) + SHARD_ID1, + "12", + SHARD_ID0, + "1" + ), true)) .andReturn(Futures.immediateFuture(true)); taskQueue.shutdown("id3", "Killing task for graceful shutdown"); EasyMock.expectLastCall().times(1); @@ -2750,7 +2759,9 @@ public void testGetOffsetFromStorageForPartitionWithResetOffsetAutomatically() t .anyTimes(); supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("300").anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())) + .andReturn("300") + .anyTimes(); EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("400").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); @@ -2918,7 +2929,9 @@ public void testResetRunningTasks() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) + .andReturn(ImmutableList.of(id1, id2, id3)) + .anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3073,7 +3086,9 @@ public void testNoDataIngestionTasks() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) + .andReturn(ImmutableList.of(id1, id2, id3)) + .anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3229,7 +3244,9 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) + .andReturn(ImmutableList.of(id1, id2, id3)) + .anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3237,9 +3254,11 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); EasyMock.expect( - indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KinesisDataSourceMetadata( - null) - ).anyTimes(); + indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) + .andReturn(new KinesisDataSourceMetadata( + null) + ) + .anyTimes(); EasyMock.expect(taskClient.getStatusAsync("id1")) .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); EasyMock.expect(taskClient.getStatusAsync("id2")) @@ -3381,7 +3400,9 @@ public void testCheckpointForUnknownTaskGroup() EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) + .andReturn(ImmutableList.of(id1, id2, id3)) + .anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3389,9 +3410,11 @@ public void testCheckpointForUnknownTaskGroup() EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); EasyMock.expect( - indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KinesisDataSourceMetadata( - null) - ).anyTimes(); + indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) + .andReturn(new KinesisDataSourceMetadata( + null) + ) + .anyTimes(); replayAll(); @@ -3552,7 +3575,9 @@ public void testSuspendedRunningTasks() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) + .andReturn(ImmutableList.of(id1, id2, id3)) + .anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3604,11 +3629,11 @@ public void testSuspendedRunningTasks() throws Exception "1" ))); EasyMock.expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of( - SHARD_ID1, - "12", - SHARD_ID0, - "1" - ), true)) + SHARD_ID1, + "12", + SHARD_ID0, + "1" + ), true)) .andReturn(Futures.immediateFuture(true)); taskQueue.shutdown("id3", "Killing task for graceful shutdown"); EasyMock.expectLastCall().times(1); @@ -4887,6 +4912,135 @@ private void testShardMergePhaseThree(List phaseTwoTasks) throws Exception Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets()); } + @Test + public void testUpdateClosedShardCache() + { + supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null); + supervisor.setupRecordSupplier(); + supervisor.tryInit(); + String stream = supervisor.getKinesisSupervisorSpec().getSource(); + Shard openShard = EasyMock.mock(Shard.class); + Shard emptyClosedShard = EasyMock.mock(Shard.class); + Shard nonEmptyClosedShard = EasyMock.mock(Shard.class); + Set activeShards; + Set emptyClosedShardIds; + Set nonEmptyClosedShardIds; + + // ITERATION 0: + // active shards: an open shard, closed-empty shard and closed-nonEmpty shard + activeShards = getActiveShards(openShard, true, + emptyClosedShard, true, + nonEmptyClosedShard, true); + + EasyMock.reset(supervisorRecordSupplier); + EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards).once(); + // The following two calls DO happen since the shards are processed for the first time after being closed + EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, emptyClosedShard.getShardId())).andReturn(true).once(); + EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, nonEmptyClosedShard.getShardId())).andReturn(false).once(); + EasyMock.replay(supervisorRecordSupplier); + + supervisor.getIgnorablePartitionIds(); + emptyClosedShardIds = supervisor.getEmptyClosedShardIds(); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), emptyClosedShardIds); + nonEmptyClosedShardIds = supervisor.getNonEmptyClosedShardIds(); + Assert.assertEquals(Collections.singleton(nonEmptyClosedShard.getShardId()), nonEmptyClosedShardIds); + + + // ITERATION 1: + // active shards: an open shard, closed-empty shard and closed-nonEmpty shard + activeShards = getActiveShards(openShard, true, + emptyClosedShard, true, + nonEmptyClosedShard, true); + + EasyMock.reset(supervisorRecordSupplier); + EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards).once(); + // calls to KinesisRecordSupplier#isClosedShardEmpty DO NOT happen + EasyMock.replay(supervisorRecordSupplier); + + supervisor.getIgnorablePartitionIds(); + emptyClosedShardIds = supervisor.getEmptyClosedShardIds(); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), emptyClosedShardIds); + nonEmptyClosedShardIds = supervisor.getNonEmptyClosedShardIds(); + Assert.assertEquals(Collections.singleton(nonEmptyClosedShard.getShardId()), nonEmptyClosedShardIds); + + + // ITERATION 2: + // active shards: an open shard, closed-empty shard. closed-nonEmpty shard has expired + activeShards = getActiveShards(openShard, true, + emptyClosedShard, true, + nonEmptyClosedShard, false); + + EasyMock.reset(supervisorRecordSupplier); + EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards).once(); + // calls to KinesisRecordSupplier#isClosedShardEmpty DO NOT happen + EasyMock.replay(supervisorRecordSupplier); + + supervisor.getIgnorablePartitionIds(); + emptyClosedShardIds = supervisor.getEmptyClosedShardIds(); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), emptyClosedShardIds); + // Expired shard is cleared from cache + nonEmptyClosedShardIds = supervisor.getNonEmptyClosedShardIds(); + Assert.assertEquals(Collections.emptySet(), nonEmptyClosedShardIds); + + // ITERATION 3: + // active shards: an open shard, closed-empty shard. closed-nonEmpty shard has expired + activeShards = getActiveShards(openShard, true, + emptyClosedShard, false, + nonEmptyClosedShard, false); + + EasyMock.reset(supervisorRecordSupplier); + EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards).once(); + // calls to KinesisRecordSupplier#isClosedShardEmpty DO NOT happen + EasyMock.replay(supervisorRecordSupplier); + + supervisor.getIgnorablePartitionIds(); + // Expired shard is cleared from cache + emptyClosedShardIds = supervisor.getEmptyClosedShardIds(); + Assert.assertEquals(Collections.emptySet(), emptyClosedShardIds); + // Expired shard is cleared from cache + nonEmptyClosedShardIds = supervisor.getNonEmptyClosedShardIds(); + Assert.assertEquals(Collections.emptySet(), nonEmptyClosedShardIds); + } + + private Set getActiveShards(Shard openShard, boolean isOpenShardActive, + Shard emptyClosedShard, boolean isEmptyClosedShardActive, + Shard nonEmptyClosedShard, boolean isNonEmptyClosedShardActive) + { + ImmutableSet.Builder activeShards = ImmutableSet.builder(); + + if (isOpenShardActive) { + EasyMock.reset(openShard); + EasyMock.expect(openShard.getShardId()).andReturn("openShard"); + SequenceNumberRange openShardRange = EasyMock.mock(SequenceNumberRange.class); + EasyMock.expect(openShardRange.getEndingSequenceNumber()).andReturn(null); + EasyMock.expect(openShard.getSequenceNumberRange()).andReturn(openShardRange); + EasyMock.replay(openShard, openShardRange); + activeShards.add(openShard); + } + + if (isEmptyClosedShardActive) { + EasyMock.reset(emptyClosedShard); + EasyMock.expect(emptyClosedShard.getShardId()).andReturn("emptyClosedShard").times(3); + SequenceNumberRange emptyClosedShardRange = EasyMock.mock(SequenceNumberRange.class); + EasyMock.expect(emptyClosedShardRange.getEndingSequenceNumber()).andReturn("notNull"); + EasyMock.expect(emptyClosedShard.getSequenceNumberRange()).andReturn(emptyClosedShardRange); + EasyMock.replay(emptyClosedShard, emptyClosedShardRange); + activeShards.add(emptyClosedShard); + } + + if (isNonEmptyClosedShardActive) { + EasyMock.reset(nonEmptyClosedShard); + EasyMock.expect(nonEmptyClosedShard.getShardId()).andReturn("nonEmptyClosedShard").times(3); + SequenceNumberRange nonEmptyClosedShardRange = EasyMock.mock(SequenceNumberRange.class); + EasyMock.expect(nonEmptyClosedShardRange.getEndingSequenceNumber()).andReturn("notNull"); + EasyMock.expect(nonEmptyClosedShard.getSequenceNumberRange()).andReturn(nonEmptyClosedShardRange); + EasyMock.replay(nonEmptyClosedShard, nonEmptyClosedShardRange); + activeShards.add(nonEmptyClosedShard); + } + + return activeShards.build(); + } + private TestableKinesisSupervisor getTestableSupervisor( int replicas, int taskCount, @@ -5084,7 +5238,7 @@ private TestableKinesisSupervisor getTestableSupervisor( fetchDelayMillis, null, null, - autoScalerConfig, + autoScalerConfig, false ); From fde98736bb7404627c8999c55e7fe77573e1bcca Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 9 Feb 2022 13:26:54 +0530 Subject: [PATCH 06/14] Remove unnecessary import --- .../druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index cc70cbf21ce5..051463520af8 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -108,7 +108,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; From 2b1f4ad7cc4760b04a484b53582b7f00fc200429 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 9 Feb 2022 13:28:00 +0530 Subject: [PATCH 07/14] Fix indentation --- .../indexing/kinesis/supervisor/KinesisSupervisorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 051463520af8..891c6e064cd2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -5002,8 +5002,8 @@ public void testUpdateClosedShardCache() } private Set getActiveShards(Shard openShard, boolean isOpenShardActive, - Shard emptyClosedShard, boolean isEmptyClosedShardActive, - Shard nonEmptyClosedShard, boolean isNonEmptyClosedShardActive) + Shard emptyClosedShard, boolean isEmptyClosedShardActive, + Shard nonEmptyClosedShard, boolean isNonEmptyClosedShardActive) { ImmutableSet.Builder activeShards = ImmutableSet.builder(); From 82a89a80e355b3584af36c7dd88060a83b6770ed Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 10 Feb 2022 12:00:18 +0530 Subject: [PATCH 08/14] Minor refactoring --- .../kinesis/supervisor/KinesisSupervisor.java | 35 +++++++++---------- .../KinesisSupervisorTuningConfig.java | 4 +-- .../supervisor/SeekableStreamSupervisor.java | 7 ++++ 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index c27a57d73a4a..af0c588c5be3 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.common.aws.AWSCredentialsConfig; @@ -427,16 +428,12 @@ protected boolean supportsPartitionExpiration() @Override protected boolean shouldSkipIgnorablePartitions() { - return spec.getSpec().getTuningConfig().shouldSkipIgnorableShards(); + return spec.getSpec().getTuningConfig().isSkipIgnorableShards(); } /** - * Closed and empty shards can be ignored for ingestion, - * Use this method if skipIgnorablePartitions is true in the spec - * - * These partitions can be safely ignored for both ingestion task assignment and autoscaler limits - * - * @return the set of ignorable shards' ids + * A kinesis shard is considered to be an ignorable partition if it is both closed and empty + * @return set of shards ignorable by kinesis ingestion */ @Override protected Set getIgnorablePartitionIds() @@ -451,14 +448,20 @@ private void updateClosedShardCache() Set allActiveShards = ((KinesisRecordSupplier) recordSupplier).getShards(stream); Set activeClosedShards = allActiveShards.stream() .filter(shard -> isShardClosed(shard)) - .map(Shard::getShardId).collect(Collectors.toSet()); + .map(Shard::getShardId) + .collect(Collectors.toSet()); // clear stale shards emptyClosedShardIds.retainAll(activeClosedShards); nonEmptyClosedShardIds.retainAll(activeClosedShards); - // add newly closed shards to cache for (String closedShardId : activeClosedShards) { + // Try to utilize cache + if (emptyClosedShardIds.contains(closedShardId) || nonEmptyClosedShardIds.contains(closedShardId)) { + continue; + } + + // Check using kinesis and add to cache if (isClosedShardEmpty(stream, closedShardId)) { emptyClosedShardIds.add(closedShardId); } else { @@ -534,7 +537,7 @@ private SeekableStreamDataSourceMetadata createDataSourceMetadat } /** - * Closed shards iff they have an ending sequence number + * A shard is considered closed iff it has an ending sequence number. * * @param shard to be checked * @return if shard is closed @@ -547,6 +550,7 @@ private boolean isShardClosed(Shard shard) /** * Checking if a shard is empty requires polling for records which is quite expensive * Fortunately, the results can be cached for closed shards as no more records can be written to them + * Please use this method only if the info is absent from the cache * * @param stream to which the shard belongs * @param shardId of the shard @@ -554,21 +558,13 @@ private boolean isShardClosed(Shard shard) */ private boolean isClosedShardEmpty(String stream, String shardId) { - // utilize cache - if (emptyClosedShardIds.contains(shardId)) { - return true; - } - if (nonEmptyClosedShardIds.contains(shardId)) { - return false; - } - - // Make an expensive call to kinesis return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId); } /** * @return immutable copy of cache for empty, closed shards */ + @VisibleForTesting Set getEmptyClosedShardIds() { return ImmutableSet.copyOf(emptyClosedShardIds); @@ -577,6 +573,7 @@ Set getEmptyClosedShardIds() /** * @return immutable copy of cache for non-empty, closed shards */ + @VisibleForTesting Set getNonEmptyClosedShardIds() { return ImmutableSet.copyOf(nonEmptyClosedShardIds); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index d583bc77a490..6bde063d1546 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -217,7 +217,7 @@ public Duration getOffsetFetchPeriod() } @JsonProperty - public boolean shouldSkipIgnorableShards() + public boolean isSkipIgnorableShards() { return skipIgnorableShards; } @@ -256,7 +256,7 @@ public String toString() ", maxRecordsPerPoll=" + getMaxRecordsPerPoll() + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() + - ", skipIgnorableShards=" + shouldSkipIgnorableShards() + + ", skipIgnorableShards=" + isSkipIgnorableShards() + '}'; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 630df58bfef6..0d65ee59b03a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2296,6 +2296,13 @@ protected boolean shouldSkipIgnorablePartitions() return false; } + /** + * Use this method if skipIgnorablePartitions is true in the spec + * + * These partitions can be safely ignored for both ingestion task assignment and autoscaler limits + * + * @return set of ids of ignorable partitions + */ protected Set getIgnorablePartitionIds() { return ImmutableSet.of(); From 2353b974003992f17e9c625c3b03d2430662dd90 Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 10 Feb 2022 12:09:14 +0530 Subject: [PATCH 09/14] Revert to previous indentation for unmodified code --- .../supervisor/KinesisSupervisorTest.java | 164 +++++++++--------- 1 file changed, 80 insertions(+), 84 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 891c6e064cd2..fd97d6c3edc2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -309,17 +309,17 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class); supervisor = getTestableSupervisor( - 1, - 1, - true, - "PT1H", - null, - null, - false, - null, - null, - autoScalerConfig - ); + 1, + 1, + true, + "PT1H", + null, + null, + false, + null, + null, + autoScalerConfig + ); KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec(); SupervisorTaskAutoScaler autoscaler = kinesisSupervisorSpec.createAutoscaler(supervisor); @@ -383,16 +383,16 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class); supervisor = getTestableSupervisor( - 1, - 2, - true, - "PT1H", - null, - null, - false, - null, - null, - autoScalerConfig + 1, + 2, + true, + "PT1H", + null, + null, + false, + null, + null, + autoScalerConfig ); KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec(); @@ -417,9 +417,9 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); EasyMock - .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) - .andReturn(new KinesisDataSourceMetadata(null)) - .anyTimes(); + .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) + .andReturn(new KinesisDataSourceMetadata(null)) + .anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2); replayAll(); @@ -508,26 +508,26 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation() { // create KinesisSupervisorIOConfig with autoScalerConfig null KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithNullAutoScalerConfig = new KinesisSupervisorIOConfig( - STREAM, - INPUT_FORMAT, - "awsEndpoint", - null, - 1, - 1, - new Period("PT30M"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, - null, - 100, - 1000, - null, - null, - null, - false + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + null, + false ); AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoscalerConfig(); @@ -535,26 +535,26 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation() // create KinesisSupervisorIOConfig with autoScalerConfig Empty KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithEmptyAutoScalerConfig = new KinesisSupervisorIOConfig( - STREAM, - INPUT_FORMAT, - "awsEndpoint", - null, - 1, - 1, - new Period("PT30M"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, - null, - 100, - 1000, - null, - null, - OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), - false + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), + false ); AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoscalerConfig(); @@ -2582,11 +2582,11 @@ public void testStopGracefully() throws Exception "1" ))); EasyMock.expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of( - SHARD_ID1, - "12", - SHARD_ID0, - "1" - ), true)) + SHARD_ID1, + "12", + SHARD_ID0, + "1" + ), true)) .andReturn(Futures.immediateFuture(true)); taskQueue.shutdown("id3", "Killing task for graceful shutdown"); EasyMock.expectLastCall().times(1); @@ -3253,11 +3253,9 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); EasyMock.expect( - indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) - .andReturn(new KinesisDataSourceMetadata( - null) - ) - .anyTimes(); + indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KinesisDataSourceMetadata( + null) + ).anyTimes(); EasyMock.expect(taskClient.getStatusAsync("id1")) .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)); EasyMock.expect(taskClient.getStatusAsync("id2")) @@ -3409,11 +3407,9 @@ public void testCheckpointForUnknownTaskGroup() EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); EasyMock.expect( - indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) - .andReturn(new KinesisDataSourceMetadata( - null) - ) - .anyTimes(); + indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KinesisDataSourceMetadata( + null) + ).anyTimes(); replayAll(); @@ -3628,11 +3624,11 @@ public void testSuspendedRunningTasks() throws Exception "1" ))); EasyMock.expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of( - SHARD_ID1, - "12", - SHARD_ID0, - "1" - ), true)) + SHARD_ID1, + "12", + SHARD_ID0, + "1" + ), true)) .andReturn(Futures.immediateFuture(true)); taskQueue.shutdown("id3", "Killing task for graceful shutdown"); EasyMock.expectLastCall().times(1); From a1c5b1acf1ef93a62584667c48a8db6cbda07a75 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 16 Feb 2022 16:15:30 +0530 Subject: [PATCH 10/14] Address review comments and avoid reset in new tests --- .../kinesis/KinesisRecordSupplier.java | 13 +- .../kinesis/supervisor/KinesisSupervisor.java | 23 +-- .../kinesis/KinesisRecordSupplierTest.java | 40 ++-- .../supervisor/KinesisSupervisorTest.java | 175 +++++++----------- 4 files changed, 93 insertions(+), 158 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 9c88a2af5778..c191d8ee1e4b 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -687,10 +687,11 @@ public Set getShards(String stream) public Set getPartitionIds(String stream) { return wrapExceptions(() -> { - return ImmutableSet.copyOf(getShards(stream).stream() - .map(shard -> shard.getShardId()) - .collect(Collectors.toList()) - ); + ImmutableSet.Builder partitionIds = ImmutableSet.builder(); + for (Shard shard : getShards(stream)) { + partitionIds.add(shard.getShardId()); + } + return partitionIds.build(); }); } @@ -753,10 +754,10 @@ public boolean isAnyFetchActive() } /** - * Is costly and requires polling the shard to determine if it's empty + * Fetches records from the specified shard to determine if it is empty. * @param stream to which shard belongs * @param shardId of the closed shard - * @return if the closed shard is empty + * @return true if the closed shard is empty, false otherwise. */ public boolean isClosedShardEmpty(String stream, String shardId) { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index af0c588c5be3..07a77f5c4889 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.common.aws.AWSCredentialsConfig; @@ -439,7 +438,7 @@ protected boolean shouldSkipIgnorablePartitions() protected Set getIgnorablePartitionIds() { updateClosedShardCache(); - return getEmptyClosedShardIds(); + return ImmutableSet.copyOf(emptyClosedShardIds); } private void updateClosedShardCache() @@ -548,7 +547,7 @@ private boolean isShardClosed(Shard shard) } /** - * Checking if a shard is empty requires polling for records which is quite expensive + * Checking if a shard is empty requires fetching records which is quite expensive * Fortunately, the results can be cached for closed shards as no more records can be written to them * Please use this method only if the info is absent from the cache * @@ -560,22 +559,4 @@ private boolean isClosedShardEmpty(String stream, String shardId) { return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId); } - - /** - * @return immutable copy of cache for empty, closed shards - */ - @VisibleForTesting - Set getEmptyClosedShardIds() - { - return ImmutableSet.copyOf(emptyClosedShardIds); - } - - /** - * @return immutable copy of cache for non-empty, closed shards - */ - @VisibleForTesting - Set getNonEmptyClosedShardIds() - { - return ImmutableSet.copyOf(nonEmptyClosedShardIds); - } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 518c874c7f82..3b31d4781970 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -1042,33 +1042,33 @@ public void testIsClosedShardEmpty() true ); Record record = new Record(); - String shardId; + // Setup mock expectations // No records and null iterator -> empty - shardId = "0"; - isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null); - Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId)); - + final String emptyAndClosed = "0"; + setupMockKinesisForShardId(mockKinesis, emptyAndClosed, new ArrayList<>(), null); // no records and non-null iterator -> non-empty - shardId = "1"; - isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator"); - Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId)); - + final String emptyAndOpen = "1"; + setupMockKinesisForShardId(mockKinesis, emptyAndOpen, new ArrayList<>(), "nextIterator"); // non-empty records and null iterator -> non-empty - shardId = "2"; - isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null); - Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId)); - + final String nonEmptyAndClosed = "2"; + setupMockKinesisForShardId(mockKinesis, nonEmptyAndClosed, Collections.singletonList(record), null); // non-empty records and non-null iterator -> non-empty - shardId = "3"; - isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), "nextIterator"); - Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId)); + final String nonEmptyAndOpen = "3"; + setupMockKinesisForShardId(mockKinesis, nonEmptyAndOpen, Collections.singletonList(record), "nextIterator"); + + EasyMock.replay(mockKinesis); + + Assert.assertTrue(target.isClosedShardEmpty(STREAM, emptyAndClosed)); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, emptyAndOpen)); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, nonEmptyAndClosed)); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, nonEmptyAndOpen)); } - private void isClosedShardEmptyHelper(AmazonKinesis kinesis, String shardId, - List expectedRecords, String expectedNextIterator) + // make it less verbose + private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId, + List expectedRecords, String expectedNextIterator) { - EasyMock.reset(kinesis); String shardIteratorType = ShardIteratorType.TRIM_HORIZON.toString(); String shardIterator = "shardIterator" + shardId; GetShardIteratorResult shardIteratorResult = EasyMock.mock(GetShardIteratorResult.class); @@ -1079,6 +1079,6 @@ private void isClosedShardEmptyHelper(AmazonKinesis kinesis, String shardId, EasyMock.expect(kinesis.getRecords(request)).andReturn(result).once(); EasyMock.expect(result.getNextShardIterator()).andReturn(expectedNextIterator).once(); EasyMock.expect(result.getRecords()).andReturn(expectedRecords).once(); - EasyMock.replay(shardIteratorResult, result, kinesis); + EasyMock.replay(shardIteratorResult, result); } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index fd97d6c3edc2..92cc7afebdd3 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -108,12 +108,15 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class KinesisSupervisorTest extends EasyMockSupport { @@ -341,11 +344,9 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); - EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( - new KinesisDataSourceMetadata( - null - ) - ).anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) + .andReturn(new KinesisDataSourceMetadata(null)) + .anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -4908,132 +4909,84 @@ private void testShardMergePhaseThree(List phaseTwoTasks) throws Exception } @Test - public void testUpdateClosedShardCache() + public void testGetIgnorablePartitionIds() { supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null); supervisor.setupRecordSupplier(); supervisor.tryInit(); String stream = supervisor.getKinesisSupervisorSpec().getSource(); + + // Open shard Shard openShard = EasyMock.mock(Shard.class); + EasyMock.expect(openShard.getShardId()).andReturn("openShard").anyTimes(); + SequenceNumberRange openShardRange = EasyMock.mock(SequenceNumberRange.class); + EasyMock.expect(openShard.getSequenceNumberRange()).andReturn(openShardRange).anyTimes(); + EasyMock.expect(openShardRange.getEndingSequenceNumber()).andReturn(null).anyTimes(); + + // Empty, closed shard Shard emptyClosedShard = EasyMock.mock(Shard.class); + EasyMock.expect(emptyClosedShard.getShardId()).andReturn("emptyClosedShard").anyTimes(); + SequenceNumberRange emptyClosedShardRange = EasyMock.mock(SequenceNumberRange.class); + EasyMock.expect(emptyClosedShard.getSequenceNumberRange()).andReturn(emptyClosedShardRange).anyTimes(); + EasyMock.expect(emptyClosedShardRange.getEndingSequenceNumber()).andReturn("notNull").anyTimes(); + + // Non-empty, closed shard Shard nonEmptyClosedShard = EasyMock.mock(Shard.class); - Set activeShards; - Set emptyClosedShardIds; - Set nonEmptyClosedShardIds; + EasyMock.expect(nonEmptyClosedShard.getShardId()).andReturn("nonEmptyClosedShard").anyTimes(); + SequenceNumberRange nonEmptyClosedShardRange = EasyMock.mock(SequenceNumberRange.class); + EasyMock.expect(nonEmptyClosedShard.getSequenceNumberRange()).andReturn(nonEmptyClosedShardRange).anyTimes(); + EasyMock.expect(nonEmptyClosedShardRange.getEndingSequenceNumber()).andReturn("notNull").anyTimes(); - // ITERATION 0: - // active shards: an open shard, closed-empty shard and closed-nonEmpty shard - activeShards = getActiveShards(openShard, true, - emptyClosedShard, true, - nonEmptyClosedShard, true); + EasyMock.replay(openShardRange, emptyClosedShardRange, nonEmptyClosedShardRange); + EasyMock.replay(openShard, emptyClosedShard, nonEmptyClosedShard); - EasyMock.reset(supervisorRecordSupplier); - EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards).once(); + // ITERATION 0: + // active shards: open shard, closed-empty shard and closed-nonEmpty shard + Set activeShards0 = Stream.of(openShard, emptyClosedShard, nonEmptyClosedShard).collect(Collectors.toSet()); + EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards0).once(); // The following two calls DO happen since the shards are processed for the first time after being closed - EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, emptyClosedShard.getShardId())).andReturn(true).once(); - EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, nonEmptyClosedShard.getShardId())).andReturn(false).once(); - EasyMock.replay(supervisorRecordSupplier); - - supervisor.getIgnorablePartitionIds(); - emptyClosedShardIds = supervisor.getEmptyClosedShardIds(); - Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), emptyClosedShardIds); - nonEmptyClosedShardIds = supervisor.getNonEmptyClosedShardIds(); - Assert.assertEquals(Collections.singleton(nonEmptyClosedShard.getShardId()), nonEmptyClosedShardIds); - + EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, emptyClosedShard.getShardId())) + .andReturn(true).once(); + EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, nonEmptyClosedShard.getShardId())) + .andReturn(false).once(); // ITERATION 1: - // active shards: an open shard, closed-empty shard and closed-nonEmpty shard - activeShards = getActiveShards(openShard, true, - emptyClosedShard, true, - nonEmptyClosedShard, true); - - EasyMock.reset(supervisorRecordSupplier); - EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards).once(); - // calls to KinesisRecordSupplier#isClosedShardEmpty DO NOT happen - EasyMock.replay(supervisorRecordSupplier); - - supervisor.getIgnorablePartitionIds(); - emptyClosedShardIds = supervisor.getEmptyClosedShardIds(); - Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), emptyClosedShardIds); - nonEmptyClosedShardIds = supervisor.getNonEmptyClosedShardIds(); - Assert.assertEquals(Collections.singleton(nonEmptyClosedShard.getShardId()), nonEmptyClosedShardIds); - + // active shards: open shard, closed-empty shard and closed-nonEmpty shard + Set activeShards1 = Stream.of(openShard, emptyClosedShard, nonEmptyClosedShard).collect(Collectors.toSet()); + // No calls to KinesisRecordSupplier#isClosedShardEmpty happen since the values have been cached + EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards1).once(); // ITERATION 2: - // active shards: an open shard, closed-empty shard. closed-nonEmpty shard has expired - activeShards = getActiveShards(openShard, true, - emptyClosedShard, true, - nonEmptyClosedShard, false); - - EasyMock.reset(supervisorRecordSupplier); - EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards).once(); - // calls to KinesisRecordSupplier#isClosedShardEmpty DO NOT happen - EasyMock.replay(supervisorRecordSupplier); - - supervisor.getIgnorablePartitionIds(); - emptyClosedShardIds = supervisor.getEmptyClosedShardIds(); - Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), emptyClosedShardIds); - // Expired shard is cleared from cache - nonEmptyClosedShardIds = supervisor.getNonEmptyClosedShardIds(); - Assert.assertEquals(Collections.emptySet(), nonEmptyClosedShardIds); + // active shards: open shard, closed-empty shard + Set activeShards2 = Stream.of(openShard, emptyClosedShard).collect(Collectors.toSet()); + // No calls to KinesisRecordSupplier#isClosedShardEmpty happen since the values have been cached + EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards2).once(); // ITERATION 3: - // active shards: an open shard, closed-empty shard. closed-nonEmpty shard has expired - activeShards = getActiveShards(openShard, true, - emptyClosedShard, false, - nonEmptyClosedShard, false); + // active shards: open shard + Set activeShards3 = Stream.of(openShard).collect(Collectors.toSet()); + // No calls to KinesisRecordSupplier#isClosedShardEmpty happen since the values have been cached + EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards3).once(); + + + // ITERATION 4: + // active shards: open shard, closed-empty shard and closed-nonEmpty shard + Set activeShards4 = Stream.of(openShard, emptyClosedShard, nonEmptyClosedShard) + .collect(Collectors.toSet()); + EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards4).once(); + // The following two calls DO happen since the shards are processed after the cache has been cleared + EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, emptyClosedShard.getShardId())) + .andReturn(true).once(); + EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, nonEmptyClosedShard.getShardId())) + .andReturn(false).once(); - EasyMock.reset(supervisorRecordSupplier); - EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards).once(); - // calls to KinesisRecordSupplier#isClosedShardEmpty DO NOT happen EasyMock.replay(supervisorRecordSupplier); - supervisor.getIgnorablePartitionIds(); - // Expired shard is cleared from cache - emptyClosedShardIds = supervisor.getEmptyClosedShardIds(); - Assert.assertEquals(Collections.emptySet(), emptyClosedShardIds); - // Expired shard is cleared from cache - nonEmptyClosedShardIds = supervisor.getNonEmptyClosedShardIds(); - Assert.assertEquals(Collections.emptySet(), nonEmptyClosedShardIds); - } - - private Set getActiveShards(Shard openShard, boolean isOpenShardActive, - Shard emptyClosedShard, boolean isEmptyClosedShardActive, - Shard nonEmptyClosedShard, boolean isNonEmptyClosedShardActive) - { - ImmutableSet.Builder activeShards = ImmutableSet.builder(); - - if (isOpenShardActive) { - EasyMock.reset(openShard); - EasyMock.expect(openShard.getShardId()).andReturn("openShard"); - SequenceNumberRange openShardRange = EasyMock.mock(SequenceNumberRange.class); - EasyMock.expect(openShardRange.getEndingSequenceNumber()).andReturn(null); - EasyMock.expect(openShard.getSequenceNumberRange()).andReturn(openShardRange); - EasyMock.replay(openShard, openShardRange); - activeShards.add(openShard); - } - - if (isEmptyClosedShardActive) { - EasyMock.reset(emptyClosedShard); - EasyMock.expect(emptyClosedShard.getShardId()).andReturn("emptyClosedShard").times(3); - SequenceNumberRange emptyClosedShardRange = EasyMock.mock(SequenceNumberRange.class); - EasyMock.expect(emptyClosedShardRange.getEndingSequenceNumber()).andReturn("notNull"); - EasyMock.expect(emptyClosedShard.getSequenceNumberRange()).andReturn(emptyClosedShardRange); - EasyMock.replay(emptyClosedShard, emptyClosedShardRange); - activeShards.add(emptyClosedShard); - } - - if (isNonEmptyClosedShardActive) { - EasyMock.reset(nonEmptyClosedShard); - EasyMock.expect(nonEmptyClosedShard.getShardId()).andReturn("nonEmptyClosedShard").times(3); - SequenceNumberRange nonEmptyClosedShardRange = EasyMock.mock(SequenceNumberRange.class); - EasyMock.expect(nonEmptyClosedShardRange.getEndingSequenceNumber()).andReturn("notNull"); - EasyMock.expect(nonEmptyClosedShard.getSequenceNumberRange()).andReturn(nonEmptyClosedShardRange); - EasyMock.replay(nonEmptyClosedShard, nonEmptyClosedShardRange); - activeShards.add(nonEmptyClosedShard); - } - - return activeShards.build(); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); + Assert.assertEquals(new HashSet<>(), supervisor.getIgnorablePartitionIds()); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); } private TestableKinesisSupervisor getTestableSupervisor( From 7239f05b3cabe83b59c0674bc8a4fc50cbdacada Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 16 Feb 2022 17:35:31 +0530 Subject: [PATCH 11/14] Refactor tests and avoid unnecessary mocks, indenting changes --- .../kinesis/KinesisRecordSupplierTest.java | 45 ++++---- .../supervisor/KinesisSupervisorTest.java | 107 ++++++------------ 2 files changed, 54 insertions(+), 98 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 3b31d4781970..2f21878af141 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -1043,42 +1043,37 @@ public void testIsClosedShardEmpty() ); Record record = new Record(); - // Setup mock expectations - // No records and null iterator -> empty - final String emptyAndClosed = "0"; - setupMockKinesisForShardId(mockKinesis, emptyAndClosed, new ArrayList<>(), null); - // no records and non-null iterator -> non-empty - final String emptyAndOpen = "1"; - setupMockKinesisForShardId(mockKinesis, emptyAndOpen, new ArrayList<>(), "nextIterator"); - // non-empty records and null iterator -> non-empty - final String nonEmptyAndClosed = "2"; - setupMockKinesisForShardId(mockKinesis, nonEmptyAndClosed, Collections.singletonList(record), null); - // non-empty records and non-null iterator -> non-empty - final String nonEmptyAndOpen = "3"; - setupMockKinesisForShardId(mockKinesis, nonEmptyAndOpen, Collections.singletonList(record), "nextIterator"); + //Empty if and only if returned records are empty and the iterator is null + final String recordsAbsentAndNullNext = "0"; + setupMockKinesisForShardId(mockKinesis, recordsAbsentAndNullNext, new ArrayList<>(), null); + + final String recordsPresentAndNullNext = "1"; + setupMockKinesisForShardId(mockKinesis, recordsPresentAndNullNext, Collections.singletonList(record), null); + + final String recordsAbsentAndNonNullNext = "2"; + setupMockKinesisForShardId(mockKinesis, recordsAbsentAndNonNullNext, new ArrayList<>(), "nextIterator"); + + final String recordsPresentAndNonNullNext = "3"; + setupMockKinesisForShardId(mockKinesis, recordsPresentAndNonNullNext, Collections.singletonList(record), "nextIterator"); EasyMock.replay(mockKinesis); - Assert.assertTrue(target.isClosedShardEmpty(STREAM, emptyAndClosed)); - Assert.assertFalse(target.isClosedShardEmpty(STREAM, emptyAndOpen)); - Assert.assertFalse(target.isClosedShardEmpty(STREAM, nonEmptyAndClosed)); - Assert.assertFalse(target.isClosedShardEmpty(STREAM, nonEmptyAndOpen)); + Assert.assertTrue(target.isClosedShardEmpty(STREAM, recordsAbsentAndNullNext)); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, recordsPresentAndNullNext)); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, recordsAbsentAndNonNullNext)); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, recordsPresentAndNonNullNext)); } - // make it less verbose private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId, List expectedRecords, String expectedNextIterator) { String shardIteratorType = ShardIteratorType.TRIM_HORIZON.toString(); String shardIterator = "shardIterator" + shardId; - GetShardIteratorResult shardIteratorResult = EasyMock.mock(GetShardIteratorResult.class); + GetShardIteratorResult shardIteratorResult = new GetShardIteratorResult().withShardIterator(shardIterator); EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType)).andReturn(shardIteratorResult).once(); - EasyMock.expect(shardIteratorResult.getShardIterator()).andReturn(shardIterator).once(); GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1); - GetRecordsResult result = EasyMock.mock(GetRecordsResult.class); - EasyMock.expect(kinesis.getRecords(request)).andReturn(result).once(); - EasyMock.expect(result.getNextShardIterator()).andReturn(expectedNextIterator).once(); - EasyMock.expect(result.getRecords()).andReturn(expectedRecords).once(); - EasyMock.replay(shardIteratorResult, result); + GetRecordsResult result = new GetRecordsResult().withRecords(expectedRecords) + .withNextShardIterator(expectedNextIterator); + EasyMock.expect(kinesis.getRecords(request)).andReturn(result); } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 92cc7afebdd3..553f73e4f752 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -344,9 +344,11 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); - EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) - .andReturn(new KinesisDataSourceMetadata(null)) - .anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new KinesisDataSourceMetadata( + null + ) + ).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); replayAll(); @@ -1298,9 +1300,7 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception .times(1); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) - .andReturn(ImmutableList.of(captured.getValue())) - .anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId())) .andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId(), "Dummy task status failure err message"))); EasyMock.expect(taskStorage.getStatus(runningTaskId)) @@ -1978,9 +1978,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) - .andReturn(ImmutableList.of(id1, id2)) - .anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); @@ -2522,9 +2520,7 @@ public void testStopGracefully() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) - .andReturn(ImmutableList.of(id1, id2, id3)) - .anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -2759,9 +2755,7 @@ public void testGetOffsetFromStorageForPartitionWithResetOffsetAutomatically() t .anyTimes(); supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())) - .andReturn("300") - .anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("300").anyTimes(); EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("400").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); @@ -2929,9 +2923,7 @@ public void testResetRunningTasks() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) - .andReturn(ImmutableList.of(id1, id2, id3)) - .anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3086,9 +3078,7 @@ public void testNoDataIngestionTasks() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) - .andReturn(ImmutableList.of(id1, id2, id3)) - .anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3244,9 +3234,7 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) - .andReturn(ImmutableList.of(id1, id2, id3)) - .anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3398,9 +3386,7 @@ public void testCheckpointForUnknownTaskGroup() EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) - .andReturn(ImmutableList.of(id1, id2, id3)) - .anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -3571,9 +3557,7 @@ public void testSuspendedRunningTasks() throws Exception EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)) - .andReturn(ImmutableList.of(id1, id2, id3)) - .anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); @@ -4915,77 +4899,54 @@ public void testGetIgnorablePartitionIds() supervisor.setupRecordSupplier(); supervisor.tryInit(); String stream = supervisor.getKinesisSupervisorSpec().getSource(); + SequenceNumberRange openShardRange = new SequenceNumberRange().withEndingSequenceNumber(null); + SequenceNumberRange closedShardRange = new SequenceNumberRange().withEndingSequenceNumber("non-null"); // Open shard Shard openShard = EasyMock.mock(Shard.class); EasyMock.expect(openShard.getShardId()).andReturn("openShard").anyTimes(); - SequenceNumberRange openShardRange = EasyMock.mock(SequenceNumberRange.class); EasyMock.expect(openShard.getSequenceNumberRange()).andReturn(openShardRange).anyTimes(); - EasyMock.expect(openShardRange.getEndingSequenceNumber()).andReturn(null).anyTimes(); // Empty, closed shard Shard emptyClosedShard = EasyMock.mock(Shard.class); EasyMock.expect(emptyClosedShard.getShardId()).andReturn("emptyClosedShard").anyTimes(); - SequenceNumberRange emptyClosedShardRange = EasyMock.mock(SequenceNumberRange.class); - EasyMock.expect(emptyClosedShard.getSequenceNumberRange()).andReturn(emptyClosedShardRange).anyTimes(); - EasyMock.expect(emptyClosedShardRange.getEndingSequenceNumber()).andReturn("notNull").anyTimes(); + EasyMock.expect(emptyClosedShard.getSequenceNumberRange()).andReturn(closedShardRange).anyTimes(); // Non-empty, closed shard Shard nonEmptyClosedShard = EasyMock.mock(Shard.class); EasyMock.expect(nonEmptyClosedShard.getShardId()).andReturn("nonEmptyClosedShard").anyTimes(); - SequenceNumberRange nonEmptyClosedShardRange = EasyMock.mock(SequenceNumberRange.class); - EasyMock.expect(nonEmptyClosedShard.getSequenceNumberRange()).andReturn(nonEmptyClosedShardRange).anyTimes(); - EasyMock.expect(nonEmptyClosedShardRange.getEndingSequenceNumber()).andReturn("notNull").anyTimes(); + EasyMock.expect(nonEmptyClosedShard.getSequenceNumberRange()).andReturn(closedShardRange).anyTimes(); - EasyMock.replay(openShardRange, emptyClosedShardRange, nonEmptyClosedShardRange); EasyMock.replay(openShard, emptyClosedShard, nonEmptyClosedShard); - // ITERATION 0: - // active shards: open shard, closed-empty shard and closed-nonEmpty shard Set activeShards0 = Stream.of(openShard, emptyClosedShard, nonEmptyClosedShard).collect(Collectors.toSet()); - EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards0).once(); - // The following two calls DO happen since the shards are processed for the first time after being closed - EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, emptyClosedShard.getShardId())) - .andReturn(true).once(); - EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, nonEmptyClosedShard.getShardId())) - .andReturn(false).once(); - - // ITERATION 1: - // active shards: open shard, closed-empty shard and closed-nonEmpty shard Set activeShards1 = Stream.of(openShard, emptyClosedShard, nonEmptyClosedShard).collect(Collectors.toSet()); - // No calls to KinesisRecordSupplier#isClosedShardEmpty happen since the values have been cached - EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards1).once(); - - // ITERATION 2: - // active shards: open shard, closed-empty shard Set activeShards2 = Stream.of(openShard, emptyClosedShard).collect(Collectors.toSet()); - // No calls to KinesisRecordSupplier#isClosedShardEmpty happen since the values have been cached - EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards2).once(); - - // ITERATION 3: - // active shards: open shard Set activeShards3 = Stream.of(openShard).collect(Collectors.toSet()); - // No calls to KinesisRecordSupplier#isClosedShardEmpty happen since the values have been cached - EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards3).once(); - - - // ITERATION 4: - // active shards: open shard, closed-empty shard and closed-nonEmpty shard - Set activeShards4 = Stream.of(openShard, emptyClosedShard, nonEmptyClosedShard) - .collect(Collectors.toSet()); - EasyMock.expect(supervisorRecordSupplier.getShards(stream)).andReturn(activeShards4).once(); - // The following two calls DO happen since the shards are processed after the cache has been cleared + Set activeShards4 = Stream.of(openShard, emptyClosedShard, nonEmptyClosedShard).collect(Collectors.toSet()); + EasyMock.expect(supervisorRecordSupplier.getShards(stream)) + .andReturn(activeShards0).once() + .andReturn(activeShards1).once() + .andReturn(activeShards2).once() + .andReturn(activeShards3).once() + .andReturn(activeShards4).once(); + + // The following calls happen twice, once during the first call since there was no cache, + // and once during the last since the cache was cleared prior to it EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, emptyClosedShard.getShardId())) - .andReturn(true).once(); + .andReturn(true).times(2); EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, nonEmptyClosedShard.getShardId())) - .andReturn(false).once(); + .andReturn(false).times(2); EasyMock.replay(supervisorRecordSupplier); + // There is a closed and empty shard, which can be ignored Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); + // The closed and empty shard expired and no longer needs to be considered Assert.assertEquals(new HashSet<>(), supervisor.getIgnorablePartitionIds()); + // A closed and empty shard has been added again, which can be ignored Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); } @@ -5186,7 +5147,7 @@ private TestableKinesisSupervisor getTestableSupervisor( fetchDelayMillis, null, null, - autoScalerConfig, + autoScalerConfig, false ); From 8908f87c053270ee6aacbf92484cca905f8a1710 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 16 Feb 2022 18:50:41 +0530 Subject: [PATCH 12/14] Refactor tests and avoid unnecessary mocks --- .../kinesis/KinesisRecordSupplierTest.java | 26 +++++----- .../supervisor/KinesisSupervisorTest.java | 51 ++++++++----------- 2 files changed, 34 insertions(+), 43 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 2f21878af141..435c7b11b7cf 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -1043,25 +1043,25 @@ public void testIsClosedShardEmpty() ); Record record = new Record(); - //Empty if and only if returned records are empty and the iterator is null - final String recordsAbsentAndNullNext = "0"; - setupMockKinesisForShardId(mockKinesis, recordsAbsentAndNullNext, new ArrayList<>(), null); + final String shardWithoutRecordsAndNullNextIterator = "0"; + setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNullNextIterator, new ArrayList<>(), null); - final String recordsPresentAndNullNext = "1"; - setupMockKinesisForShardId(mockKinesis, recordsPresentAndNullNext, Collections.singletonList(record), null); + final String shardWithRecordsAndNullNextIterator = "1"; + setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNullNextIterator, Collections.singletonList(record), null); - final String recordsAbsentAndNonNullNext = "2"; - setupMockKinesisForShardId(mockKinesis, recordsAbsentAndNonNullNext, new ArrayList<>(), "nextIterator"); + final String shardWithoutRecordsAndNonNullNextIterator = "2"; + setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNonNullNextIterator, new ArrayList<>(), "nextIterator"); - final String recordsPresentAndNonNullNext = "3"; - setupMockKinesisForShardId(mockKinesis, recordsPresentAndNonNullNext, Collections.singletonList(record), "nextIterator"); + final String shardWithRecordsAndNonNullNextIterator = "3"; + setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNonNullNextIterator, Collections.singletonList(record), "nextIterator"); EasyMock.replay(mockKinesis); - Assert.assertTrue(target.isClosedShardEmpty(STREAM, recordsAbsentAndNullNext)); - Assert.assertFalse(target.isClosedShardEmpty(STREAM, recordsPresentAndNullNext)); - Assert.assertFalse(target.isClosedShardEmpty(STREAM, recordsAbsentAndNonNullNext)); - Assert.assertFalse(target.isClosedShardEmpty(STREAM, recordsPresentAndNonNullNext)); + // A closed shard is empty only when the records are empty and the next iterator is null + Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNullNextIterator)); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNullNextIterator)); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNonNullNextIterator)); + Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNonNullNextIterator)); } private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 553f73e4f752..e58b17f2330f 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -4902,34 +4902,19 @@ public void testGetIgnorablePartitionIds() SequenceNumberRange openShardRange = new SequenceNumberRange().withEndingSequenceNumber(null); SequenceNumberRange closedShardRange = new SequenceNumberRange().withEndingSequenceNumber("non-null"); - // Open shard - Shard openShard = EasyMock.mock(Shard.class); - EasyMock.expect(openShard.getShardId()).andReturn("openShard").anyTimes(); - EasyMock.expect(openShard.getSequenceNumberRange()).andReturn(openShardRange).anyTimes(); - - // Empty, closed shard - Shard emptyClosedShard = EasyMock.mock(Shard.class); - EasyMock.expect(emptyClosedShard.getShardId()).andReturn("emptyClosedShard").anyTimes(); - EasyMock.expect(emptyClosedShard.getSequenceNumberRange()).andReturn(closedShardRange).anyTimes(); - - // Non-empty, closed shard - Shard nonEmptyClosedShard = EasyMock.mock(Shard.class); - EasyMock.expect(nonEmptyClosedShard.getShardId()).andReturn("nonEmptyClosedShard").anyTimes(); - EasyMock.expect(nonEmptyClosedShard.getSequenceNumberRange()).andReturn(closedShardRange).anyTimes(); - - EasyMock.replay(openShard, emptyClosedShard, nonEmptyClosedShard); - - Set activeShards0 = Stream.of(openShard, emptyClosedShard, nonEmptyClosedShard).collect(Collectors.toSet()); - Set activeShards1 = Stream.of(openShard, emptyClosedShard, nonEmptyClosedShard).collect(Collectors.toSet()); - Set activeShards2 = Stream.of(openShard, emptyClosedShard).collect(Collectors.toSet()); - Set activeShards3 = Stream.of(openShard).collect(Collectors.toSet()); - Set activeShards4 = Stream.of(openShard, emptyClosedShard, nonEmptyClosedShard).collect(Collectors.toSet()); + Shard openShard = new Shard().withShardId("openShard") + .withSequenceNumberRange(openShardRange); + Shard emptyClosedShard = new Shard().withShardId("emptyClosedShard") + .withSequenceNumberRange(closedShardRange); + Shard nonEmptyClosedShard = new Shard().withShardId("nonEmptyClosedShard") + .withSequenceNumberRange(closedShardRange); + EasyMock.expect(supervisorRecordSupplier.getShards(stream)) - .andReturn(activeShards0).once() - .andReturn(activeShards1).once() - .andReturn(activeShards2).once() - .andReturn(activeShards3).once() - .andReturn(activeShards4).once(); + .andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once() + .andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once() + .andReturn(ImmutableSet.of(openShard, emptyClosedShard)).once() + .andReturn(ImmutableSet.of(openShard)).once() + .andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once(); // The following calls happen twice, once during the first call since there was no cache, // and once during the last since the cache was cleared prior to it @@ -4940,13 +4925,19 @@ public void testGetIgnorablePartitionIds() EasyMock.replay(supervisorRecordSupplier); - // There is a closed and empty shard, which can be ignored + // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed} + // {empty-closed, nonEmpty-closed} added to cache Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); + // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed} Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); + // ActiveShards = {open, empty-closed}, IgnorableShards = {empty-closed} + // {nonEmpty-closed} removed from cache Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); - // The closed and empty shard expired and no longer needs to be considered + // ActiveShards = {open}, IgnorableShards = {} + // {empty-closed} removed from cache Assert.assertEquals(new HashSet<>(), supervisor.getIgnorablePartitionIds()); - // A closed and empty shard has been added again, which can be ignored + // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed} + // {empty-closed, nonEmpty-closed} re-added to cache Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); } From b975f82062dbdd19e35393693365851fccedb361 Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 17 Feb 2022 13:28:10 +0530 Subject: [PATCH 13/14] Synchronize the method to update the cache for closed shards --- .../kinesis/supervisor/KinesisSupervisor.java | 35 ++++++------------- .../supervisor/KinesisSupervisorTest.java | 2 -- 2 files changed, 11 insertions(+), 26 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 07a77f5c4889..730b05bf4acb 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -91,7 +91,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor currentPartitionTimeLag; - // Maintain sets of currently closed shards to find "bad" (closed and empty) shards + // Maintain sets of currently closed shards to find ignorable (closed and empty) shards // Poll closed shards once and store the result to avoid redundant costly calls to kinesis private final Set emptyClosedShardIds = new TreeSet<>(); private final Set nonEmptyClosedShardIds = new TreeSet<>(); @@ -441,14 +441,15 @@ protected Set getIgnorablePartitionIds() return ImmutableSet.copyOf(emptyClosedShardIds); } - private void updateClosedShardCache() + private synchronized void updateClosedShardCache() { - String stream = spec.getSource(); - Set allActiveShards = ((KinesisRecordSupplier) recordSupplier).getShards(stream); - Set activeClosedShards = allActiveShards.stream() - .filter(shard -> isShardClosed(shard)) - .map(Shard::getShardId) - .collect(Collectors.toSet()); + final KinesisRecordSupplier kinesisRecordSupplier = (KinesisRecordSupplier) recordSupplier; + final String stream = spec.getSource(); + final Set allActiveShards = kinesisRecordSupplier.getShards(stream); + final Set activeClosedShards = allActiveShards.stream() + .filter(shard -> isShardClosed(shard)) + .map(Shard::getShardId) + .collect(Collectors.toSet()); // clear stale shards emptyClosedShardIds.retainAll(activeClosedShards); @@ -460,8 +461,8 @@ private void updateClosedShardCache() continue; } - // Check using kinesis and add to cache - if (isClosedShardEmpty(stream, closedShardId)) { + // Check if it is closed using kinesis and add to cache + if (kinesisRecordSupplier.isClosedShardEmpty(stream, closedShardId)) { emptyClosedShardIds.add(closedShardId); } else { nonEmptyClosedShardIds.add(closedShardId); @@ -545,18 +546,4 @@ private boolean isShardClosed(Shard shard) { return shard.getSequenceNumberRange().getEndingSequenceNumber() != null; } - - /** - * Checking if a shard is empty requires fetching records which is quite expensive - * Fortunately, the results can be cached for closed shards as no more records can be written to them - * Please use this method only if the info is absent from the cache - * - * @param stream to which the shard belongs - * @param shardId of the shard - * @return if the shard is empty - */ - private boolean isClosedShardEmpty(String stream, String shardId) - { - return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId); - } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index e58b17f2330f..e300fdbe37aa 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -115,8 +115,6 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import java.util.stream.Collectors; -import java.util.stream.Stream; public class KinesisSupervisorTest extends EasyMockSupport { From 31fd84cc3a5bff949f6e36a307904dfe74bc3fd7 Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 17 Feb 2022 15:52:47 +0530 Subject: [PATCH 14/14] Rename method appropriately --- .../indexing/kinesis/supervisor/KinesisSupervisor.java | 2 +- .../kinesis/supervisor/KinesisSupervisorTest.java | 10 +++++----- .../supervisor/SeekableStreamSupervisor.java | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 730b05bf4acb..75549a3fe6ca 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -435,7 +435,7 @@ protected boolean shouldSkipIgnorablePartitions() * @return set of shards ignorable by kinesis ingestion */ @Override - protected Set getIgnorablePartitionIds() + protected Set computeIgnorablePartitionIds() { updateClosedShardCache(); return ImmutableSet.copyOf(emptyClosedShardIds); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index e300fdbe37aa..f2a3cf5beb7a 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -4925,18 +4925,18 @@ public void testGetIgnorablePartitionIds() // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed} // {empty-closed, nonEmpty-closed} added to cache - Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds()); // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed} - Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds()); // ActiveShards = {open, empty-closed}, IgnorableShards = {empty-closed} // {nonEmpty-closed} removed from cache - Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds()); // ActiveShards = {open}, IgnorableShards = {} // {empty-closed} removed from cache - Assert.assertEquals(new HashSet<>(), supervisor.getIgnorablePartitionIds()); + Assert.assertEquals(new HashSet<>(), supervisor.computeIgnorablePartitionIds()); // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed} // {empty-closed, nonEmpty-closed} re-added to cache - Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.getIgnorablePartitionIds()); + Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds()); } private TestableKinesisSupervisor getTestableSupervisor( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0d65ee59b03a..5dc55904a02b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2303,7 +2303,7 @@ protected boolean shouldSkipIgnorablePartitions() * * @return set of ids of ignorable partitions */ - protected Set getIgnorablePartitionIds() + protected Set computeIgnorablePartitionIds() { return ImmutableSet.of(); } @@ -2312,7 +2312,7 @@ public int getPartitionCount() { int partitionCount = recordSupplier.getPartitionIds(ioConfig.getStream()).size(); if (shouldSkipIgnorablePartitions()) { - partitionCount -= getIgnorablePartitionIds().size(); + partitionCount -= computeIgnorablePartitionIds().size(); } return partitionCount; } @@ -2325,7 +2325,7 @@ private boolean updatePartitionDataFromStream() try { partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream()); if (shouldSkipIgnorablePartitions()) { - partitionIdsFromSupplier.removeAll(getIgnorablePartitionIds()); + partitionIdsFromSupplier.removeAll(computeIgnorablePartitionIds()); } } catch (Exception e) {