Rolling Supervisor restarts at taskDuration #14396
Conversation
It will be a really nice capability, as it will make task scheduling more seamless. But is there going to be any drawback to this change? If the tasks start in a staggered fashion, would that affect processing throughput in any way?
None that I can think of. The only thing I can imagine is if tasks are stuck in pending for so long that it takes longer than the configured task handoff for all the tasks to cycle. But in that case, the tasks are still restarted from earliest to latest, so I suspect everything should be ok. Each Kafka task handles reading from its partitions independently, so this should have no impact on ingestion throughput.
I am +1 on not having this on by default (the default would still be the current behavior) and on making the number of tasks to roll at one time configurable per supervisor. I am -1 on this PR as a whole due to the lack of unit testing for when the new config is used. I also have a few minor questions.
I looked into writing tests for this and struggled. SeekableStreamSupervisor is a huge abstract class, and the part that needs testing is hard to exercise in isolation. If there are any suggestions or pointers on how to write tests for this class, I'm open to trying it out. Otherwise, I think the fact that this is disabled by default and undocumented provides the safety net we need so that it won't break for Druid users.
Description
Currently, the number of slots needed for a streaming supervisor to hand over quickly is 2 * the number of tasks needed for reading from the stream. This is because when the taskDuration expires, the previously reading tasks need to publish their segments, and if this takes time and there are no available slots on the cluster, Druid will not be reading from the stream until a slot frees up.
This change makes it so that during regular operations, tasks are rolled over one at a time, so that an operator can plan for a capacity of (number of tasks needed to read from the stream) + 1. For example, with 10 reading tasks, the current behavior can require up to 20 slots during handoff, while rolling tasks over one at a time requires only 11. For config changes to the supervisor, like taskCount, all tasks will still need to stop, so an operator should factor this into their capacity planning. If config changes are rare, this means you do not need as much free capacity in the cluster for stable operations.
Current behavior
Here are some screenshots of metrics from a test where we ran streaming ingestion in a cluster with a capacity of 24 slots. The supervisor was ingesting data from a Kafka topic with 24 partitions using 10 tasks.
During this test, there was a batch ingestion task running that took 7 slots.
This screenshot shows 17 task slots being used from ~8am to 9:35am. At 9:35am, the batch task dies, and the used task count drops to 10. The screenshot shows spikes in used task slots at 7:54, 9:34, and from 8:54-8:56.
These spikes occur when the taskDuration has expired and the tasks need to roll over. This shows that the system needs to reserve capacity for the tasks to publish segments; otherwise there is a risk of increasing Kafka lag. The next screenshot shows Kafka lag during this run at ~8:55, where some partitions could not get a slot and so experienced a spike in lag.
Behavior when setting stopTaskCount
Here we see that the number of tasks rolling over at any point in time is capped, and that it takes ~10 minutes for all the tasks with an expired taskDuration to stop. Kafka lag metrics remain very low, and there are no spikes when the taskDuration is hit.
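For reference, here is a minimal sketch of what a Kafka supervisor spec exercising this might look like. The placement of stopTaskCount inside ioConfig, and the surrounding field values (topic name, counts, duration), are my assumptions for illustration, since the config is intentionally left undocumented by this PR:

```json
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "topic": "example-topic",
      "taskCount": 10,
      "taskDuration": "PT1H",
      "stopTaskCount": 2
    }
  }
}
```

With a setting like this, only stopTaskCount tasks are asked to stop at a time when their taskDuration expires, so the extra capacity needed for handoff is bounded by stopTaskCount rather than by taskCount.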
Why not on by default / documented
This behavior of rolling over a subset of tasks is not on by default because it is not clear to me what a good default would be. Some defaults I've considered:
The first 2 rely on the assumption that the time it takes to publish segments will be predictable, which may not be the case, so we can't provide guarantees for capacity planning that the number of tasks you will need is X.
The 3rd option is nice for capacity planning, but could result in tasks holding on to locks for a much longer time than anticipated if the number of tasks in a supervisor is large.
The config stopTaskCount is not documented so that we do not need to support it in the future once we decide what the best behavior is and what it should be by default.