Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Optimize kinesis ingestion task assignment after resharding #12235

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -667,28 +666,32 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
* This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
*
* @param stream name of stream
*
* @return Immutable set of shards
*/
public Set<Shard> getShards(String stream)
{
  final ImmutableSet.Builder<Shard> collected = ImmutableSet.builder();
  // First page is requested by stream name; subsequent pages only by nextToken
  ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
  while (true) {
    final ListShardsResult result = kinesis.listShards(request);
    collected.addAll(result.getShards());
    final String nextToken = result.getNextToken();
    if (nextToken == null) {
      // No further pages: the listing is complete
      break;
    }
    request = new ListShardsRequest().withNextToken(nextToken);
  }
  return collected.build();
}

@Override
public Set<String> getPartitionIds(String stream)
{
return wrapExceptions(() -> {
final Set<String> retVal = new TreeSet<>();
ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
while (true) {
ListShardsResult result = kinesis.listShards(request);
retVal.addAll(result.getShards()
.stream()
.map(Shard::getShardId)
.collect(Collectors.toList())
);
String nextToken = result.getNextToken();
if (nextToken == null) {
return retVal;
}
request = new ListShardsRequest().withNextToken(nextToken);
ImmutableSet.Builder<String> partitionIds = ImmutableSet.builder();
for (Shard shard : getShards(stream)) {
partitionIds.add(shard.getShardId());
}
return partitionIds.build();
});
}

Expand Down Expand Up @@ -750,6 +753,25 @@ public boolean isAnyFetchActive()
.anyMatch(fetch -> (fetch != null && !fetch.isDone()));
}

/**
 * Fetches records from the specified shard to determine if it is empty.
 *
 * @param stream to which shard belongs
 * @param shardId of the closed shard
 * @return true if the closed shard is empty, false otherwise.
 */
public boolean isClosedShardEmpty(String stream, String shardId)
{
  // Start reading from the oldest available record in the shard
  final String iterator = kinesis
      .getShardIterator(stream, shardId, ShardIteratorType.TRIM_HORIZON.toString())
      .getShardIterator();
  final GetRecordsResult shardData = kinesis.getRecords(
      new GetRecordsRequest().withShardIterator(iterator).withLimit(1)
  );
  // Empty iff nothing was read AND there is no further iterator to follow
  return shardData.getRecords().isEmpty() && shardData.getNextShardIterator() == null;
}

/**
* Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call
* {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package org.apache.druid.indexing.kinesis.supervisor;

import com.amazonaws.services.kinesis.model.Shard;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.ByteEntity;
Expand Down Expand Up @@ -64,6 +66,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
Expand All @@ -88,6 +91,11 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
private final AWSCredentialsConfig awsCredentialsConfig;
private volatile Map<String, Long> currentPartitionTimeLag;

// Maintain sets of currently closed shards to find "bad" (closed and empty) shards
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
// Poll closed shards once and store the result to avoid redundant costly calls to kinesis
private final Set<String> emptyClosedShardIds = new TreeSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be in thread-safe container, or have access be protected with lock?

Copy link
Contributor

@kfaraz kfaraz Feb 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out, @zachjsh.
This code would be executed by the SeekableStreamSupervisor while executing a RunNotice (scheduled when status of a task changes) as well as a DynamicAllocationTasksNotice (scheduled for auto-scaling). There is a possibility of contention between these two executions.

We can make the part where the caches are updated synchronized.
Just changing these two caches to a Concurrent version might not be enough as a whole new list of active shards is fetched in updateClosedShardCache() and the caches must be updated with this new state before any other action is performed.

cc: @AmatyaAvadhanula

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synchronizing the whole method updateClosedShardCache would actually be preferable because the state returned by two subsequent calls to recordSupplier.getShards() can be different.
So this call should happen inside the synchronized block, as should the calls to recordSupplier.isClosedShardEmpty().

I hope this doesn't cause bottlenecks though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole method has been synchronized. Thanks!

private final Set<String> nonEmptyClosedShardIds = new TreeSet<>();

public KinesisSupervisor(
final TaskStorage taskStorage,
final TaskMaster taskMaster,
Expand Down Expand Up @@ -416,6 +424,51 @@ protected boolean supportsPartitionExpiration()
return true;
}

/**
 * Whether ignorable (closed and empty) shards should be skipped during
 * ingestion task assignment, as configured by the tuning config's
 * {@code skipIgnorableShards} flag.
 */
@Override
protected boolean shouldSkipIgnorablePartitions()
{
  return spec.getSpec().getTuningConfig().isSkipIgnorableShards();
}

/**
* A kinesis shard is considered to be an ignorable partition if it is both closed and empty
* @return set of shards ignorable by kinesis ingestion
*/
@Override
protected Set<String> getIgnorablePartitionIds()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - this should be called computeIgnorablePartitionIds() or loadIgnorablePartitionIds()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current verb indicates that it is just a getter but behind the scenes it can do network calls etc to fetch the partition ids.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

{
updateClosedShardCache();
return ImmutableSet.copyOf(emptyClosedShardIds);
}

private void updateClosedShardCache()
{
String stream = spec.getSource();
Set<Shard> allActiveShards = ((KinesisRecordSupplier) recordSupplier).getShards(stream);
Set<String> activeClosedShards = allActiveShards.stream()
.filter(shard -> isShardClosed(shard))
.map(Shard::getShardId)
.collect(Collectors.toSet());

// clear stale shards
emptyClosedShardIds.retainAll(activeClosedShards);
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
nonEmptyClosedShardIds.retainAll(activeClosedShards);

for (String closedShardId : activeClosedShards) {
// Try to utilize cache
if (emptyClosedShardIds.contains(closedShardId) || nonEmptyClosedShardIds.contains(closedShardId)) {
continue;
}

// Check using kinesis and add to cache
if (isClosedShardEmpty(stream, closedShardId)) {
emptyClosedShardIds.add(closedShardId);
} else {
nonEmptyClosedShardIds.add(closedShardId);
}
}
}

@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> expiredPartitionIds
Expand Down Expand Up @@ -481,4 +534,29 @@ private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadat

return new KinesisDataSourceMetadata(newSequences);
}

/**
 * A shard is considered closed iff it has an ending sequence number.
 *
 * @param shard to be checked
 * @return if shard is closed
 */
private boolean isShardClosed(Shard shard)
{
  final String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
  return endingSequenceNumber != null;
}

/**
* Checking if a shard is empty requires fetching records which is quite expensive
* Fortunately, the results can be cached for closed shards as no more records can be written to them
* Please use this method only if the info is absent from the cache
*
* @param stream to which the shard belongs
* @param shardId of the shard
* @return if the shard is empty
*/
private boolean isClosedShardEmpty(String stream, String shardId)
{
return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId);
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
private final Duration shutdownTimeout;
private final Duration repartitionTransitionDuration;
private final Duration offsetFetchPeriod;
private final boolean skipIgnorableShards;

public static KinesisSupervisorTuningConfig defaultConfig()
{
Expand Down Expand Up @@ -77,6 +78,7 @@ public static KinesisSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -114,7 +116,8 @@ public KinesisSupervisorTuningConfig(
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@JsonProperty("skipIgnorableShards") Boolean skipIgnorableShards
)
{
super(
Expand Down Expand Up @@ -162,6 +165,7 @@ public KinesisSupervisorTuningConfig(
offsetFetchPeriod,
DEFAULT_OFFSET_FETCH_PERIOD
);
this.skipIgnorableShards = (skipIgnorableShards != null ? skipIgnorableShards : false);
}

@Override
Expand Down Expand Up @@ -212,6 +216,12 @@ public Duration getOffsetFetchPeriod()
return offsetFetchPeriod;
}

/**
 * Whether closed and empty ("ignorable") shards may be skipped during
 * ingestion task assignment. Defaults to false when unset in the spec.
 */
@JsonProperty
public boolean isSkipIgnorableShards()
{
  return skipIgnorableShards;
}

@Override
public String toString()
{
Expand Down Expand Up @@ -246,6 +256,7 @@ public String toString()
", maxRecordsPerPoll=" + getMaxRecordsPerPoll() +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
", skipIgnorableShards=" + isSkipIgnorableShards() +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ public void testConvert()
null,
null,
null,
null,
null
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -1023,4 +1024,56 @@ public void getPartitionTimeLag() throws InterruptedException
}
verifyAll();
}

@Test
public void testIsClosedShardEmpty()
{
  final AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
  final KinesisRecordSupplier supplier = new KinesisRecordSupplier(
      mockKinesis,
      recordsPerFetch,
      0,
      2,
      false,
      100,
      5000,
      5000,
      60000,
      5,
      true
  );
  final Record record = new Record();

  // Cover all four combinations of (records present?) x (next iterator present?)
  final String shardWithoutRecordsAndNullNextIterator = "0";
  setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNullNextIterator, new ArrayList<>(), null);

  final String shardWithRecordsAndNullNextIterator = "1";
  setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNullNextIterator, Collections.singletonList(record), null);

  final String shardWithoutRecordsAndNonNullNextIterator = "2";
  setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNonNullNextIterator, new ArrayList<>(), "nextIterator");

  final String shardWithRecordsAndNonNullNextIterator = "3";
  setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNonNullNextIterator, Collections.singletonList(record), "nextIterator");

  EasyMock.replay(mockKinesis);

  // A closed shard is empty only when the records are empty and the next iterator is null
  Assert.assertTrue(supplier.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNullNextIterator));
  Assert.assertFalse(supplier.isClosedShardEmpty(STREAM, shardWithRecordsAndNullNextIterator));
  Assert.assertFalse(supplier.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNonNullNextIterator));
  Assert.assertFalse(supplier.isClosedShardEmpty(STREAM, shardWithRecordsAndNonNullNextIterator));
}

/**
 * Registers expectations on the mock kinesis client so that reading the given shard
 * (from TRIM_HORIZON, limit 1) yields the supplied records and next shard iterator.
 */
private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId,
                                        List<Record> expectedRecords, String expectedNextIterator)
{
  final String iterator = "shardIterator" + shardId;
  final GetShardIteratorResult iteratorResult = new GetShardIteratorResult().withShardIterator(iterator);
  EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, ShardIteratorType.TRIM_HORIZON.toString()))
          .andReturn(iteratorResult)
          .once();
  final GetRecordsResult recordsResult = new GetRecordsResult().withRecords(expectedRecords)
                                                               .withNextShardIterator(expectedNextIterator);
  EasyMock.expect(kinesis.getRecords(new GetRecordsRequest().withShardIterator(iterator).withLimit(1)))
          .andReturn(recordsResult);
}
}
Loading