Closed Shards Prevent Startup #12
Comments
It does appear that eventually the worker will pick up the other shards, but it also keeps attempting to start consumers for the closed shard. I would imagine that on a long-lived stream this would only happen more often, causing more and more (unpredictable) start-up latency. It also takes multiple shard syncs before it randomly chooses a different unclosed shard, which further exacerbates the long startup time, so I'm still a bit confused about how or why that works the way it does. In this example, it takes almost 5 minutes to start listening on two shards, but I've seen (most) other instances of this test where it keeps trying to take
It's also worth mentioning that I tried this with vmware-go-kcl and had the same result.
Hello. To work around that:
That way, resharding is handled automatically for us.
@spentakota any input on this problem? @arl's input is helpful, but this seems like a real problem, and not something that the library user should need to concern themselves with. Resharding can potentially never succeed because consumers get stuck re-leasing closed shards forever. If this is intended behavior, it should be mentioned in the documentation, and an intended solution should be outlined (potentially what @arl suggested or some other recommended solution).
@calebstewart @arl - A closed shard might still have records to process. Records do not just go away as soon as you split the shard. New records get written to the post-split/post-merge shards, but the stream consumer may not have processed all the records on the old shard(s) yet. Records will stay available for consumers based on the stream's configured retention period.
@vmwjc I have this at the bottom of my
The issue that I have is that when the scaling of Kinesis shards happens, I still get this behavior of it looping over the parent shards that no longer have any records to read. What method needs to happen to ensure the
The
I agree with @calebstewart that this should be handled within the library, probably within the
It does look like @arl has the right solution for making sure
Current state of my Dynamo checkpoint table after scaling Kinesis shards, and not including the explicit call to
I also confirmed this has the same behavior in both the
As much as it makes sense to me conceptually, the upstream KCL documentation (which this library attempts to stick closely to) indicates that library users should:
The above comes from the AWS docs here and here.
Given that this library attempts to maintain rough API compatibility with the upstream KCL, I think the current behavior makes sense. The documentation on such an important component feels a little lacking, but since this library defers to the AWS KCL documentation, that's not technically a problem here.
That being said, an example project that uses this library would be immensely helpful. This would be akin to the examples found in the AWS docs here. Maybe converting the Python example to Golang would be helpful for new users. If I find some time, I'll make a PR with a basic example that mimics the official Python KCL consumer examples linked above.
edit: mixed up the upstream KCL version links
Ah, they even have the test case showing this:
That's good enough for me as an example. It also follows the same premise that @arl showed in their implementation. I have this in mine as well, and this issue is no longer a problem for me. The Dynamo table gets the
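For anyone landing here later, the workaround discussed in this thread (checkpoint when the processor is shut down because the shard has ended) can be sketched roughly as below. This is a self-contained toy, not the library's API: `ShutdownReason`, `Checkpointer`, `memCheckpointer`, and `onShutdown` are hypothetical stand-ins defined only for this example, and the assumption that a nil sequence number is recorded as `SHARD_END` should be checked against the library version you use.

```go
package main

import "fmt"

// ShutdownReason is a hypothetical stand-in for the library's shutdown reason.
type ShutdownReason int

const (
	Requested ShutdownReason = iota // the worker itself is shutting down
	Terminate                       // the shard is closed and has been read to the end
	Zombie                          // the lease was lost to another worker
)

// Checkpointer is a hypothetical, minimal checkpoint interface.
// Assumption: a nil sequence number means "checkpoint at shard end".
type Checkpointer interface {
	Checkpoint(sequenceNumber *string) error
}

// memCheckpointer keeps checkpoints in a map in place of the DynamoDB table.
type memCheckpointer struct {
	shardID string
	table   map[string]string
}

func (c *memCheckpointer) Checkpoint(seq *string) error {
	if seq == nil {
		// Mark the shard as finished so that it is not leased again.
		c.table[c.shardID] = "SHARD_END"
		return nil
	}
	c.table[c.shardID] = *seq
	return nil
}

// onShutdown shows what a record processor's shutdown hook might do:
// checkpoint at shard end only when the shard itself has terminated.
func onShutdown(reason ShutdownReason, cp Checkpointer) error {
	if reason == Terminate {
		return cp.Checkpoint(nil)
	}
	return nil
}

func main() {
	table := map[string]string{"shardId-000000000000": "100"}
	cp := &memCheckpointer{shardID: "shardId-000000000000", table: table}
	if err := onShutdown(Terminate, cp); err != nil {
		fmt.Println("checkpoint failed:", err)
		return
	}
	fmt.Println(table["shardId-000000000000"]) // prints SHARD_END
}
```

The point of the sketch is only the branch in `onShutdown`: without the shard-end checkpoint, nothing in the checkpoint table tells the worker that the closed parent shard is finished.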
Describe the bug
To preface, I am new to Kinesis and the KCL, and this could be a configuration issue (either with Kinesis itself or with the KCL). If I'm doing something incredibly wrong, please let me know. I'd be happy if this was "user error". 😛
As a basic example, if I have a Kinesis stream with a single shard, everything works as expected. When the worker starts up, it takes a lease for that shard, and starts processing records. However, if I re-shard the stream to instead have two shards, then AWS closes the first shard, and creates two new shards.
The result is that the KCL worker repeatedly attempts to take out a lease on the closed shard, and then immediately closes it. This coupled with the fact that the worker will only start a job for a single shard per shard sync and always iterates over shards ordered by ID (meaning older shards first) means that the worker never actually starts jobs for the other (valid) shards. It just repeatedly tries to take a lease for the closed shard, then the polling job exits immediately since the shard is closed. On the next sync, it once again grabs a lease for the closed shard, and the process repeats forever. It does not seem to ever stop visiting the closed shard, and because it will only look at one shard per-sync, it never starts jobs for the other two valid shards.
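To make the loop described above concrete, here is a toy model of it. It is not the library's code: the `shard` type, `syncOnce`, and `runSyncs` are hypothetical, and the model assumes exactly what the paragraph describes (shards visited in ID order, at most one consumer started per sync, and a closed shard whose consumer exits immediately without being marked as finished).

```go
package main

import (
	"fmt"
	"sort"
)

// shard is a hypothetical, simplified view of a lease-table entry.
type shard struct {
	ID     string
	Closed bool // closed by a reshard
	Done   bool // checkpointed at shard end, so it is never leased again
}

// syncOnce models a single shard sync: visit shards in ID order, try to
// start one consumer, then stop. It returns the ID of the shard it tried.
func syncOnce(shards []shard, running map[string]bool) string {
	sort.Slice(shards, func(i, j int) bool { return shards[i].ID < shards[j].ID })
	for _, s := range shards {
		if s.Done || running[s.ID] {
			continue
		}
		if !s.Closed {
			running[s.ID] = true // an open shard's consumer keeps running
		}
		// A closed shard's consumer exits right away, so nothing stays running.
		return s.ID // models stopping after the first lease attempt
	}
	return ""
}

// runSyncs runs n syncs and reports how many consumers are running afterwards.
func runSyncs(shards []shard, n int) int {
	running := map[string]bool{}
	for i := 0; i < n; i++ {
		syncOnce(shards, running)
	}
	return len(running)
}

func main() {
	stuck := []shard{
		{ID: "shardId-000000000000", Closed: true},
		{ID: "shardId-000000000001"},
		{ID: "shardId-000000000002"},
	}
	fmt.Println(runSyncs(stuck, 5)) // prints 0

	finished := []shard{
		{ID: "shardId-000000000000", Closed: true, Done: true},
		{ID: "shardId-000000000001"},
		{ID: "shardId-000000000002"},
	}
	fmt.Println(runSyncs(finished, 2)) // prints 2
}
```

In this model the worker never gets past the closed shard unless that shard is marked as finished, which matches what I am seeing.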
Reproduction steps
The worker is as simple as it gets. I observe the same behavior when using enhanced fan-out or polling.
Running the above example with a single shard, before re-sharding looks like this:
Running after re-sharding (where shardId-000000000000 is now closed) looks like:
As you can see, no polling shard consumers are started, so no records are being processed because the worker only attempted to start a single consumer, which immediately exited. When a shard sync triggered ~60 seconds later, it tried to pick up the same closed shard, and still did not start processing records.
Expected behavior
The worker starts up, and takes leases up to MaxLeasesForWorker, and starts a polling (or enhanced fan-out) job for each of the leased shards immediately.
Additional context
There are multiple things that don't make sense to me going on here.
I would expect the worker to take leases up to MaxLeasesForWorker, and start jobs for each shard that it leases, but in practice it only takes out one lease at a time per sync. If you have many shards, and use the default ShardSyncInterval (1 minute), then it will take a very long time for the worker to start up and listen on all the expected shards. Even if you had a small number of shards like 10, and 2 workers, then it would take 5 full minutes at a minimum to even start up a single worker, assuming they each lease 5 shards (these are just made-up numbers to explain the point; not saying that configuration makes sense in a production context).
Regarding the "only starting one job per sync" question, it seems this happens because of the break statement here. I'm struggling to understand why this happens at all, to be honest.
Is it normal to take an extremely long time to start up a worker? Is this a bug? Do people just set the shard sync interval much lower than default in practice? I can't find anything in the upstream AWS Java KCL documentation that mentions this. It all seems to indicate that on startup, a worker will take out leases up to the max lease limit, and start the appropriate processing jobs.