
AppenderatorDriver metadata loses information about publishing segments before they are published #4743

Closed
pjain1 opened this issue Sep 1, 2017 · 14 comments

Comments

@pjain1
Member

pjain1 commented Sep 1, 2017

I see a problem with the current way of managing metadata about publishing segments in AppenderatorDriver. Let's say a task calls AppenderatorDriver to publish segments for a sequence; the driver will then remove the sequence information from the activeSegments and publishPendingSegments maps. Now, if the task is restarted at a point after in-memory data is persisted and metadata is committed, but before any mergeAndPush or publish happens, then on restart, if the task again tells the driver to publish the same sequence, there is no way for the driver to know which segments to publish.

If you look at the code for the publish or publishAll method of AppenderatorDriver, the first thing it does is remove the sequence information from the activeSegments and publishPendingSegments maps. After that, push is called on the Appenderator with a wrapped committer that contains the driver metadata (with the sequence information removed). In the push implementation, in-memory data is persisted using the persist method, which also commits the metadata to disk. So, as I was saying, if the task is restarted at this point, the restored metadata may be incomplete because this sequence information will not be restored. Any further calls to publish this sequence wouldn't do anything.

One way to resolve this would be to maintain an additional in-memory structure that contains the sequences which are being published. On a call to publish, the sequence information is removed from the activeSegments map but not from publishPendingSegments, and is added to the new in-memory structure. On a successful publish, the sequence information is removed from both publishPendingSegments and the in-memory structure. Any call to publish should try to publish a sequence if it is not in the in-memory structure (it may not be in the activeSegments map when the task is restarted at the point in time described above).
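The bookkeeping described above could be sketched roughly as follows. This is a hypothetical sketch, not the actual AppenderatorDriver code; the class and the publishingSequences field are illustrative names for the proposed new structure:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the proposed bookkeeping; names are illustrative,
// not from the Druid codebase.
class PublishTracker
{
  private final Map<String, List<String>> activeSegments = new HashMap<>();
  private final Map<String, List<String>> publishPendingSegments = new HashMap<>();
  // New in-memory structure (never persisted): sequences with an in-flight publish.
  private final Set<String> publishingSequences = new HashSet<>();

  boolean startPublish(String sequenceName)
  {
    // Deduplicate: skip if this sequence is already being published.
    if (!publishingSequences.add(sequenceName)) {
      return false;
    }
    // Remove from activeSegments only; keep publishPendingSegments so a
    // restarted task can still recover the segments to publish.
    activeSegments.remove(sequenceName);
    return true;
  }

  void finishPublish(String sequenceName)
  {
    publishPendingSegments.remove(sequenceName);
    publishingSequences.remove(sequenceName);
  }
}
```

After a restart, publishingSequences is empty, so a repeated publish call for the same sequence would go through again and pick up the segments still recorded in publishPendingSegments.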

@pjain1 pjain1 added this to the 0.11.0 milestone Sep 1, 2017
@pjain1 pjain1 assigned jihoonson and unassigned jihoonson Sep 1, 2017
@pjain1
Member Author

pjain1 commented Sep 1, 2017

@jihoonson can you confirm this?

@jihoonson
Contributor

@pjain1 thanks for pointing out this problem. Your suggestion looks reasonable. I have one comment below.

One way to resolve this would be to maintain an additional in-memory structure that contains sequences which are being published.

It seems that the segments in AppenderatorDriver now have 4 states, i.e., active, inactive, publishing, and published. Since the published state can be represented by removing the segments from some in-memory structure (like activeSegments, publishPendingSegments, or something else), we need to maintain only the remaining 3 states. I think a state machine can be useful for this case. With a state machine, we need to keep only one in-memory structure containing the segments associated with their states. I guess it's simpler than the current implementation. What do you think?

@pjain1
Member Author

pjain1 commented Sep 3, 2017

True, the published state need not be maintained. However, I don't understand what you mean by the inactive state. Using a state machine to keep track of segment state is a good idea; the state can be persisted to disk as part of the wrapped commit metadata. However, there may still be a need for an in-memory structure (never persisted to disk) to keep track of the currently publishing segments for a sequence. Since the publish call is an external trigger by a task, multiple calls to publish the same sequence should be de-duplicated based on this structure, and on restart of a task, a call to publish the same sequence should actually trigger a publish, as the sequence/segments would not be present in the in-memory structure.

The main thing is to prevent multiple publishes of the same sequence on duplicate publish requests and, more importantly, to actually redo the publish of segments for a sequence after the task is restarted if the previous publish request did not complete fully. So, if this can be achieved using only the state machine, then I am good with that.

@jihoonson
Contributor

I don't understand what you mean by the inactive state.

Sorry for the insufficient description. The inactive state means that a segment is not published yet, but data is not added to it anymore. It can be thought of as the segments which are in publishPendingSegments but not in activeSegments in the current implementation. Maybe there is a better name for this state.

The main thing is to prevent multiple publishes of the same sequence on duplicate publish requests and, more importantly, to actually redo the publish of segments for a sequence after the task is restarted if the previous publish request did not complete fully.

I thought this could be achieved by maintaining the segments' state machines in an in-memory structure like Map<sequenceName, NavigableMap<Interval, SegmentState>>. A SegmentState contains at least a segment identifier and the state of the segment. Once publish() is called, the states of the segments to be published are changed to publishing. If publish() is called for the same sequence (and same segments), AppenderatorDriver first checks the states of the segments and won't publish them because they are already in the publishing state.

I think this can solve the problem because all segments are kept in memory until they are published. Please correct me if I missed something.
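As a rough sketch of how such a structure could deduplicate publish requests, assuming a simplified SegmentState and keying the inner map by the interval's start millis instead of a full Interval (neither of which is the actual Druid type):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch only; SegmentState and the long-keyed interval map
// are simplifications, not the actual Druid types.
class DriverStateMap
{
  enum State { ACTIVE, INACTIVE, PUBLISHING }

  static class SegmentState
  {
    final String segmentId;
    State state;

    SegmentState(String segmentId)
    {
      this.segmentId = segmentId;
      this.state = State.ACTIVE;
    }
  }

  // sequenceName -> (interval start millis -> segment state)
  private final Map<String, NavigableMap<Long, SegmentState>> segments = new HashMap<>();

  void add(String sequenceName, long intervalStart, String segmentId)
  {
    segments.computeIfAbsent(sequenceName, k -> new TreeMap<>())
            .put(intervalStart, new SegmentState(segmentId));
  }

  // Returns true only if at least one segment actually moved to PUBLISHING;
  // a duplicate publish() call for the same sequence is a no-op.
  boolean publish(String sequenceName)
  {
    NavigableMap<Long, SegmentState> perSequence = segments.get(sequenceName);
    if (perSequence == null) {
      return false;
    }
    boolean transitioned = false;
    for (SegmentState s : perSequence.values()) {
      if (s.state != State.PUBLISHING) {
        s.state = State.PUBLISHING;
        transitioned = true;
      }
    }
    return transitioned;
  }
}
```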

@pjain1
Member Author

pjain1 commented Sep 3, 2017

So can the segments move directly from the active to the publishing state? I don't see what we are gaining by putting them in the inactive state.

The map for maintaining the state of segments could look like Map<SequenceName, List<SegmentState>>; I am not sure why an interval is needed here. As you explained, the publishing state can be used to prevent publishing the same sequence twice, but you didn't explain what will happen when the task restarts. Will this map (with segment states) be persisted to disk, or will it be rebuilt on restart? What state will the segments be in when a restart happens? How will the driver know that a sequence was in the publishing state when the task restarted, so that when a call is made to publish the same sequence again, it actually starts publishing it again?

Sorry, these points are not clear to me; can you please clarify? Thanks

@jihoonson
Contributor

Sure. Here are more details.

The map for maintaining the state of segments could look like Map<SequenceName, List<SegmentState>>; I am not sure why an interval is needed here.

If this is Map<sequenceName, NavigableMap<Interval, SegmentState>>, this map is the only one we should maintain. All segments should stay in this map until they are published. The lifecycle of the segments in AppenderatorDriver will be:

  • When AppenderatorDriver.add() is called, it first checks whether there is an active segment for the input row. This involves looking up a segment in the inner NavigableMap using the input row's timestamp as the key; the interval is required here. If there isn't any segment of any state, AppenderatorDriver creates a new segment. If there is a segment in the active state, it reuses that segment. Otherwise, it throws an exception.
  • If the number of rows exceeds a threshold (e.g., maxRowsPerSegment in KafkaIndexTask) after AppenderatorDriver.add(), AppenderatorDriver.moveSegmentOut() can be called. This changes the segments' states from active to inactive.
  • When AppenderatorDriver.publish() is called, it first changes the states of the segments from active or inactive to publishing, and calls Appenderator.push(). Once push() is finished, it publishes the pushed segments.
  • Once segments are published, they are removed from the map.

The state transition can be summarized as active -> (inactive) -> publishing. Transition to inactive state is optional.
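The allowed transitions could be sketched like this (illustrative only, not the actual implementation):

```java
// Minimal sketch of the transition rules summarized above; names are
// illustrative, not the actual Druid implementation.
enum SegmentState
{
  ACTIVE, INACTIVE, PUBLISHING;

  boolean canTransitionTo(SegmentState next)
  {
    switch (this) {
      case ACTIVE:
        // inactive is optional: active may go straight to publishing
        return next == INACTIVE || next == PUBLISHING;
      case INACTIVE:
        return next == PUBLISHING;
      default:
        // published segments are removed from the map rather than tracked
        return false;
    }
  }
}
```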

will this map (with segment states) be persisted to disk or it will be rebuilt again on restart ? What state the segments will be when restart happens ?

Yes, this map should be persisted when the committer runs. On a task restart, the map is read from the commit metadata.

The segment states on task restart can be different depending on the implementation. Here are some solutions I can think of now.

  • Since the Kafka consumer can be rewound on task restart, the same records can be added to the driver again. So, the segment states in the map should not be publishing on task restart. This can be achieved by calling wrapCommitter() before changing segment states in AppenderatorDriver.publish().
  • I think it's better if AppenderatorDriver can publish the segments in the publishing state by itself on task restart. This means the segment states should be changed to publishing before wrapCommitter(). On a task restart, the task doesn't have to rewind the Kafka consumer for the segments which are already in the publishing state, because AppenderatorDriver will retry publishing those segments by itself. This removes the duplicated data processing of the first solution. I guess this solution may involve some changes to the current implementation of KafkaIndexTask.

How will the driver know that a sequence was in the publishing state when the task restarted, so that when a call is made to publish the same sequence again, it actually starts publishing it again?

The driver knows which state each segment is in. When a task restarts, the driver should be able to find the segments to be published in the map because every segment stays there until it is published.

@pjain1
Member Author

pjain1 commented Sep 4, 2017

Thanks for the detailed explanation. The state transitions look good to me. You have already pointed it out, but mentioning it again: the transition to the inactive state should not be mandatory, as some tasks may not call the moveSegmentOut method and may directly call publish once the segment row threshold is reached.

For the restart logic, I think rewinding the Kafka consumer may not be a good idea; it's just extra work and can be problematic if retention on the Kafka side is low. I like the idea of the driver starting to publish the segments by itself. However, the only problem I can see is how the task will get the future for the publishing segments, as the task cannot start a new publish until the previous one is done, and the task also does a few things when a publish future resolves, like registering for handoff, adjusting its metadata, etc. Maybe the startJob method can return futures for publishes restarted by the driver, in addition to the other metadata. Is that what you have in mind?

@jihoonson
Contributor

Yeah, that sounds necessary. Alternatively, startJob() could simply return metadata including the segments in the publishing state, and the task would call publish() and registerHandoff() for those segments. This is different from what I mentioned above because the one responsible for restarting the publishing segments is the task, not the AppenderatorDriver, but it also avoids duplicated data processing. Please choose the one you think is better.
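A minimal sketch of how, under the second approach, the task could derive the sequences to re-publish from the restored metadata (all names here are hypothetical, not the actual Druid API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: startJob() would surface these sequence names so the
// task can re-issue publish() and registerHandoff() for them on restart.
class RestartRecovery
{
  enum State { ACTIVE, INACTIVE, PUBLISHING }

  static List<String> sequencesToRepublish(Map<String, State> restoredStates)
  {
    List<String> toRepublish = new ArrayList<>();
    for (Map.Entry<String, State> entry : restoredStates.entrySet()) {
      // Only sequences that were mid-publish when the task died need to be
      // re-published by the task after restart.
      if (entry.getValue() == State.PUBLISHING) {
        toRepublish.add(entry.getKey());
      }
    }
    return toRepublish;
  }
}
```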

@pjain1
Member Author

pjain1 commented Sep 5, 2017

@jihoonson the second approach sounds easier to plug into the current code I have, so let's go with that if you think it is good. Also, it would be better if the driver can return sequence names instead of segment information.

@jihoonson
Contributor

@pjain1 sounds good. Thanks!

@jihoonson
Contributor

@pjain1 would you please take this issue if it is blocking you?
I'm currently working on #4550 and #4704, so I could take this issue after they are finished. Also, I think it's easier for you to check that the patch fixes the problem you're facing.

@pjain1
Member Author

pjain1 commented Sep 6, 2017

It's not blocking me; it's a bug that already exists, and I was hoping to get it fixed in 0.11. Anyway, I can pick it up once I am done with my current tasks. Thanks

@jihoonson
Contributor

@pjain1 cool, thanks!

@pjain1
Member Author

pjain1 commented Nov 17, 2017

Fixed in #4815

@pjain1 pjain1 closed this as completed Nov 17, 2017