-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kinesis ingestion with empty shards #12792
Kinesis ingestion with empty shards #12792
Conversation
} | ||
|
||
@Override | ||
public boolean isAvailableWithEarliest(OrderedSequenceNumber<String> earliest) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method name is a bit hard to understand. Maybe javadoc will help
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the feedback. Will add one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
@Override | ||
public boolean isMoreToReadBeforeReadingRecord(OrderedSequenceNumber<String> end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method name is a bit hard to understand. Maybe javadoc will help
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is based on an existing method of the same name. I'll add a javadoc though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
Show resolved
Hide resolved
...-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also add some Integration test (See integration test group "kinesis-index")?
@maytasm, thanks for the review.
Sure, I can add a simple test where the records are queried after ingesting from a stream with at least one empty shard. |
Sounds good to me! Thanks! |
@AmatyaAvadhanula
|
@@ -157,6 +158,14 @@ public Long getEarliestSequenceNumber(StreamPartition<Integer> partition) | |||
return nextPos; | |||
} | |||
|
|||
@Override | |||
public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Long> offset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this needs a lock on recordSupplierLock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thanks!
@@ -666,6 +670,30 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition) | |||
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON); | |||
} | |||
|
|||
@Override | |||
public boolean isOffsetAvailable(StreamPartition<String> partition, OrderedSequenceNumber<String> offset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is quite different from the current logic in KinesisRecordSupplier#getSequenceNumber.
such as handling when closed
or the Retry for kinesis.getRecords
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous exception handling has been added
Added a few tests where empty shards are present
Yes, an end of shard marker is still being returned. If this doesn't happen immediately, the task will handle it appropriately after assignment.
Could you please clarify what timeoutMillis is? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this include a Design Review, we should get one more approval from a commiter
@@ -300,7 +300,6 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param | |||
|`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)| | |||
|`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for space to become available in the buffer before timing out.| no (default == 5000)| | |||
|`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again.|no (default == 5000)| | |||
|`fetchSequenceNumberTimeout`|Integer|Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence number for a shard. Kinesis will not return the latest sequence number if no data is actively being written to that shard. In this case, this fetch call will repeatedly timeout and retry until fresh data is written to the stream.|no (default == 60000)| |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if someone has this parameter in their ingestion spec?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is ignored. (Just like any parameter which isn't relevant in the payload, if at all)
@@ -128,8 +128,7 @@ protected void possiblyResetDataSourceMetadata( | |||
for (final StreamPartition<String> streamPartition : assignment) { | |||
String sequence = currOffsets.get(streamPartition.getPartitionId()); | |||
String earliestSequenceNumber = recordSupplier.getEarliestSequenceNumber(streamPartition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we pass this to the isOffsetAvailable
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since its an expensive call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's no longer being used here. Thanks for catching this!
); | ||
} | ||
|
||
@Override | ||
public int compareTo(OrderedSequenceNumber<String> o) | ||
{ | ||
KinesisSequenceNumber num = (KinesisSequenceNumber) o; | ||
if (isUnread() && num.isUnread()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we fallback to comparing maxSequenceNumber in this case as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a boolean (isMaxSequenceNumber) which is used later. There is only one value to be compared per sequence number
@abhishekagarwal87 thanks for review! Could you please verify the changes in the latest commit as well? It tries to fall back to earliest sequence number comparison in case the primary check for kinesis offset availability fails. |
Thank you @AmatyaAvadhanula. I have merged your change. |
@AmatyaAvadhanula |
Enables Kinesis ingestion with empty shards
Description
Kinesis ingestion requires all shards to have at least 1 record at the required position in druid.
Even if this is satisified initially, resharding the stream can lead to empty intermediate shards. A significant delay in writing to newly created shards was also problematic.
Kinesis shard sequence numbers are big integers. Introduce two more custom sequence tokens UNREAD_TRIM_HORIZON and UNREAD_LATEST to indicate that a shard has not been read from and that it needs to be read from the start or the end respectively.
These values can be used to avoid the need to read at least one record to obtain a sequence number for ingesting a newly discovered shard.
If a record cannot be obtained immediately, use a marker to obtain the relevant shardIterator and use this shardIterator to obtain a valid sequence number. As long as a valid sequence number is not obtained, continue storing the token as the offset.
These tokens (UNREAD_TRIM_HORIZON and UNREAD_LATEST) are logically ordered to be earlier than any valid sequence number.
However, the ordering requires a few subtle changes to the existing mechanism for record sequence validation:
The sequence availability check ensures that the current offset is before the earliest available sequence in the shard. However, current token being an UNREAD token indicates that any sequence number in the shard is valid (despite the ordering)
Kinesis sequence numbers are inclusive i.e if current sequence == end sequence, there are more records left to read.
However, the equality check is exclusive when dealing with UNREAD tokens.
Key changed/added classes in this PR
KinesisSequenceNumber
KinesisRecordSupplier
This PR has: