[Kafka Indexing Service] Decouple Druid segment partitioning from Kafka partitioning #4016
@gianm If ordering is a problem, then polled records can be ordered by partition first, so that all records for a single partition are consumed, then the records for the next partition, and so on. Or records can be sorted on the combination of (partition + offset) and then consumed. This way ordering can be maintained independently at each replica. Does this make sense?
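A minimal sketch of the sort-after-poll idea, using the standard Kafka consumer API (the helper class and its name are hypothetical; the replies below explain why this alone turned out to be insufficient):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

class RecordOrdering {
  // Sort one polled batch by (partition, offset) so that any replica that
  // received the same batch would process it in the same deterministic order.
  static List<ConsumerRecord<byte[], byte[]>> sorted(ConsumerRecords<byte[], byte[]> polled) {
    List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
    polled.forEach(records::add);
    records.sort(
        Comparator.comparingInt((ConsumerRecord<byte[], byte[]> r) -> r.partition())
            .thenComparingLong(ConsumerRecord::offset));
    return records;
  }
}
```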
I think that wouldn't work in practice when Druid is caught up to the Kafka stream (the normal case for real-time indexing); it would mean that it won't read later partitions until very late in the task.
I meant ordering events after they have been polled, basically after each poll. However, I can see a problem in ordering after each poll call as well: if poll returns a different number of records per partition (or some partition's records are missing) on replica tasks, then the segments can still go out of sync.
Ah, I see. In that case I think you're right, there's a problem, since there's no guarantee that different replicas will get the exact same batches from poll.
one idea that was floated was to let KafkaIndexTask create segments per Kafka partition; eventually, when the time comes to publish those segments, merge all "per partition" segments into a final segment and publish that one instead.
@himanshug hmm, that sounds like it could work. The final segment would need to have a higher version number than the earlier ones, but that should be doable. There might also be more than one final segment if the task handled a lot of data.
@gianm actually the final segment should have a higher version, as it should overshadow the "per partition" segments served from the peons
yeah, that makes sense.
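For context, Druid decides overshadowing for a given interval by comparing segment version strings (typically ISO-8601 timestamps) lexicographically, so "higher version" here just means a later-sorting string. A tiny illustration with made-up values:

```java
class OvershadowExample {
  public static void main(String[] args) {
    // Hypothetical version strings; Druid versions are typically the task's
    // start time as an ISO-8601 timestamp, compared lexicographically.
    String perPartitionVersion = "2017-04-01T00:00:00.000Z";
    String mergedVersion = "2017-04-01T01:00:00.000Z";

    // The merged segment overshadows the per-partition segments for the same
    // interval exactly when its version string compares greater.
    System.out.println(mergedVersion.compareTo(perPartitionVersion) > 0); // true
  }
}
```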
ok, had a short chat with @pjain1 and it appears merging at the end is not enough, as the original problem faced by the customer is related to slower queries due to many small "realtime" segments, which would exist even if we do the merging at the end.
Presumably it would still be useful to your customer, though, since it would mean a lot fewer segments on historicals after handoff.
another thing we briefly chatted about was removing the "maxRowsPerSegment" setting; in that world segment size would be controlled by the number of Kafka partitions per task and the segment granularity.
they run batch reindexing to do the merging already.
The setting prevents having segments that are crazy huge, so I think it's still useful. But you can always set it to something crazy huge and then it's effectively disabled. I guess we could also have an "unlimited" option for it, like -1.
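A one-line sketch of the "-1 means unlimited" convention being suggested here (the class and method names are hypothetical, not the actual tuning-config code):

```java
class SegmentLimits {
  // Treat any non-positive maxRowsPerSegment as "unlimited": passing -1
  // disables the row-count trigger entirely.
  static boolean shouldStartNewSegment(long rowsInSegment, int maxRowsPerSegment) {
    return maxRowsPerSegment > 0 && rowsInSegment >= maxRowsPerSegment;
  }
}
```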
@himanshug May I ask whether running batch reindexing means submitting a merge task?
@elloooooo I meant hadoop-based re-indexing, see http://druid.io/docs/0.9.2/ingestion/update-existing-data.html
@gianm @dclim I have updated the issue with the approaches that we (@cheddar and @himanshug) came up with after a discussion. Please see if that makes sense. Thanks
Is the idea with (2) that maxRowsPerSegment would become more of a loose guideline than a hard limit? It sounds like replicas would potentially need to go beyond maxRowsPerSegment to reach the same checkpoint as other replicas.
yes, maxRowsPerSegment would not be a hard limit. I assume it should not be exceeded by much if the Supervisor is up and running and the network is behaving correctly. One thing that can be done is that tasks pause when they reach maxRowsPerSegment and get resumed by the Supervisor. Still, it can go beyond maxRowsPerSegment depending on how far the partition offsets are out of sync on replicas (which can be a significant number in worst-case situations). Currently, I cannot think of any better approach.
I can't think of a way to make it a hard limit without separating segments by Kafka partition, or by doing a leader/follower approach, both of which have overhead. I think it's fine for maxRowsPerSegment to become a loose limit, but IMO the name of the parameter should change since "maxRowsPerSegment" sounds like a hard limit. I do think it makes sense for the tasks to pause themselves when they hit maxRowsPerSegment, notify the supervisor, and then wait for a checkpoint. They might as well do the same thing when they hit taskDuration. @dclim what do you think?
@gianm currently tasks do not check for taskDuration at all; it's the supervisor who checks taskDuration for the tasks and initiates the shutdown process.
What I meant was, the tasks could pause themselves when they hit taskDuration too and also notify the supervisor. The two functionalities (duration, max segment size) seem similar with the checkpoint change you proposed, and it might be nice to run them the same way.
Oh OK, that sounds great.
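A hedged sketch of the unified pause-and-notify flow being agreed on above: the task pauses itself on either limit, tells the supervisor why, and waits for a checkpoint or a shutdown. All names here are illustrative, not the actual KafkaIndexTask API:

```java
class LimitChecker {
  enum Reason { MAX_ROWS, TASK_DURATION }

  private final long maxRowsPerSegment;
  private final long taskDurationMillis;
  private final long startTimeMillis = System.currentTimeMillis();

  LimitChecker(long maxRowsPerSegment, long taskDurationMillis) {
    this.maxRowsPerSegment = maxRowsPerSegment;
    this.taskDurationMillis = taskDurationMillis;
  }

  // Called from the task's consume loop after indexing each record.
  void checkLimits(long rowsInSegment) throws InterruptedException {
    boolean hitRows = maxRowsPerSegment > 0 && rowsInSegment >= maxRowsPerSegment;
    boolean hitDuration = System.currentTimeMillis() - startTimeMillis >= taskDurationMillis;
    if (hitRows || hitDuration) {
      pauseConsumption(); // stop polling Kafka, keep in-memory state
      notifySupervisor(hitRows ? Reason.MAX_ROWS : Reason.TASK_DURATION);
      awaitCheckpointOrShutdown(); // supervisor decides the next step for all replicas
    }
  }

  private void pauseConsumption() { /* stub */ }
  private void notifySupervisor(Reason reason) { /* stub */ }
  private void awaitCheckpointOrShutdown() throws InterruptedException { /* stub */ }
}
```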
@pjain1 in this section:
Do you actually mean handoff (writing the segment metadata, pushing the segment to S3, and then waiting for it to be loaded by a historical), or writing the segment to local disk and starting a new partition? Right now it does the latter, and handoff only happens after taskDuration elapses. If we implemented handoff before the task completes, we would theoretically be able to get rid of the concept of taskDuration and could support longer-running tasks without the indexing tasks accumulating too much data and affecting query performance. Longer-running tasks would further reduce the number of segments created and would be a more efficient use of worker capacity. @gianm do you think it makes sense to look into this at this time?
@dclim yes, I meant actual handoff, not just persisting to disk
It would be nice to have the tasks run forever; like @dclim says, it'll help performance and it'll also require less worker capacity. Other than implementing multiple stages of handoff per task, we would also need some changes to how ForkingTaskRunner handles logs (it doesn't rotate or upload them until a task exits, which would cause problems if tasks run forever). It seems worth it though.
Related to the logging situation: #401
@pjain1 are you planning on working on a patch for this?
Yes, I am working on this. I will try to send a PR by the end of this week.
Great, thanks. Let me know if you want to discuss anything, otherwise looking forward to it!
Update: still working on it; it will require a few more days.
@pjain1, just wondering, are you still looking at this? anything we could help with?
Hey, sorry it got stretched out a bit. I am done with the coding and am currently testing it on our metrics cluster. I have yet to write tests for it; however, I will try to open a PR without them as soon as possible so that you can have a look. Thanks
@gianm What is the expected behavior in case of stopGracefully being called on a task? If a driver is at
stopGracefully is used for the "restoreTasksOnRestart" feature. It's called on all restorable tasks (canRestore = true) when a middleManager shuts down and is meant to prepare for the task being restored when the middleManager starts back up. Any behavior is ok as long as it satisfies the needs of that feature.
I'm not sure, maybe @dclim knows.
@pjain1 I think the KafkaSupervisor generally first tries to stop tasks gracefully and then kills them after a timeout; in the case you mentioned, it should be fine to just kill them immediately
Thanks
Is this still open?
@fedecasanova72 yes. @pjain1 is working on this in #4178.
Currently the Kafka indexing task creates a segment partition corresponding to each Kafka partition. However, in many cases Kafka partitioning does not directly reflect the amount of data that ends up in Druid segments, and as a result the tasks often end up creating a lot of small segments.
Therefore, this proposal is to decouple segment partitioning from Kafka partitioning, allowing Kafka partitions and Druid segment partitions to be tuned independently. A Kafka indexing task would then create a single segment per segment interval (while still consuming from multiple Kafka partitions), and the number of concurrently running tasks would determine the number of segment partitions. For example, a task consuming 32 Kafka partitions with hourly segment granularity currently produces at least 32 segments per hour, versus one under this proposal.
Any thoughts?
Edit:
After having a discussion with @cheddar and @himanshug, we came up with a few ideas:
1. As soon as a task hits the maxRowsInSegment limit, it signals the Supervisor; the Supervisor then essentially pauses all replica tasks, gets the highest offset for all partitions, sets the new offsets on all replicas, asks them to publish and shut down, and starts a new set of tasks to consume the further offsets. In this case it is a little hard to reason about the number of concurrently running tasks and the task duration. Also, the taskDuration config will be ignored.
2. As soon as a task hits the maxRowsInSegment limit, it signals the Supervisor; the Supervisor then essentially pauses all replica tasks, gets the highest offset for all partitions, and sends these partition-offset tuples as a checkpoint to all tasks. If any task fetches a record whose offset for any partition is beyond this checkpoint, the task creates a new appenderator driver with a new appenderator and hands the record to the new appenderator for consumption. For each record pulled from Kafka, the task checks the offset and hands the record to the relevant appenderator based on the checkpoints it has. Once the task has consumed past all the offsets of all partitions for an individual checkpoint, it can close the relevant appenderator and hand off the segments handled by that appenderator. When taskDuration is reached, the task publishes (or waits for) the handoff of segments for each checkpoint in FIFO order before shutting down.
The second approach looks better, and I will try to implement it if it sounds good.
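To make the second approach concrete, here is a rough sketch of the per-record routing it describes: each supervisor checkpoint is a map of partition to starting offset of the next sequence; a record past the checkpoint opens a new driver, and the old driver is published and handed off once every partition has crossed the checkpoint. All types and method names are hypothetical stand-ins, not Druid's actual appenderator classes:

```java
import java.util.HashMap;
import java.util.Map;

class CheckpointRouter {
  // Latest consumed offset per Kafka partition, used to detect when the
  // current sequence has been consumed past the checkpoint everywhere.
  private final Map<Integer, Long> consumedOffsets = new HashMap<>();

  // Partition -> first offset of the *next* sequence, as distributed by the
  // supervisor after pausing all replicas; null when no checkpoint is pending.
  // Assumed to contain an entry for every partition the task consumes.
  private Map<Integer, Long> checkpoint;

  private Driver current = new Driver(); // driver for the active sequence
  private Driver next;                   // opened lazily at the checkpoint

  // Called when the supervisor sends the agreed-on partition/offset tuples.
  void setCheckpoint(Map<Integer, Long> partitionToStartOffset) {
    this.checkpoint = partitionToStartOffset;
  }

  void route(int partition, long offset, Object row) {
    consumedOffsets.put(partition, offset);
    if (checkpoint != null && offset >= checkpoint.get(partition)) {
      if (next == null) {
        next = new Driver(); // new appenderator for the new sequence
      }
      next.add(row);
      if (pastCheckpointEverywhere()) {
        // The old sequence is complete on every partition: publish it and
        // hand off its segments (FIFO relative to later checkpoints).
        current.publishAndHandOff();
        current = next;
        next = null;
        checkpoint = null;
      }
    } else {
      current.add(row); // still within the current sequence
    }
  }

  private boolean pastCheckpointEverywhere() {
    for (Map.Entry<Integer, Long> e : checkpoint.entrySet()) {
      Long consumed = consumedOffsets.get(e.getKey());
      if (consumed == null || consumed < e.getValue()) {
        return false;
      }
    }
    return true;
  }

  // Stand-in for Druid's appenderator driver.
  static class Driver {
    void add(Object row) { /* index the row into in-memory segments */ }
    void publishAndHandOff() { /* publish metadata, push segments, await handoff */ }
  }
}
```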