Kafka Index Task that supports Incremental handoffs #4815
Conversation
@pjain1 thanks for your work on this issue. I'll review soon.
partially reviewed, will continue.
)));
this.topic = ioConfig.getStartPartitions().getTopic();
this.sequences = new CopyOnWriteArrayList<>();
this.publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver"));
Is it possible to create the executor service somewhere other than the constructor? Otherwise this executor will be created/started every time a KafkaIndexTask instance is created, e.g. when the supervisor creates them.
Probably startExecutors() is the right place.
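Below is a minimal sketch (not the actual patch) of what moving the executor creation out of the constructor could look like. The method name createPublishExecService and the lazy-initialization approach are illustrative, and a plain JDK single-thread executor stands in for Druid's Execs helper so the example is self-contained.

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executors;

public class PublishExecutorHolder
{
  // Created lazily so that constructing the task (e.g. by the supervisor) does not start a thread.
  private ListeningExecutorService publishExecService;

  // Hypothetical helper: only called from run(), not from the constructor.
  synchronized ListeningExecutorService createPublishExecService()
  {
    if (publishExecService == null) {
      // Plain JDK single-thread executor used here to keep the sketch self-contained;
      // the real code uses Druid's Execs.singleThreaded("publish-driver").
      publishExecService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    }
    return publishExecService;
  }
}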
@@ -276,13 +318,124 @@ public KafkaIOConfig getIOConfig()
  return ioConfig;
}

private void startExecutors()
can we call this createPublishExecService() ?
@@ -276,13 +318,124 @@ public KafkaIOConfig getIOConfig()
  return ioConfig;
}

private void startExecutors()
{
  // start publish executor service
unnecessary comment
{
  // start publish executor service
  publishExecService.submit(
      (Runnable) () -> {
Is the type cast to Runnable needed?
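For context, a small self-contained illustration (not from the patch) of when such a cast matters: ExecutorService has both submit(Runnable) and submit(Callable<T>) overloads. A block-bodied void lambda resolves to Runnable without any cast, while an expression-bodied lambda that produces a value resolves to the Callable overload unless a cast forces Runnable.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitOverloadDemo
{
  public static void main(String[] args) throws Exception
  {
    ExecutorService exec = Executors.newSingleThreadExecutor();

    // Block-bodied lambda with no return value: only the Runnable overload applies, no cast needed.
    exec.submit(() -> {
      System.out.println("runs as Runnable");
    });

    // Expression-bodied lambda returning a value: resolves to submit(Callable<String>).
    Future<String> f = exec.submit(() -> "runs as Callable");
    System.out.println(f.get());

    // Casting forces the Runnable overload even for a value-returning lambda, discarding the result.
    exec.submit((Runnable) () -> System.currentTimeMillis());

    exec.shutdown();
  }
}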
if (getContext() != null && getContext().get("checkpoints") != null) {
  log.info("Got checkpoints [%s]", (String) getContext().get("checkpoints"));
  final TreeMap<Integer, Map<Integer, Long>> checkpoints = toolbox.getObjectMapper().readValue(
      (String) getContext().get("checkpoints"), new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
Is it required for the checkpoints map to be represented as a String in the context? Can't we store non-string structures in the task context?
The context can hold anything, but as far as I remember there was a serialization issue, which is why I chose String. Will look at it again.
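A hedged sketch of the trade-off under discussion, assuming a Jackson ObjectMapper (class and variable names are illustrative): a structured map placed in the context survives a JSON round trip, but its keys come back as Strings and small longs may come back as Integers, so the value has to be re-read with an explicit TypeReference either way. The String-encoded variant shown second mirrors the task snippet above.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointContextDemo
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
    checkpoints.put(0, Map.of(0, 0L, 1, 100L));

    // Option 1: keep the structure in the context as a plain Object. After a JSON round trip
    // the keys become Strings, so convert it back with an explicit TypeReference.
    Object fromContext = mapper.readValue(mapper.writeValueAsString(checkpoints), Object.class);
    TreeMap<Integer, Map<Integer, Long>> restored = mapper.convertValue(
        fromContext,
        new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() {}
    );

    // Option 2: store the checkpoints as a pre-serialized String (as the task does above)
    // and deserialize explicitly when reading.
    String asString = mapper.writeValueAsString(checkpoints);
    TreeMap<Integer, Map<Integer, Long>> fromString = mapper.readValue(
        asString,
        new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() {}
    );

    System.out.println(restored.equals(fromString));
  }
}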
Added fix for #4743
thanks @pjain1, I've started reviewing this and will post comments in the next few days
Partially reviewed, seems pretty sound so far.
Do you have any performance benchmarks before and after the changes, even anecdotally? I'm interested in whether any of the additions to the while (stillReading) loop had any measurable impact on throughput, or maybe even if throughput increased for Kafka topics with many partitions since you're allocating and persisting fewer segments.
ioConfig.getStartPartitions()
        .getPartitionOffsetMap()
        .get(partitionOffsetEntry.getKey())
) >= 0
Why >= 0 ? Should this just be == 0?
    ) >= 0
), "Sequence offsets are not compatible with start offsets of task");
log.info("Setting next offsets to [%s]", sequences.size() == 0 ? endOffsets : sequences.get(0).startOffsets);
nextOffsets.putAll(sequences.size() == 0 ? endOffsets : sequences.get(0).startOffsets);
I don't think sequences.size() can ever be 0 at this point so the conditional can probably be removed. But if it could be 0, you're using endOffsets when I think you should be using the starting offsets.
You are right, the sequences size can never be 0 if the restored metadata is null (which happens when the task runs for the first time). I was trying to handle the case where the task is restarted after all sequences are published but before the task shuts down; however, in that case the restored metadata will not be null. Fixed the conditions accordingly. Thanks.
publishingSequences.add(sequenceMetadata.getSequenceName());
try {
  Object result = driver.persist(committerSupplier.get());
  log.info("Persist completed with results [%s]", result);
Might as well combine these log messages
  }
}

if (stopRequested) {
// if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
if (stopRequested.get() || (sequences.get(sequences.size() - 1).isCheckpointed()
Could sequences.size() ever be 0 here?
No, the sequences list should never be empty inside the while (stillReading) loop.
checkAndMaybeThrowException();

if (!ioConfig.isPauseAfterRead()) {
  persistAndPublishSequences(committerSupplier);
Should probably be called maybePersistAndPublishSequences since it doesn't do this on every call.
if (tuningConfig.getHandoffConditionTimeout() == 0) {
  handedOff = handoffFuture.get();
  handedOffList = Futures.allAsList(handOffWaitList).get();
Hm, you might have a visibility concurrency issue here using an ArrayList for handOffWaitList, even with the sentinel (http://gee.cs.oswego.edu/dl/cpj/jmm.html).
will fix, thanks
changed to CopyOnWriteArrayList
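For illustration, a minimal sketch (assumed names, not the actual task code) of the concern and the fix: a plain ArrayList written by one thread and read by another gives no visibility guarantee, whereas CopyOnWriteArrayList safely publishes each added element and allows snapshot iteration.

import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class HandoffWaitListSketch
{
  // Written by the publish thread, read by the main task thread when waiting for handoff.
  // CopyOnWriteArrayList makes each add() safely visible to other threads; a plain
  // ArrayList would need external synchronization or another happens-before edge.
  private final List<ListenableFuture<?>> handOffWaitList = new CopyOnWriteArrayList<>();

  void registerHandoff(ListenableFuture<?> handoffFuture)
  {
    handOffWaitList.add(handoffFuture);
  }

  List<ListenableFuture<?>> snapshotForWaiting()
  {
    // Iteration sees a consistent snapshot; no ConcurrentModificationException
    // even if the publish thread adds more futures concurrently.
    return List.copyOf(handOffWaitList);
  }
}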
} else {
  handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
  handedOffList = Futures.allAsList(handOffWaitList)
                         .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
When this throws a TimeoutException, it looks like it'll cause the task to report as FAILED. Is this the behavior we want, or should we just log/emit errors and then report as SUCCESS?
The current behavior is that it would report FAILED; there is a unit test for that as well. But yes, reporting the task as SUCCESS and emitting an alert before that seems like the correct behavior. Fixed it.
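A rough sketch of the agreed behavior, assuming the wait uses Futures.allAsList with the configured timeout (the method name, logger choice, and return value here are illustrative, not the actual KafkaIndexTask code): on timeout, log an alert-style message and continue so the task can still report SUCCESS instead of propagating the exception.

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

public class HandoffWaitSketch
{
  private static final Logger LOG = Logger.getLogger(HandoffWaitSketch.class.getName());

  // Waits for all handoffs; on timeout, warns and moves on so the caller can still
  // report SUCCESS rather than failing the whole task.
  static <T> List<T> waitForHandoffs(List<ListenableFuture<T>> handOffWaitList, long timeoutMillis)
      throws Exception
  {
    ListenableFuture<List<T>> all = Futures.allAsList(handOffWaitList);
    try {
      return timeoutMillis == 0
             ? all.get()
             : all.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (TimeoutException e) {
      LOG.warning("Handoff did not complete within " + timeoutMillis + " ms; continuing anyway");
      all.cancel(true);
      return List.of();
    }
  }
}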
);
for (SegmentsAndMetadata handedOff : handedOffList) {
  if (handedOff == null) {
    log.warn("Handoff failed for segments [%s]", handedOff.getSegments());
Unnecessary brackets around %s
  }
}
catch (InterruptedException | RejectedExecutionException e) {
  publishExecService.shutdownNow();
Would this be better placed in the finally block?
moved
  }
}
catch (InterruptedException | RejectedExecutionException e) {
  publishExecService.shutdownNow();
  appenderator.closeNow();
This seems redundant with the closeNow() call in the finally block.
We want to use closeNow in two cases: first, when the middle manager asks the task to stop, which is this case, where an InterruptedException would be thrown. The second case is when some exception is thrown during processing and we want to fail the task, which is what the closeNow in the finally block captures. I could not find any easy way to capture the first case in the finally block, hence the explicit call here. Does this make sense?
Ah okay. One way you could do it is to just set throwableAtomicReference in the InterruptedException catch, but this is fine.
@@ -660,14 +920,20 @@ private Access authorizationCheck(final HttpServletRequest req, Action action)
  return access;
}

// used for unit tests
Appenderator getAppenderator()
Use the @VisibleForTesting annotation.
Wanted to do that but #4874 (comment) :) Anyway, I can add it.
Hah. I think the best option is obviously if you're able to test this without relaxing visibility, but if not, then I think using the annotation is better than using a comment because at least it can be statically analyzed.
added @VisibleForTesting
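For reference, a tiny illustration of the annotation that was settled on; Guava's @VisibleForTesting has no runtime effect, but static analysis can flag production callers of the relaxed-visibility member. The surrounding class and field are placeholders, not the real task code.

import com.google.common.annotations.VisibleForTesting;

public class AppenderatorHolderSketch
{
  private Object appenderator;  // placeholder for the real Appenderator type

  @VisibleForTesting
  Object getAppenderator()
  {
    // Package-private accessor exposed only so unit tests can inspect state.
    return appenderator;
  }
}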
  }
}

final SequenceMetadata sequenceMetadata = sequences.get(sequences.size() - 1);
May as well define this earlier and use it instead of repeatedly doing sequences.get(sequences.size() - 1)
  }
}

private synchronized void persistState(final TaskToolbox toolbox) throws IOException
Any particular reason you're passing toolbox in here?
removed
  return sentinel;
}

public void setCheckpointed()
This isn't used outside of this class. If checkpointed just means that endOffsets are not all Long.MAX_VALUE, there's probably some cleanup you can do around this too.
I have removed this method. However, I'm not sure what you mean by cleanup; do you mean remove checkpointed and use endOffsets instead?
Yes, that's what I was suggesting, but I was more thinking out loud. If you feel it's clearer to have an explicit checkpointed flag that basically means 'setEndOffsets() has been called', that's fine too. Maybe that's better and less error-prone.
yes, seems better to have the flag
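A possible shape of the flag-based approach that was preferred, with assumed field and method names: the flag simply records that setEndOffsets() has been called on the sequence, instead of inferring it from whether all end offsets are still Long.MAX_VALUE.

import java.util.HashMap;
import java.util.Map;

public class SequenceMetadataSketch
{
  private final Map<Integer, Long> endOffsets = new HashMap<>();
  private boolean checkpointed = false;

  // Called when the supervisor finalizes this sequence's end offsets.
  synchronized void setEndOffsets(Map<Integer, Long> offsets)
  {
    endOffsets.putAll(offsets);
    checkpointed = true;  // explicit flag: "setEndOffsets() has been called"
  }

  synchronized boolean isCheckpointed()
  {
    // Equivalent in spirit to checking that the end offsets are no longer Long.MAX_VALUE,
    // but less error-prone because it does not depend on sentinel values.
    return checkpointed;
  }
}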
@@ -290,19 +306,19 @@ public DateTime getStartTime(final String id)

public boolean setEndOffsets(final String id, final Map<Integer, Long> endOffsets)
Can remove this, it isn't being called anywhere
@@ -153,6 +156,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

//CHECKSTYLE.OFF: Regexp
?
@@ -153,6 +156,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

//CHECKSTYLE.OFF: Regexp
//CHECKSTYLE.ON: Regexp

public class KafkaIndexTaskTest
Can you add some tests around early publishing and loading sequence data from disk?
will do
final List<SegmentIdentifier> theSegments = new ArrayList<>();
synchronized (segments) {
  sequenceNames.stream()
               .filter(sequenceName -> !publishingSequences.contains(sequenceName))
publishingSequences never gets written to, so this is always true.
The motivation behind using this in-memory Set is to prevent duplicate publishes of the same sequence during the lifetime of this driver. The persisted segment state alone cannot be used for de-duplication, because upon task restart some segments may be in the PUBLISHING state because they were being published before the restart but could not finish. So the call to publish them again should actually start publishing them again, as it is not a duplicate request in the current lifetime of the driver.
The thing I am missing here, though, is a statement like publishingSequences.add(sequenceName) in the forEach. I will add that.
@pjain1 I'm working on fixing a bug in IndexTask and this needs a refactor of AppenderatorDriver. To avoid introducing another bug, I want to make sure what this code means.

"upon task restart some segments may be in PUBLISHING state because they were being published before restart but could not finish. So, the call to publish them again should actually start publishing them again as it is not a duplicate request in the current lifetime of driver."

- The segment state is changed to PUBLISHING, but is never checked. Why is it needed?
- We can assume that all segments in the segments map are not published yet. If this is acceptable, is it possible to achieve the same thing by republishing segments without checking their state when publish() is called? Or, is it possible that publish() is called for the same sequence multiple times?
At any point in time, a segment can be in 3 states:

- ACTIVE - data is being actively added to it.
- INACTIVE - no data will be added to this segment; a call to moveSegmentOut causes the segment to get into this state. Note that the AppenderatorDriver has not yet started publishing this segment.
- PUBLISHING - the segment is being actively published; the segment information will be removed from the segments map once publish succeeds.

Currently, the driver does not distinguish between the INACTIVE and PUBLISHING states for making decisions like preventing duplicate publishes. The reason is that the driver's public publishing methods do not let the caller publish at the segment level; they allow publishing at the sequence level. So the publishingSequences Set is used to prevent duplicate sequence publishes. This is my understanding of the code, and on that basis here are the answers to your questions:

"The segment state is changed to PUBLISHING, but is never checked. Why is it needed?"

It looks like the PUBLISHING state is not required and just changing the state to INACTIVE should work, unless some task actually wants to distinguish between moved-out segments and segments that are being published.

"We can assume that all segments in the segments map are not published yet. If this is acceptable, is it possible to achieve the same thing by republishing segments without checking their state when publish() is called? Or, is it possible that publish() is called for the same sequence multiple times?"

Currently, segments in the segments map might be in the active, inactive, or being-published state. The segment information is removed only when a segment is successfully published, using a Future callback in the publish method.

Feel free to ask for any more information if it's still not clear.
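The three states described above, captured as a small illustrative enum (the real driver may model this differently):

// Illustrative only; mirrors the state description above rather than the actual Druid code.
public enum SegmentState
{
  // Data is being actively added to the segment.
  ACTIVE,

  // No more data will be added; moveSegmentOut() put the segment here.
  // The AppenderatorDriver has not started publishing it yet.
  INACTIVE,

  // The segment is being actively published; its entry is removed from the
  // segments map once the publish succeeds.
  PUBLISHING
}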
@pjain1 thanks for the detailed explanation. I have one more follow-up question.
publishingSequences Set is used to prevent duplicate sequence publishes.
When is it possible to publish the same sequence multiple times? Here is my understanding:

- A new sequence is created whenever the supervisor sets a new end offset.
- The sequence name is always increased by 1 when a new sequence is created, so it's unique for each sequence and cannot be duplicated.
- Once a sequence is published, it is removed from the segments map.
- Upon task restart, the restored segments in the segments map are not published yet. In other words, the segments which are not in the segments map are already published.
Duplicate publishes can only happen when a caller calls publish for some sequence multiple times, or calls publishAll multiple times, or some combination of both. I believe the Kafka index task does not do that, so it should be ok. However, I see that IndexTask calls publishAll twice, at lines 711 and 743, so it needs to be checked that no problem happens if the publishingSequences Set is removed, and also that nothing problematic happens if these methods are used anywhere else in the future.
Thanks! IndexTask actually calls publish() synchronously, so it should also be fine.
If you are planning to remove the publishingSequences Set, then I just noticed that calling publishAll twice one immediately after the other can be problematic, as the code relies on a Future callback to remove the published sequences from the segments map. However, as far as I know, callbacks on a future are not guaranteed to be executed before get on that future returns. So it might happen that the sequence keys are not removed from the map before the second call to publishAll, so please check this edge case.

Also, I think removing the sequence keys from the map in the callback with this statement -

sequenceNames.forEach(segments::remove);

should be done in a synchronized (segments) block. I should have done that; it's my mistake. Does this make sense?
Yeah, it makes sense. I'll fix them too. Thanks for sharing!
@Override
public void onSuccess(SegmentsAndMetadata result)
{
  if (result != null) {
Maybe log an error if this comes back null?
If the result is null then KafkaIndexTask will throw an ISE.
public void onFailure(Throwable t)
{
  // Do nothing, caller should handle the exception
  log.error("Error publishing sequences [%s]", sequenceNames);
Log the stack trace?
KafkaIndexTask actually fails when there is any exception during publish and also logs that exception, so I just wanted to prevent duplicate stack traces. I guess I can add it here as well; it doesn't hurt.
For both onSuccess and onFailure I am not logging much and relying on the caller to log and take appropriate action. Do you think that is ok? It's the same as the current behavior.
Got it, that seems fine to me.
Right now it looks like tasks only get checkpointed when the number of rows in a segment exceeds maxRowsInSegment. It might make sense to also support time-based checkpointing, where a checkpoint is triggered after a configured period even if the row threshold hasn't been hit.

Also, it might make sense to add a 'perform checkpoint now' route to the supervisor API so that users can manually trigger an immediate checkpoint, since it doesn't look like it would be a lot of additional work. Aside from being useful for testing, I could also see it being useful for cases where users are working through a big backlog of data in Kafka and their normal steady-state configs would fail with the load; rather than submit an updated supervisor, they could maintain the same configuration but just manually trigger checkpoints to hand off the data in smaller chunks. I don't see this being as useful as the time-based checkpointing, but again maybe worth it since there's very little to do to enable it.
for (String taskId : taskGroup.taskIds()) {
  TreeMap<Integer, Map<Integer, Long>> checkpoints = null;
  try {
    checkpoints = taskClient.getCheckpoints(
Consider doing this asynchronously, otherwise if they have a lot of tasks and they're all misbehaving and not responding, this'll take a really long time to timeout and fail.
thanks for the suggestion, will do.
changed to asynchronous gets, please review
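A hedged sketch of what the asynchronous gets could look like, assuming the task client exposes a future-returning variant (the method name getCheckpointsAsync and the surrounding types are illustrative, not the actual supervisor code): all requests are issued first, then collected with a single bounded wait so one unresponsive task does not serialize the others.

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

public class AsyncCheckpointFetchSketch
{
  interface TaskClient
  {
    // Assumed async variant; returns the checkpoints keyed by sequence id, per partition.
    ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> getCheckpointsAsync(String taskId);
  }

  static List<TreeMap<Integer, Map<Integer, Long>>> fetchAll(
      TaskClient taskClient,
      List<String> taskIds,
      long timeoutMillis
  ) throws Exception
  {
    // Fire all requests up front so one slow task does not block the others from starting.
    final List<ListenableFuture<TreeMap<Integer, Map<Integer, Long>>>> futures = new ArrayList<>();
    for (String taskId : taskIds) {
      futures.add(taskClient.getCheckpointsAsync(taskId));
    }
    // successfulAsList substitutes null for tasks that failed, letting the caller skip them
    // instead of failing the whole pass.
    return Futures.successfulAsList(futures).get(timeoutMillis, TimeUnit.MILLISECONDS);
  }
}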
try {

  if (endOffsets.equals(taskGroup.partitionOffsets)) {
If this condition is true, it just logs a warning and then continues on with the same logic. Is this missing something?
Good catch; actually this condition should never be true. I don't remember why I added it, and it probably can be removed. For now I just changed it to return endOffsets.
{
  Map<Integer, Long> startPartitions = taskGroups.get(groupId).partitionOffsets;
  Map<Integer, Long> endPartitions = new HashMap<>();
  Map<Integer, Long> endPartitions = new HashMap<>(); // TODO if endOffsets were already set
Can we resolve this TODO?
Removed the TODO. This case need not be handled, because if endOffsets are set, that means the TaskGroup has moved from active to pending-publish state, and the supervisor never creates tasks for a pending-publish TaskGroup.
@dclim Thanks for the review comments. I have resolved some and am working on others. Regarding indexing performance, I am also curious to compare that but haven't had a chance to do so. However, we have been running the new code with the same task count as the previous code, so I am guessing not much has changed (it probably decreased a bit because of the extra overhead). I am planning to deploy the previous indexing service and compare the ingest/events/processed metrics with the current one.

Regarding time-based checkpointing - are you suggesting that, when enabled, a time threshold will be considered in addition to the maxRowsInSegment threshold, and whichever threshold is hit earlier triggers the checkpoint? With time-based checkpointing there is a potential to create small segments, so I am not sure what advantage it gives us over just waiting for either the maxRowsInSegment limit to be hit or the taskDuration to be over. The only advantage I can see is that if taskDuration is too high and the ingest event rate is too low, then there is potential to reindex events in case we lose all task replicas.

Regarding 'perform checkpoint now' - I understand it might be a good feature to have. However, for the example you specifically mentioned, I am not sure why it is useful when there is a big backlog; how does handing off data in smaller chunks help? Is it just winning the race against Kafka retention rules so that we don't lose much if Kafka drops data before the task can publish segments for a sequence, or anything else? We can discuss this in the next dev sync as well if that is more convenient.
@pjain1 I tried this on one of our internal clusters and found one issue: when upgrading from an older version, since the AppenderatorDriverMetadata changed, any restored tasks immediately fail with NPE. They do seem to get retried fine, and the system eventually recovers, but it would be nice to handle it better if possible. Ideal would be to be able to restore older metadata in some sane way. If that's not possible then at least a better error message would be good, so users understand the error is related to upgrading and should be recovered soon.

I'm wondering, are there any other potential issues related to upgrading? Has the protocol changed in such a way that old supervisors cannot talk to new tasks, or the new supervisor cannot talk to old tasks? I'll keep this patch running on our internal clusters so we can see how it shakes out there. The error I got was:
@gianm Yes, you are correct: AppenderatorDriverMetadata has changed and that would cause tasks to not recover. There are protocol changes on both the supervisor side and the task side. Upgrading the Overlord first is disastrous: on supervisor restart it will ask for checkpoints from the tasks, they wouldn't return anything, and the Overlord will keep on killing tasks until the Middle Managers are running the new code and can return checkpoints. If the Middle Managers are upgraded first, then there is a slight chance that some checkpoint limit is hit before the Overlord is upgraded and a checkpoint action is submitted to the supervisor, but the supervisor does not know how to handle it. As of now, the only clean way of upgrading is to stop all supervisors, upgrade the code on both the Overlord and the Middle Managers, and resubmit the supervisors. I would also suggest having at least all the commits up to "fix pendingCompletionTaskGroup check - 96fb88b" in the code that you are trying on your internal cluster.
@pjain1 thanks for the details. Is there anything (hopefully somewhat simple) that we can do to make upgrading easier for people? "stop all supervisors and upgrade the code on both overlord and middle manager and resubmit" is pretty labor intensive and will incur some noticeable downtime. It's not absolutely critical (KIS is still experimental after all…) but it would be nice. Maybe have the supervisor and tasks fall back to some legacy mode if they can't confirm their counterpart is on a new enough version?
Thanks for the tip. I was missing that commit before, but will push it out to our internal clusters today.
@gianm On the task restore part, probably this can be tried: a version number can be stored with the driver metadata. While restoring, the task can check the version and populate the driver data structures accordingly. If this is done, Middle Managers can be upgraded first; however, even after this there is still a chance that a task might hit maxRowsInSegment before the Overlord is upgraded and try to submit a checkpoint action that the old Overlord cannot handle.
IMO, if we can think of a way to support it, ideally it would be best to support a single middleManager running the new version for a long time, while the overlord and other middleManagers run the old version. That's a common pattern that people use when upgrading clusters (try one MM/historical first and then do the rest). It's also common for rolling updates of large clusters, which can take a while to roll through all middleManagers. Would it work to do something like: if CheckPointDataSourceMetadataAction is unrecognized by the server, the task could give up and just not do the incremental publish, and keep going?
@gianm Probably the Overlord can send a flag in the context indicating whether it supports checkpointing or not. If not, the task can revert back to the previous logic of moving segments out of the active list rather than checkpointing. However, running the new code on a single MM might not test much: if it is not doing incremental handoffs, it is not actually testing the new feature for which it was upgraded. Running the new code on one or a subset of MMs also means that replication cannot be used, as replicas should not get mixed up on new and old MMs, otherwise they will do different things.
That sounds like a nice way of handling the compatibility.
That's fine, there's a lot of other stuff changing in 0.11.0 and it's still valuable to stage the rollout one machine at a time, if uptime of the system is critical.
That's less fine, since I guess I was hoping for a more seamless upgrade. If it's not feasible to do it seamlessly then I can live with it, but I think we should at least provide people a tool to help do the upgrade. Maybe a tool that shuts down all supervisors, monitors the tasks for when they end, and then can later restore them to their last running state.
Awesome, I'm looking forward to seeing the results of the side-by-side comparison with the previous logic.
Yes, that's what I'm suggesting. I'm looking at this from the angle of being able to support long-running tasks, which is interesting because currently having short task durations can create unnecessary shards at the task transition boundaries, whereas long task durations mean the peon is responsible for serving more queryable data, merging time is longer, and, as you mentioned, there's a lot of re-work in the case of a failure. Being able to run long task durations with periodic handoffs will eliminate all of these issues. It would also be interesting if we can get to the point where we don't need 2x worker capacity for a given indexing load (currently we generally need that capacity so we can have one task reading while another task is publishing). Having long-running tasks should help here.

But for long-running tasks in general, depending solely on the maxRowsInSegment threshold is problematic in the cases where you are receiving less data in a time interval than maxRowsInSegment. For example, say you have taskDuration PT24H, segmentGranularity HOUR, maxRowsInSegment 5M, and you are receiving 4M post-rollup rows per hour. Since maxRowsInSegment never gets hit, the indexing task never does an intermediate handoff and holds onto all that data until taskDuration elapses, which is the same behavior as the previous implementation and has all the same issues. Being able to set some kind of intermediateHandoffPeriod would be helpful here. I do agree it can potentially lead to small segments, but I think having this lever available for tuning (and having the option of not using it) is helpful.
Hm, typically the problem I see with handling backlogs of data is that, since there's no way to limit the number of messages read or the rate of message reading from Kafka, tasks wind up ingesting far more data than they would under steady-state conditions, which leads to various forms of out-of-memory conditions. One of the more annoying ones is where they run out of heap space for indexing but can't increase. I think it might be interesting to consider supporting rate limiting as a solution for this scenario. But in the meantime, being able to manually trigger handoffs would provide a workaround that doesn't involve having to change the ingestion spec or make configuration changes. To me, having time-based intermediate handoffs is more important than manually-triggered checkpointing.
@dclim Time-based handoff sounds reasonable to me now. Probably it can be part of the next PR, what do you think?

Even if the task ingests more data, the memory consumption is capped by maxRowsInMemory in each case (steady state and backlog situation). When this limit is hit, in-memory rows are persisted, and during this time ingestion is blocked, so memory consumption should ideally not go up. Not sure why it is happening. Anyway, I'm not opposed to the idea of a manual trigger; just trying to understand it more.
@pjain1 cool, doing a follow-on PR is fine. It's not the heap memory that's a problem; as you mentioned that's bounded by maxRowsInMemory. The problem is that decreasing maxRowsInMemory tends to generate more intermediate segments and the number of off-heap merge buffers used for merging intermediate segments is not bounded and increases with more intermediate segments until you hit the direct memory limit or run out of physical memory. Probably ideally those would also be bounded and merging would be done incrementally if it requires more buffers than are available, but anyway, that's an issue that we see once in a while.
I think ultimately, it would be better to fix this by doing a multi-phase merge (not merging more than N segments per pass) so we don't need to worry about artificial limits.
hey @pjain1, do you have more anticipated changes coming or is this ready for re-review? If the latter, could you fix the conflicts?
@dclim I am working on making the PR backwards compatible as discussed in the last dev sync; I am almost done. I also need to add a few unit tests around early publishing. Apart from these, I don't anticipate any other changes.
@pjain1 sounds great, let me know when things are ready for review. Thank you!
@gianm @dclim I have made changes to the code so that a rolling upgrade can be supported (given the Overlord is updated last). Now the Overlord sends a flag in the task context which indicates whether it supports incremental handoffs or not. If not, the code will continue using the old logic and will behave like the current code. I am running this internally on our cluster, where only a few Middle Managers are running the code in this PR and the others (including the Overlord) are running the latest master/current code. However, I plan to switch them all back to running the latest code in this PR. Can you please also test the backwards compatibility support on your cluster? I still have to add the unit tests, but for now, if possible, please run the code on your cluster.
@dclim Probably not immediately, until the parse batch code is in. Are you thinking of having this feature in 0.11.1? I believe 0.11.1 will be released by the end of December; will that be an RC or a stable version?
@pjain1 0.11.1 (or 0.12.0 if that's what it is) should be release-branched off in mid-December, and then RC'd/stabilized as soon as feasible.
@pjain1 thanks for your work on this patch! Could you please write up something for the release notes with anything users should be aware of when upgrading? Like how to do a rolling upgrade, how to do a roll back to an older version, if any configs should be adjusted, etc. Marking this 'release notes' so we can include it.
@gianm I was asking about the time-based intermediate publish; do you intend to have it in 0.11.1? If yes, then I would encourage someone else to implement it, otherwise I can have a look at this feature later. Let me see what I can add in the release notes. Just to clarify, rolling back to an older version would cause currently running tasks to not restore, as the Appenderator metadata would not be compatible.
@pjain1 oh, sorry I misunderstood you. I think it's not critical to have the time-based intermediate publish feature in 0.11.1. More like when you are able to get to it.
Thank you, the docs would be really helpful for users. Hmm, if a user does roll back, what will happen? Will it be broken forever or will it just fail to restore and then continue? If it's just going to ignore the on-disk data and continue, that's fine. If it's broken forever I think we should do a follow up patch to fix that so rollback is possible (even if restoring is not possible).
@gianm In the case of a roll-back, in the worst case the current set of tasks will fail and then a new set of tasks should be started by the Supervisor, which would continue to work.
@pjain1 that sounds OK to me, probably just mention that in the release notes blurb so people know what to expect.
Changes to Kafka Indexing Service
Rolling upgrade

Since the Kafka Indexing Service was marked experimental, with this release there are protocol changes between the Kafka Supervisor and the Kafka Indexing Task, as well as changes to the metadata persisted on disk. Therefore, to support a rolling upgrade, all the Middle Managers will need to be upgraded first, before the Overlord. Note that this ordering is different from the standard order of upgrade, and also note that this ordering is specific to the Kafka Indexing Service. If one is not using the Kafka Indexing Service or can handle downtime for the Kafka Supervisor, then one can upgrade in any order. Until the Overlord is upgraded, all Kafka Indexing Tasks will behave in the same manner as before (even if they are upgraded), which means no decoupling and no incremental hand-offs. Once the Overlord is upgraded, the new tasks started by the upgraded Overlord will support the new features.

Roll back

Depending on how the roll-back is done, in the worst case the current set of tasks will fail. Once both the Overlord and Middle Managers are rolled back, a new set of tasks should be started by the Supervisor, which will continue to work. It is better to roll back the Overlord first and then the Middle Managers. Note of caution - rolling back is not tested.
@gianm I have written something, please have a look and let me know if I am missing anything.
    activeSegments,
    publishPendingSegments
);
if (segments == null) {
@pjain1 would you let me know when segments can be null? Also, activeSegments and publishPendingSegments look like they are always null. Maybe these should have been removed but accidentally remained?
Oh, never mind. I just found your comment on these variables.
This is done for backwards compatibility so that old AppenderatorDriverMetadata can be converted to the new format. segments will be null while reading older metadata before upgrading the middle manager code; after the upgrade, the old metadata needs to be converted to the new one. Yes, for the new code activeSegments and publishPendingSegments will always be null, so these properties can be removed in versions greater than 0.12.x.
Right. Thanks.
while (taskIndex < taskSequences.size()) {
  if (earliestConsistentSequenceId.get() == -1) {
    // find the first replica task with earliest sequenceId consistent with datasource metadata in the metadata store
    if (taskSequences.get(taskIndex).rhs.entrySet().stream().anyMatch(
@pjain1 could you please refactor this if statement with side effects into a method that returns a boolean? It's very hard to understand this code.
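One way such a refactor typically looks (purely illustrative, not the actual supervisor code): pull the predicate out into a well-named private method that returns the boolean, and keep any mutation in the loop body where it is visible.

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class RefactorSketch
{
  // Before: a long inline stream expression with side effects buried inside the if condition.
  // After: the loop only expresses the high-level flow.
  static void findEarliestConsistentSequence(
      Map<Integer, Map<Integer, Long>> taskSequences,   // sequenceId -> partition offsets (simplified)
      Map<Integer, Long> committedOffsets,
      AtomicInteger earliestConsistentSequenceId
  )
  {
    for (Map.Entry<Integer, Map<Integer, Long>> entry : taskSequences.entrySet()) {
      if (matchesCommittedMetadata(entry.getValue(), committedOffsets)) {
        earliestConsistentSequenceId.compareAndSet(-1, entry.getKey());
        return;
      }
    }
  }

  // The extracted predicate: no hidden mutation, just a named boolean question.
  private static boolean matchesCommittedMetadata(
      Map<Integer, Long> sequenceOffsets,
      Map<Integer, Long> committedOffsets
  )
  {
    return sequenceOffsets.equals(committedOffsets);
  }
}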
- Kafka Index Task that supports Incremental handoffs apache#4815
- prevent NPE from suppressing actual exception (apache#5146)
- prevent npe on mismatch between number of kafka partitions and task count (apache#5139)
- Throw away rows with timestamps beyond long bounds in kafka indexing (apache#5215) (apache#5232)
- Fix state check bug in Kafka Index Task (apache#5204) (apache#5248)
Hopefully fixes #4016, #4177, #4812, #4781, #4743 and #5046
Partially fixes #4498 and #4693
Alternate to #4178 and #4175
Deprecates the pauseAfterRead feature.

Design -

- Each unit of segment allocation and publishing within the task is tracked by a sequenceName.
- A new sequence is started when the maxRowsInSegment limit is reached, publishing all the segments for the previous sequence.
- Each sequence also carries a sequenceId. It starts with 0.
- A SequenceMetadata class is added to the KafkaIndexTask class to maintain metadata about a sequence. Metadata for all the sequences in the task is persisted to disk whenever a new sequence is created or deleted or end offsets for a sequence are set. This also enables restore on restart.
- A checkpoint is requested when AppenderatorDriverAddResult.getNumRowsInSegment(), returned by the add call of AppenderatorDriver, is greater than maxRowsInMemory. At this point the task will pause and send a CheckPointDataSourceMetadataAction to the Supervisor, which will use the same logic as is used for finishing all the tasks in the TaskGroup and call setEndOffsets on the replica tasks with the finish flag set to false, to indicate that the current sequence should be finished and published and a new sequence should be created for messages having offsets greater than the set end offsets. The new checkpoint information will be stored in the TaskGroup.
- The checkpoints are passed to tasks through the task context field. Using this checkpoints information, the task creates its list of SequenceMetadata and sets its start offsets to the start offset of the first sequence.
- The supervisor verifies and merges the checkpoints reported by replica tasks in the verifyAndMergeCheckpoints method.

Some more information about specific issues -

- The task is marked FINISHING immediately after it is asked to finish by a call to setEndOffsets with finish set to true, to fix "Better handling of Supervisor spec update" (#4812).
- Regarding the handoffConditionTimeout property: as part of fixing "make the timeout publishing as a failure" (#4175), the tasks are now killed (not stopped) after pendingCompletionTimeout elapses.