[BEAM-8511] [WIP] KinesisIO.Read enhanced fanout #9899
Here are the diffs between old and new in case that makes it easier to see the changes. |
Thanks! Sorry for the delayed response, I was a bit busy this week but I'll take a look later. |
Btw, afaik, @cmachgodaddy also was working on adding new Kinesis fanout support into |
Thanks @aromanenko-dev, will do. |
This diff has just the changes for enhanced fanout before I did the SDK v2 conversion. There have been a few small changes / bugfixes since then but hopefully you can get the idea. |
@aromanenko-dev and @iemejia , as we have discussed about this IO migration, we may need to consider:
However, this PR seems to support a "fall back" strategy: if users don't provide the consumer's ARN, it falls back to the standard way of pulling records. I don't have a preference for which would work better or be cleaner, as long as we support backward compatibility. What do you think? |
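A minimal sketch of the fall-back decision described above, assuming the mode is chosen purely from whether a consumer ARN was supplied. The class, enum, and method names are hypothetical and are not taken from the PR; they only illustrate the backward-compatible default.

```java
/** Hypothetical helper; not part of KinesisIO. */
final class ReadModeSelector {

  /** The two reading modes discussed in this thread. */
  enum ReadMode {
    PULL, // standard polling of records
    SUBSCRIBE // enhanced fanout via a registered consumer
  }

  /** Falls back to PULL when no consumer ARN is configured (assumed rule). */
  static ReadMode select(String consumerArn) {
    boolean hasConsumer = consumerArn != null && !consumerArn.isEmpty();
    return hasConsumer ? ReadMode.SUBSCRIBE : ReadMode.PULL;
  }

  public static void main(String[] args) {
    // Existing pipelines that never set an ARN keep the pull behaviour.
    System.out.println(select(null));
    // The ARN below is a made-up placeholder.
    System.out.println(select("arn:aws:kinesis:region:account:stream/name/consumer/name:1"));
  }
}
```

With this rule, pipelines that never set a consumer ARN keep their current behaviour, which is the backward-compatibility property asked for above.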
Test adding reviewer
sdks/java/io/kinesis2/src/main/java/org/apache/beam/sdk/io/kinesis2/BasicKinesisProvider.java
Hi @aromanenko-dev any thoughts on this PR or Cam's comments? |
@jfarr Sorry again for the delay with the review, I'll try to do this ASAP |
Run Python PreCommit |
Thanks again for your contribution! And thank you for the separate diff files that include only your changes. I did a review based on them and I expect that all other functionality remains as it was before.
In general, it looks fine. I left several comments inline, and there are several things that I wanted to mention in general:
- I think it would make sense to move KinesisIO SDK V2 into the new package org.apache.beam.sdk.io.aws2 with all other Amazon SDK V2 related IOs. Initially, it was in a separate package since KPL had an Apache-incompatible license; now that's fixed (see BEAM-3549 and BEAM-7894).
- Could you add unit tests for the new functionality?
- Have you tested it on a real cluster with real data? If yes, any additional info about that would be much appreciated.
- I agree with @cmachgodaddy's point that we need to make it clear to users that it supports two modes of reading: pull and subscribe. I think more examples in the KinesisIO Javadoc should be enough.
- Does "subscribe mode" support closing or splitting the shards?
import software.amazon.kinesis.common.InitialPositionInStream; | ||
|
||
/** | ||
* {@link PTransform}s for reading from and writing to <a |
Please remove "writing" since this PR adds support for reading only.
* .withInitialPositionInStream(InitialPositionInStream.LATEST) | ||
* .withCustomWatermarkPolicy(new MyCustomPolicyFactory()) | ||
* }</pre> | ||
*/ |
Please add a quick example that shows how to use withConsumerArn()
Instant readTime, | ||
String streamName, | ||
String shardId) { | ||
this.data = copyData(data); |
Why do we need a deep copy here?
The deep copy here is because of the call to KinesisClientRecord::fromRecord here:
This converts the AWS SDK Record to a KCL KinesisClientRecord so we can call deaggregate(), but also causes KinesisClientRecord's data to be converted to a read-only HeapByteBuffer, which then throws ReadOnlyBufferException when we call array() here:
As far as I can tell a read-only byte buffer will not allow direct access to the backing array which I think is kind of the point. If I'm mistaken please let me know and I can change this. Initially I had fixed it by copying the data to a new byte array in getDataAsBytes() but it seemed more efficient to take the hit only once in the constructor.
I will add a comment here to explain the reason for the deep copy. Alternately, we could probably roll our own version of KinesisClientRecord::fromRecord that doesn't convert the data to a read-only buffer if you'd rather do that.
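The behaviour described above can be reproduced with plain java.nio, independent of the KCL. The sketch below is an assumption about what a copyData helper could look like, not the PR's actual implementation; the class name and the arrayAccessFails helper are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo class; only java.nio calls are used. */
final class ReadOnlyBufferCopy {

  /** Copies the remaining bytes without calling array() and without moving the caller's position. */
  static byte[] copyData(ByteBuffer data) {
    ByteBuffer view = data.duplicate(); // shares content, has its own position and limit
    byte[] copy = new byte[view.remaining()];
    view.get(copy); // bulk get is permitted on read-only buffers
    return copy;
  }

  /** Returns true if array() is rejected because the buffer is read-only. */
  static boolean arrayAccessFails(ByteBuffer data) {
    try {
      data.array();
      return false;
    } catch (ReadOnlyBufferException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    ByteBuffer readOnly =
        ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
    System.out.println("array() rejected: " + arrayAccessFails(readOnly));
    System.out.println(new String(copyData(readOnly), StandardCharsets.UTF_8));
  }
}
```

Copying once in the constructor, as described above, means later accessors can return the stored bytes without repeating the copy on every call.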
.apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes())) | ||
.apply( | ||
"Write to Kinesis", | ||
org.apache.beam.sdk.io.kinesis.KinesisIO.write() |
I wouldn't recommend depending on the previous version of KinesisIO.Write. Just write messages using the standard Kinesis client until KinesisIO.Write for SDK V2 is implemented.
} | ||
|
||
/** Read test dataset from Kinesis stream. */ | ||
private void runRead() { |
Since we can read messages using two different modes, pull and subscribe, I think we need two different IT Read tests, one for each.
@aromanenko-dev thank you for the feedback. I’ll work on getting those changes in. I’m sorry, the last week or so has been very busy for me. I will be on vacation from my day job all of next week so I hope to wrap up this one and #9765 as much as I can. I have been testing this against our production data stream for a while now and functionally it’s been working very well but I am still seeing some unexpected latencies at higher throughputs and working to get to the bottom of that. I’ll try to post some info about that as well. |
Just one minor question. I couldn't open the test file; does it require permission?
|
||
@Override | ||
public String toString() { | ||
return ToStringBuilder.reflectionToString(this); |
This is also new. Any reason, @jfarr?
@cmachgodaddy I found it useful for debugging, so I left it in.
@cmachgodaddy Which test file? |
diff-test.txt attached in this PR |
@cmachgodaddy Sorry, no idea about permission. I just copied that to a gist: https://gist.github.com/aromanenko-dev/b99ad6eb9d7b6de31b8e9607b37aaefb |
Hi @jfarr, do you have any updates on this PR? |
Hi @aromanenko-dev yes I'm sorry I haven't gotten back to this yet but I would like to finish it up. It looks like I have some tests to write and update, some Javadocs to update and add some examples, and to move the code to the new |
@jfarr No pressure, please, take your time and let me know if you need any help. Thanks! |
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions. |
@jfarr Just wanted to ping you about your plans for this PR. |
@aromanenko-dev Hi, I still have plans to finish it. I've made a small amount of progress since the last round of comments. I got the code moved into the |
@jfarr Thank you for the update! |
4ae4739
to
4bc0e51
Compare
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions. |
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
@jfarr Kind ping on this PR. Would you have time to finish this one? |
@aromanenko-dev Sure, I'm just waiting for a resolution on BEAM-9702. Did you still want to merge that first? |
@jfarr I added my thoughts there. I think we should finish moving |
Hi @aromanenko-dev, yes that's my plan. |
Hello @aromanenko-dev @jfarr What's the status of this feature? I came across a use-case where enhanced fan-out actually became necessary. Do you know if anyone is working on supporting this? Thanks. |
I am no longer working with Beam so it’s unlikely that I will ever finish this. Please feel free to pick up where I left off. |
Thanks
I will have some time to take a look next week, thanks |
@psolomin That would be great! Don't hesitate to ask any questions here or on the related Jira if you have any. |
Hey !
Since ticket BEAM-9702 is already resolved, are there plans to continue the work here? |
cc: @mosche @biancaberdugo I'm not aware of any plans on this but it would be a great feature to add. |
Since the request got some activity, I had plans to give it a try within this week @biancaberdugo |
@psolomin Be careful if you intend to resume work from this stale branch. The number of conflicts with the main branch is huge. I would recommend starting from scratch and following the PR description here (if helpful) |
Yes, this PR contains a lot of code and logic that is either already implemented or won't be needed, so this feature (enhanced fanout) should definitely be developed from scratch, but it will be worth taking the ideas from here into account. And of course, it should be implemented only for KinesisIO based on AWS SDK v2. |
@mosche @aromanenko-dev I've prepared a draft PR which passes the most basic smoke tests I did manually using my own AWS account. Mind giving it a quick glance to validate the overall approach? Once I know I'm headed in the right direction, I will add more tests and clean up my code a bit. It is still very raw now #23540 Thank you! |
#23540 was merged today and I expect it will be released with Beam 2.48.0. If you want to try out a pre-release version, in a couple of days it will be available in the Maven snapshots repo, and you will be able to link it via:
Please, note that |
I've also updated examples here: aws-samples/amazon-kinesis-data-analytics-examples#60 Just in case you still use legacy |
Hi @aromanenko-dev, here is my idea of how KinesisIO could support enhanced fanout consumers using subscribeToShard from the AWS SDK v2. It's still WIP but I wanted to see what you think about the approach. As you probably know the main difference is that instead of polling getRecords() you subscribe to the shard with a callback. You also have to keep renewing the subscription every 5 minutes but subscribeToShard returns a CompletableFuture that completes when the subscription expires so you can just call join() then subscribeToShard() again in a loop. Other than that I think everything else is the same. So here's the approach I took:
- consumerArn param to KinesisIO.Read and plumbed that through to KinesisSource, DynamicCheckpointGenerator, ShardCheckpoint, and ShardRecordsIterator.
- subscribeToShard method to ShardCheckpoint that mirrors the starting position logic from getShardIterator. The exception is that it has a resubscribe param which will cause it to always subscribe from LATEST if true.
- subscribeToShard method to SimplifiedKinesisClient that handles calling KinesisAsyncClient.subscribeToShard and wraps any exceptions.
- subscribeToShard method to ShardRecordsIterator. This contains the Kinesis subscriber and error handler so this is where KinesisAsyncClient calls back to with SubscribeToShardEvents. This is so we can apply the RecordFilter to the list of returned events after we deaggregate and convert each KinesisClientRecord to a KinesisRecord and before handing it to the consumer that was passed in as a parameter. The CompletableFuture from subscribeToShard gets passed back to this point where we call join(), so this method is blocking for the duration of the subscription. This could maybe use some work to make it more responsive to interruption.
- subscribeLoop method that loops on ShardRecordsIterator.subscribeToShard() instead. In startReadingShards() we call ShardRecordsIterator.hasConsumer() which checks whether consumerArn is populated then we run either readLoop() or subscribeLoop() on the executor service accordingly.
- putRecord method in ShardReadersPool that gets called from readLoop() and also gets passed to ShardRecordsIterator.subscribeToShard() as the KinesisRecord consumer. This encapsulates putting the record onto recordsQueue and incrementing the per-shard record count.
- resubscribe. This starts as false and is only updated to true when ackRecord() is called so we can guarantee that we were able to subscribe from the initial checkpoint starting position successfully before we start subsequently resubscribing from LATEST. Possibly this resubscribe param is overkill and it would work just as well if you always subscribe from the latest checkpoint but I haven't tested that yet.

I also converted the existing code to using the AWS SDK v2 exclusively, which meant removing the writer from the v2 KinesisIO for now as it does not appear that there's a KPL based on the v2 AWS SDK yet. I think this could be separated from adding the enhanced fanout support if you want though.
wdyt?
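The renew-by-join loop and the resubscribe flag described above can be sketched without the AWS SDK. Everything here is an assumption for illustration: ShardSubscription is a hypothetical stand-in interface (not the KinesisAsyncClient signature), positions are plain strings, and the acknowledgement is simulated by the record consumer flipping a flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical sketch of the subscribe loop; not the PR's code. */
final class SubscribeLoopSketch {

  /** Stand-in for the async subscribe call; the future completes when the subscription ends. */
  interface ShardSubscription {
    CompletableFuture<Void> subscribeToShard(String startingPosition, Consumer<String> onRecord);
  }

  /** Re-subscribes after each expiry and returns the starting positions that were used. */
  static List<String> subscribeLoop(
      ShardSubscription client,
      String checkpointPosition,
      Consumer<String> recordConsumer,
      AtomicBoolean running,
      AtomicBoolean acked) {
    List<String> positionsUsed = new ArrayList<>();
    while (running.get()) {
      // Start from the checkpoint until a record has been acknowledged, then from LATEST.
      String position = acked.get() ? "LATEST" : checkpointPosition;
      positionsUsed.add(position);
      // join() blocks this thread for the lifetime of the subscription.
      client.subscribeToShard(position, recordConsumer).join();
    }
    return positionsUsed;
  }

  public static void main(String[] args) {
    AtomicBoolean running = new AtomicBoolean(true);
    AtomicBoolean acked = new AtomicBoolean(false);
    int[] calls = {0};
    // Fake client: delivers one record per subscription, then "expires" immediately.
    ShardSubscription fake =
        (position, onRecord) -> {
          onRecord.accept("record-" + calls[0]);
          if (++calls[0] == 3) {
            running.set(false);
          }
          return CompletableFuture.completedFuture(null);
        };
    System.out.println(subscribeLoop(fake, "TRIM_HORIZON", r -> acked.set(true), running, acked));
  }
}
```

Because join() blocks for the whole subscription, a loop like this only notices the running flag between subscriptions, which matches the note above that responsiveness to interruption could use more work.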