Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize kinesis ingestion task assignment after resharding #12235

Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -667,28 +666,31 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
 * This makes the method resilient to LimitExceededException, since listShards has a much higher
 * call quota than describeStream (which is limited to roughly 10 TPS per account).
*
* @param stream name of stream
*
* @return Set of Shard ids
* @return Immutable set of shards
*/
/**
 * Lists every shard in the given stream, following pagination via the
 * {@code nextToken} returned by each listShards call.
 *
 * @param stream name of the stream
 * @return immutable set of all shards in the stream
 */
public Set<Shard> getShards(String stream)
{
  final ImmutableSet.Builder<Shard> allShards = ImmutableSet.builder();
  ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
  String nextToken;
  do {
    final ListShardsResult result = kinesis.listShards(request);
    allShards.addAll(result.getShards());
    nextToken = result.getNextToken();
    // Paginated requests carry only the token; the stream name must not be re-sent.
    request = new ListShardsRequest().withNextToken(nextToken);
  } while (nextToken != null);
  return allShards.build();
}

@Override
public Set<String> getPartitionIds(String stream)
{
return wrapExceptions(() -> {
final Set<String> retVal = new TreeSet<>();
ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
while (true) {
ListShardsResult result = kinesis.listShards(request);
retVal.addAll(result.getShards()
.stream()
.map(Shard::getShardId)
.collect(Collectors.toList())
);
String nextToken = result.getNextToken();
if (nextToken == null) {
return retVal;
}
request = new ListShardsRequest().withNextToken(nextToken);
}
return ImmutableSet.copyOf(getShards(stream).stream()
.map(shard -> shard.getShardId())
.collect(Collectors.toList())
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
);
});
}

Expand Down Expand Up @@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
.anyMatch(fetch -> (fetch != null && !fetch.isDone()));
}

/**
 * Determines whether a closed shard is empty. This is costly: it requires a
 * getShardIterator call followed by a getRecords poll against Kinesis.
 *
 * @param stream  stream to which the shard belongs
 * @param shardId id of the closed shard
 * @return true if the closed shard contains no records
 */
public boolean isClosedShardEmpty(String stream, String shardId)
{
  final String shardIterator =
      kinesis.getShardIterator(stream, shardId, ShardIteratorType.TRIM_HORIZON.toString())
             .getShardIterator();
  final GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator)
                                                           .withLimit(1);
  final GetRecordsResult result = kinesis.getRecords(request);
  // Empty only when the first poll yields no records AND the iterator is
  // exhausted (a null next iterator marks the end of a closed shard).
  return result.getRecords().isEmpty() && result.getNextShardIterator() == null;
}

/**
* Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call
* {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package org.apache.druid.indexing.kinesis.supervisor;

import com.amazonaws.services.kinesis.model.Shard;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.ByteEntity;
Expand Down Expand Up @@ -64,6 +66,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
Expand All @@ -88,6 +91,11 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
private final AWSCredentialsConfig awsCredentialsConfig;
private volatile Map<String, Long> currentPartitionTimeLag;

// Maintain sets of currently closed shards to find "bad" (closed and empty) shards
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
// Poll closed shards once and store the result to avoid redundant costly calls to kinesis
private final Set<String> emptyClosedShardIds = new TreeSet<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be in thread-safe container, or have access be protected with lock?

Copy link
Contributor

@kfaraz kfaraz Feb 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out, @zachjsh.
This code would be executed by the SeekableStreamSupervisor while executing a RunNotice (scheduled when status of a task changes) as well as a DynamicAllocationTasksNotice (scheduled for auto-scaling). There is a possibility of contention between these two executions.

We can make the part where the caches are updated synchronized.
Just changing these two caches to a Concurrent version might not be enough as a whole new list of active shards is fetched in updateClosedShardCache() and the caches must be updated with this new state before any other action is performed.

cc: @AmatyaAvadhanula

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synchronizing the whole method updateClosedShardCache would actually be preferable because the state returned by two subsequent calls to recordSupplier.getShards() can be different.
So this call should happen inside the synchronized block, as should the calls to recordSupplier.isClosedShardEmpty().

I hope this doesn't cause bottlenecks though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole method has been synchronized. Thanks!

private final Set<String> nonEmptyClosedShardIds = new TreeSet<>();

public KinesisSupervisor(
final TaskStorage taskStorage,
final TaskMaster taskMaster,
Expand Down Expand Up @@ -416,6 +424,49 @@ protected boolean supportsPartitionExpiration()
return true;
}

/**
 * Whether ignorable (closed and empty) shards should be skipped, as configured
 * by {@code skipIgnorableShards} in the supervisor tuning config.
 */
@Override
protected boolean shouldSkipIgnorablePartitions()
{
  final KinesisSupervisorTuningConfig tuningConfig = spec.getSpec().getTuningConfig();
  return tuningConfig.shouldSkipIgnorableShards();
}

/**
* Closed and empty shards can be ignored for ingestion,
* Use this method if skipIgnorablePartitions is true in the spec
*
* These partitions can be safely ignored for both ingestion task assignment and autoscaler limits
*
* @return the set of ignorable shards' ids
*/
@Override
protected Set<String> getIgnorablePartitionIds()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - this should be called computeIgnorablePartitionIds() or loadIgnorablePartitionIds()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current verb indicates that it is just a getter but behind the scenes it can do network calls etc to fetch the partition ids.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

{
updateClosedShardCache();
return getEmptyClosedShardIds();
}

private void updateClosedShardCache()
{
String stream = spec.getSource();
Set<Shard> allActiveShards = ((KinesisRecordSupplier) recordSupplier).getShards(stream);
Set<String> activeClosedShards = allActiveShards.stream()
.filter(shard -> isShardClosed(shard))
.map(Shard::getShardId).collect(Collectors.toSet());

// clear stale shards
emptyClosedShardIds.retainAll(activeClosedShards);
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
nonEmptyClosedShardIds.retainAll(activeClosedShards);

// add newly closed shards to cache
for (String closedShardId : activeClosedShards) {
if (isClosedShardEmpty(stream, closedShardId)) {
emptyClosedShardIds.add(closedShardId);
} else {
nonEmptyClosedShardIds.add(closedShardId);
}
}
}

@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> expiredPartitionIds
Expand Down Expand Up @@ -481,4 +532,53 @@ private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadat

return new KinesisDataSourceMetadata(newSequences);
}

/**
 * A shard is closed iff its sequence number range has an ending sequence number.
 *
 * @param shard shard to check
 * @return true if the shard is closed
 */
private boolean isShardClosed(Shard shard)
{
  final String endingSequenceNumber = shard.getSequenceNumberRange().getEndingSequenceNumber();
  return endingSequenceNumber != null;
}

/**
* Checking if a shard is empty requires polling for records which is quite expensive
* Fortunately, the results can be cached for closed shards as no more records can be written to them
*
* @param stream to which the shard belongs
* @param shardId of the shard
* @return if the shard is empty
*/
private boolean isClosedShardEmpty(String stream, String shardId)
{
// utilize cache
if (emptyClosedShardIds.contains(shardId)) {
return true;
}
if (nonEmptyClosedShardIds.contains(shardId)) {
return false;
}

// Make an expensive call to kinesis
return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId);
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
}

/**
 * @return immutable snapshot of the cache of closed, empty shard ids
 */
Set<String> getEmptyClosedShardIds()
{
  return ImmutableSet.<String>builder().addAll(emptyClosedShardIds).build();
}

/**
 * @return immutable snapshot of the cache of closed, non-empty shard ids
 */
Set<String> getNonEmptyClosedShardIds()
{
  return ImmutableSet.<String>builder().addAll(nonEmptyClosedShardIds).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
private final Duration shutdownTimeout;
private final Duration repartitionTransitionDuration;
private final Duration offsetFetchPeriod;
private final boolean skipIgnorableShards;

public static KinesisSupervisorTuningConfig defaultConfig()
{
Expand Down Expand Up @@ -77,6 +78,7 @@ public static KinesisSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -114,7 +116,8 @@ public KinesisSupervisorTuningConfig(
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@JsonProperty("skipIgnorableShards") Boolean skipIgnorableShards
)
{
super(
Expand Down Expand Up @@ -162,6 +165,7 @@ public KinesisSupervisorTuningConfig(
offsetFetchPeriod,
DEFAULT_OFFSET_FETCH_PERIOD
);
this.skipIgnorableShards = (skipIgnorableShards != null ? skipIgnorableShards : false);
}

@Override
Expand Down Expand Up @@ -212,6 +216,12 @@ public Duration getOffsetFetchPeriod()
return offsetFetchPeriod;
}

@JsonProperty
public boolean shouldSkipIgnorableShards()
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
{
return skipIgnorableShards;
}

@Override
public String toString()
{
Expand Down Expand Up @@ -246,6 +256,7 @@ public String toString()
", maxRecordsPerPoll=" + getMaxRecordsPerPoll() +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
", skipIgnorableShards=" + shouldSkipIgnorableShards() +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ public void testConvert()
null,
null,
null,
null,
null
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws InterruptedException
}
verifyAll();
}

@Test
public void testIsClosedShardEmpty()
{
AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
recordsPerFetch,
0,
2,
false,
100,
5000,
5000,
60000,
5,
true
);
Record record = new Record();
String shardId;

// No records and null iterator -> empty
shardId = "0";
isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));

// no records and non-null iterator -> non-empty
shardId = "1";
isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator");
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));

// non-empty records and null iterator -> non-empty
shardId = "2";
isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null);
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));

// non-empty records and non-null iterator -> non-empty
shardId = "3";
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), "nextIterator");
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
}

private void isClosedShardEmptyHelper(AmazonKinesis kinesis, String shardId,
AmatyaAvadhanula marked this conversation as resolved.
Show resolved Hide resolved
List<Record> expectedRecords, String expectedNextIterator)
{
EasyMock.reset(kinesis);
String shardIteratorType = ShardIteratorType.TRIM_HORIZON.toString();
String shardIterator = "shardIterator" + shardId;
GetShardIteratorResult shardIteratorResult = EasyMock.mock(GetShardIteratorResult.class);
EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType)).andReturn(shardIteratorResult).once();
EasyMock.expect(shardIteratorResult.getShardIterator()).andReturn(shardIterator).once();
GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1);
GetRecordsResult result = EasyMock.mock(GetRecordsResult.class);
EasyMock.expect(kinesis.getRecords(request)).andReturn(result).once();
EasyMock.expect(result.getNextShardIterator()).andReturn(expectedNextIterator).once();
EasyMock.expect(result.getRecords()).andReturn(expectedRecords).once();
EasyMock.replay(shardIteratorResult, result, kinesis);
}
}
Loading