
[Kafka Indexing Service] Decouple Druid segment partitioning from Kafka partitioning #4016

Closed
pjain1 opened this issue Mar 6, 2017 · 40 comments · Fixed by #4815

Comments

@pjain1
Member

pjain1 commented Mar 6, 2017

Currently the Kafka indexing task creates a segment partition corresponding to each Kafka partition. However, in many cases the Kafka partitioning does not directly reflect the amount of data that ends up in Druid segments, and as a result the tasks often end up creating a lot of small segments.

Therefore, this proposal is to decouple segment partitioning from Kafka partitioning, allowing Kafka partitions and Druid segment partitions to be tuned independently. A Kafka indexing task would then create a single segment per segment interval (while still consuming from multiple Kafka partitions), and the number of concurrently running tasks would determine the number of segment partitions.

Any thoughts?

Edit:
After a discussion with @cheddar and @himanshug we came up with a few ideas:

  1. As soon as a task hits the maxRowsPerSegment limit it signals the Supervisor. The Supervisor then pauses all replica tasks, gets the highest offset for each partition, sets the new offsets on all replicas, asks them to publish and shut down, and starts a new set of tasks to consume the subsequent offsets. In this case it is a little hard to reason about the number of concurrently running tasks and the task duration. Also, the taskDuration config would be ignored.

  2. As soon as a task hits the maxRowsPerSegment limit it signals the Supervisor. The Supervisor then pauses all replica tasks, gets the highest offset for each partition, and sends these (partition, offset) pairs as a checkpoint to all tasks. If a task fetches a record whose offset for any partition is beyond this checkpoint, it creates a new appenderator driver with a new appenderator and hands that record to the new appenderator for consumption. For each record pulled from Kafka, the task checks the offset and hands the record to the relevant appenderator based on the checkpoints it has. Once the task has consumed past all the offsets for all partitions in an individual checkpoint, it can close the relevant appenderator and hand off the segments handled by that appenderator. When taskDuration is reached, the task publishes (or waits for) the handoff of segments for each checkpoint in FIFO order before shutting down.

The second approach looks better and I will try to implement it if it sounds good.
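To make the second approach concrete, here is a rough, hypothetical sketch of the record routing it implies. `Driver` and `CheckpointedDriver` are invented stand-ins, not actual Druid classes, and the real implementation will differ in details such as exactly when the new driver gets created:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: Driver stands in for a Druid appenderator driver.
public class CheckpointRouter {

    static class Driver {
        void add(int partition, long offset, Object row) { /* index the row */ }
        void publishAndHandOff() { /* push segments, commit metadata, wait for handoff */ }
    }

    // A driver plus the (exclusive) end offsets that close it; endOffsets is null while
    // this is still the "open" driver that new records fall into.
    static class CheckpointedDriver {
        final Driver driver = new Driver();
        Map<Integer, Long> endOffsets;
        final Map<Integer, Long> consumed = new HashMap<>();

        boolean accepts(int partition, long offset) {
            return endOffsets == null || offset < endOffsets.getOrDefault(partition, Long.MIN_VALUE);
        }

        boolean fullyConsumed() {
            if (endOffsets == null) {
                return false; // still open; the supervisor has not closed it yet
            }
            return endOffsets.entrySet().stream().allMatch(
                e -> consumed.getOrDefault(e.getKey(), -1L) >= e.getValue() - 1);
        }
    }

    // Oldest checkpoint first; the last element is always the currently open driver.
    private final Deque<CheckpointedDriver> drivers = new ArrayDeque<>();

    public CheckpointRouter() {
        drivers.add(new CheckpointedDriver());
    }

    // Called when the supervisor sends a checkpoint: close the open driver at the given
    // end offsets and start a new open driver for records beyond them.
    public void onCheckpoint(Map<Integer, Long> endOffsets) {
        drivers.getLast().endOffsets = endOffsets;
        drivers.add(new CheckpointedDriver());
    }

    // Called for every record pulled from Kafka: route it to the oldest driver whose
    // checkpoint still covers its offset, then hand off fully consumed checkpoints in FIFO order.
    public void onRecord(int partition, long offset, Object row) {
        for (CheckpointedDriver cd : drivers) {
            if (cd.accepts(partition, offset)) {
                cd.driver.add(partition, offset, row);
                cd.consumed.merge(partition, offset, Math::max);
                break;
            }
        }
        while (drivers.size() > 1 && drivers.getFirst().fullyConsumed()) {
            drivers.removeFirst().driver.publishAndHandOff();
        }
    }
}
```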

@gianm
Contributor

gianm commented Mar 6, 2017

The task creates a separate segment per Kafka partition to make replicated ingestion work; see #2703 and #2844 for descriptions of what can go wrong if multiple Kafka partitions go into the same Druid segment.

It is a big pain point though so maybe there's a creative solution to this problem…

@pjain1
Member Author

pjain1 commented Mar 6, 2017

@gianm If ordering is a problem then the polled records can be ordered by partition first, so that all records for a single partition are consumed, then the records for the next partition, and so on. Or the records can be sorted on the combination of (partition, offset) and then consumed. This way ordering can be maintained independently at each replica. Does this make sense?
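For illustration, a minimal sketch of the second option (ordering one poll's worth of records by (partition, offset)), assuming the plain Kafka consumer API, might look like this:

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OrderedPoll {
    // Poll once and return the records ordered by (partition, offset), so that each
    // replica indexes the rows from a single poll in the same deterministic order.
    static List<ConsumerRecord<byte[], byte[]>> pollOrdered(KafkaConsumer<byte[], byte[]> consumer) {
        ConsumerRecords<byte[], byte[]> polled = consumer.poll(100);
        return StreamSupport.stream(polled.spliterator(), false)
            .sorted(Comparator
                .comparingInt((ConsumerRecord<byte[], byte[]> r) -> r.partition())
                .thenComparingLong(ConsumerRecord::offset))
            .collect(Collectors.toList());
    }
}
```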

@gianm
Contributor

gianm commented Mar 6, 2017

I think that wouldn't work in practice when Druid is caught up to the Kafka stream (the normal case for real-time indexing); it would mean the task won't read later partitions until very late in its life.

@pjain1
Member Author

pjain1 commented Mar 6, 2017

I meant ordering events after they have been polled, basically after each consumer.poll() call. I did not mean that the task should consume events from one partition, then get assigned to the next partition and consume that, and so on; that will not work.

However, I can see a problem with ordering after each poll call as well: if poll returns a different number of records per partition (or some partition's records are missing) across replica tasks, the segments can still go out of sync.

@gianm
Contributor

gianm commented Mar 6, 2017

Ah I see. In that case, I think you're right, there's a problem since there's no guarantee that different replicas will get the exact same batches from poll.

@himanshug
Contributor

One idea that was floated was to let KafkaIndexTask create segments per Kafka partition, and eventually, when the time comes to publish those segments, merge all the "per partition" segments into a final segment and publish that one instead.

@gianm
Contributor

gianm commented Mar 7, 2017

@himanshug hmm that sounds like it could work. The final segment would need to have a higher version number than the earlier ones but that should be doable. There might also be more than one final segment if the task handled a lot of data.

@himanshug
Contributor

@gianm actually, the final segment must have a higher version, since it should overshadow the "per partition" segments served from the peons
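As a minimal illustration only (not the actual merge code): to my understanding Druid segment versions are timestamp strings compared lexicographically for overshadowing, so the merged segment just needs a strictly greater version than the per-partition segments it replaces:

```java
public class VersionCheck {
    // True if a segment with mergedVersion would overshadow one with perPartitionVersion
    // over the same interval (versions compare as strings).
    static boolean overshadows(String mergedVersion, String perPartitionVersion) {
        return mergedVersion.compareTo(perPartitionVersion) > 0;
    }

    public static void main(String[] args) {
        String perPartition = "2017-03-07T00:00:00.000Z"; // version used by the per-partition segments
        String merged = "2017-03-07T01:00:00.000Z";       // later version assigned to the merged segment
        System.out.println(overshadows(merged, perPartition)); // prints: true
    }
}
```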

@gianm
Contributor

gianm commented Mar 7, 2017

yeah, that makes sense.

@himanshug
Contributor

OK, I had a short chat with @pjain1 and it appears merging at the end is not enough, as the original problem faced by the customer is slower queries caused by many small "realtime" segments, which would exist even if we did the merging at the end.

@gianm
Contributor

gianm commented Mar 7, 2017

Presumably it would still be useful though to your customer, since it would mean a lot fewer segments on historicals after handoff.

@himanshug
Contributor

himanshug commented Mar 7, 2017

Another thing we briefly chatted about was removing the "maxRowsPerSegment" setting; in that world, segment size would be controlled by the number of Kafka partitions per task and the segment granularity.

@himanshug
Contributor

himanshug commented Mar 7, 2017

Presumably it would still be useful though to your customer, since it would mean a lot fewer segments on historicals after handoff.

they run batch reindexing to do the merging already.

@gianm
Contributor

gianm commented Mar 7, 2017

Another thing we briefly chatted about was removing the "maxRowsPerSegment" setting

The setting prevents having segments that are crazy huge so I think it's still useful. But you can always set it to something crazy huge and then it's effectively disabled. I guess we could also have an "unlimited" option for it like -1.

@elloooooo
Contributor

elloooooo commented Mar 10, 2017

they run batch reindexing to do the merging already.

@himanshug May I ask whether running batch reindexing means submitting a merge task?

@himanshug
Contributor

@elloooooo I meant Hadoop-based re-indexing, see http://druid.io/docs/0.9.2/ingestion/update-existing-data.html

@pjain1
Member Author

pjain1 commented Mar 10, 2017

@gianm @dclim I have updated the issue with the approaches that @cheddar, @himanshug, and I came up with after a discussion. Please see if that makes sense. Thanks

@gianm
Contributor

gianm commented Mar 10, 2017

Is the idea with (2) that maxRowsPerSegment would become more of a loose guideline than a hard limit? It sounds like replicas would potentially need to go beyond maxRowsPerSegment to reach the same checkpoint as other replicas.

@pjain1
Member Author

pjain1 commented Mar 10, 2017

Yes, maxRowsPerSegment would not be a hard limit, although I assume it should not be exceeded by much if the Supervisor is up and running and the network is behaving correctly. One thing that can be done is for tasks to pause when they reach maxRowsPerSegment and get resumed by the Supervisor. Still, the limit can be exceeded depending on how far out of sync the partition offsets are across replicas (which can be significant in worst-case situations). Currently I cannot think of a better approach.

@gianm
Contributor

gianm commented Mar 10, 2017

I can't think of a way to make it a hard limit without separating segments by Kafka partition, or by doing a leader/follower approach, both of which have overhead. I think it's fine for maxRowsPerSegment to become a loose limit, but IMO the name of the parameter should change since "maxRowsPerSegment" sounds like a hard limit.

I do think it makes sense for the tasks to pause themselves when they hit maxRowsPerSegment, notify the supervisor, and then wait for a checkpoint. They might as well do the same thing when they hit taskDuration.

@dclim what do you think?
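To sketch what that could look like, here is a hypothetical outline of a task pausing itself and notifying the supervisor when it hits either limit; `SupervisorClient`, `requestCheckpoint`, `publishRemainingAndExit`, and the timeout are invented for illustration, not the actual Druid API:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch only; names are invented, not the actual Druid API.
public class LooseLimitLoop {

    interface SupervisorClient {
        // Ask the supervisor for a checkpoint; it pauses all replicas, picks the highest
        // offsets per partition, and sends the same checkpoint back to every replica.
        void requestCheckpoint(String taskId);
    }

    private final SupervisorClient supervisor;
    private final String taskId;
    private final long maxRowsPerSegment;    // loose guideline rather than a hard cap
    private final long taskDurationMillis;
    private final long startMillis = System.currentTimeMillis();
    private long rowsSinceLastCheckpoint = 0;
    private volatile CountDownLatch checkpointLatch = new CountDownLatch(0);

    LooseLimitLoop(SupervisorClient supervisor, String taskId,
                   long maxRowsPerSegment, long taskDurationMillis) {
        this.supervisor = supervisor;
        this.taskId = taskId;
        this.maxRowsPerSegment = maxRowsPerSegment;
        this.taskDurationMillis = taskDurationMillis;
    }

    // Called by the task after each row is indexed.
    void afterRow() throws InterruptedException {
        rowsSinceLastCheckpoint++;
        boolean hitRows = rowsSinceLastCheckpoint >= maxRowsPerSegment;
        boolean hitDuration = System.currentTimeMillis() - startMillis >= taskDurationMillis;
        if (hitRows || hitDuration) {
            // Pause ourselves and notify the supervisor. Replicas can be slightly out of
            // sync, so by the time the checkpoint arrives we may be somewhat past
            // maxRowsPerSegment; this is why it becomes a loose limit.
            checkpointLatch = new CountDownLatch(1);
            supervisor.requestCheckpoint(taskId);
            checkpointLatch.await(60, TimeUnit.SECONDS);
            rowsSinceLastCheckpoint = 0;
            if (hitDuration) {
                publishRemainingAndExit(); // hand off each checkpoint's segments in FIFO order, then stop
            }
        }
    }

    // Invoked when the supervisor's checkpoint arrives.
    void onCheckpointReceived() {
        checkpointLatch.countDown();
    }

    private void publishRemainingAndExit() { /* publish, wait for handoff, shut down */ }
}
```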

@pjain1
Member Author

pjain1 commented Mar 10, 2017

@gianm currently the tasks do not check taskDuration at all; it's the supervisor that checks taskDuration for the tasks and initiates the shutdown process.

@gianm
Contributor

gianm commented Mar 10, 2017

What I meant was that the tasks could pause themselves when they hit taskDuration too and also notify the supervisor. The two functionalities (duration, max segment size) seem similar with the checkpoint change you proposed, and it might be nice to run them the same way.

@pjain1
Member Author

pjain1 commented Mar 10, 2017

Oh OK, that sounds great.

@dclim
Contributor

dclim commented Mar 13, 2017

@pjain1 approach 2 sounds good to me, I agree that maxRowsPerSegment as a guideline rather than a hard cutoff makes sense. We would also be able to support something like #3439 now.

@dclim
Contributor

dclim commented Mar 13, 2017

@pjain1 in this section:

Once the task has consumed past all the offsets for all partitions in an individual checkpoint, it can close the relevant appenderator and hand off the segments handled by that appenderator.

Do you actually mean handoff (writing the segment metadata, pushing the segment to S3, and then waiting for it to be loaded by a historical), or just writing the segment to local disk and starting a new one? Right now it does the latter, and handoff only happens after taskDuration elapses. If we implemented handoff before the task completes, we could theoretically get rid of the concept of taskDuration and support longer-running tasks without the indexing tasks accumulating too much data and affecting query performance. Longer-running tasks would further reduce the number of segments created and would be a more efficient use of worker capacity. @gianm do you think it makes sense to look into this at this time?

@pjain1
Member Author

pjain1 commented Mar 13, 2017

@dclim yes, I meant actual handoff, not just persisting to disk.

@gianm
Contributor

gianm commented Mar 13, 2017

It would be nice to have the tasks run forever; like @dclim says, it'll help performance and it'll also require less worker capacity.

Other than implementing multiple stages of handoff per task, we also would need some changes to how ForkingTaskRunner handles logs (it doesn't rotate or upload them until a task exits, which would cause problems if they run forever). It seems worth it though.

@gianm
Contributor

gianm commented Mar 15, 2017

Related to the logging situation: #401

@gianm
Contributor

gianm commented Mar 16, 2017

@pjain1 are you planning on working on a patch for this?

@pjain1
Member Author

pjain1 commented Mar 16, 2017

Yes, I am working on this. I will try to send a PR by the end of this week.

@gianm
Contributor

gianm commented Mar 16, 2017

Great, thanks. Let me know if you want to discuss anything, otherwise looking forward to it!

@pjain1
Member Author

pjain1 commented Mar 19, 2017

Update: still working on it; it will require a few more days.

@gianm
Contributor

gianm commented Apr 10, 2017

@pjain1, just wondering, are you still looking at this? anything we could help with?

@pjain1
Member Author

pjain1 commented Apr 10, 2017

Hey, sorry it got stretched out a bit. I am done with the coding and am currently testing it on our metrics cluster. I have yet to write tests for it, but I will try to open a PR without them as soon as possible so that you can have a look. Thanks

@pjain1
Member Author

pjain1 commented Apr 12, 2017

@gianm What is the expected behavior when stopGracefully is called on a task? If a driver is at the mergeAndPush stage, should the task not finish until all drivers have handed off their segments, or should it otherwise just persist everything and be done?
Also, when a task fails to publish even after the pending completion timeout, the KafkaSupervisor stops it gracefully and does not kill it. Is there any reason for this?

@gianm
Contributor

gianm commented Apr 12, 2017

@pjain1

What is the expected behavior when stopGracefully is called on a task? If a driver is at the mergeAndPush stage, should the task not finish until all drivers have handed off their segments, or should it otherwise just persist everything and be done?

stopGracefully is used for the "restoreTasksOnRestart" feature. It's called on all restorable tasks (canRestore = true) when a middleManager shuts down and is meant to prepare for the task being restored when the middleManager starts back up. Any behavior is ok as long as it satisfies the needs of that feature.

Also, when a task fails to publish even after the pending completion timeout, the KafkaSupervisor stops it gracefully and does not kill it. Is there any reason for this?

I'm not sure, maybe @dclim knows.

@dclim
Contributor

dclim commented Apr 12, 2017

@pjain1 I think the KafkaSupervisor generally first tries to stop tasks gracefully and then kills them after a timeout; in the case you mentioned, it should be fine to just kill them immediately

@pjain1
Member Author

pjain1 commented Apr 13, 2017

Thanks

@fedecasanova72

Is this still open?

@jihoonson
Contributor

@fedecasanova72 yes. @pjain1 is working on this in #4178.
