
Add support for concurrent batch Append and Replace #14407

Merged: 56 commits, Sep 25, 2023

Conversation

@AmatyaAvadhanula (Contributor) commented Jun 12, 2023

Allows multiple appending batch ingestion jobs to run concurrently with at most one replacing job for an enclosing interval.

Description

This PR utilizes the new task lock types, APPEND and REPLACE, and builds on them to allow concurrent compaction with batch ingestion.

TODO - Describe the problems with segment locking, and briefly explain the mechanism used in this patch

Add new task actions: SegmentTransactionalAppendAction and SegmentTransactionalReplaceAction

The SegmentTransactionalAppendAction is used by appending tasks holding an APPEND lock. When committing segments to the metadata store, the task also transactionally commits, to the druid_segmentVersions table, metadata mapping each segment id to the version of the REPLACE lock (if any) held on its interval.
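As a rough illustration of the append-side commit (a conceptual sketch only, not Druid code; all names other than SegmentTransactionalAppendAction and druid_segmentVersions are ours):

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch: an append commit records, in the same transaction as the
// segment itself, the version of any REPLACE lock held on the segment's interval.
// A later replace can then find and upgrade these appended segments.
public class AppendCommitSketch
{
  // Stand-in for the druid_segmentVersions table: segmentId -> REPLACE lock version.
  private final Map<String, String> segmentVersions = new HashMap<>();

  public synchronized void commitAppendedSegment(String segmentId, String replaceLockVersion)
  {
    // (Persisting the segment metadata itself is elided here.)
    if (replaceLockVersion != null) {
      segmentVersions.put(segmentId, replaceLockVersion);
    }
  }

  public synchronized Map<String, String> snapshot()
  {
    return new HashMap<>(segmentVersions);
  }
}
```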

The SegmentTransactionalReplaceAction is used by replacing tasks that hold a REPLACE lock. When committing the core partitions for a given interval, the task also carries forward the segments previously appended under the same lock (using the metadata committed by the append action), upgrading them to every version for which used segments exist.
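Continuing the sketch above, the replace-side carry-forward might conceptually look like this (again hypothetical; the real action also upgrades appends to every version with used segments, which is elided here):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Conceptual sketch: on replace commit, carry forward every segment that was
// appended under this REPLACE lock version by creating a copy of it under the
// new (replacing) version, alongside the replacing core partitions.
public class ReplaceCommitSketch
{
  public Set<String> carryForwardAppends(
      String replaceLockVersion,
      String newReplacingVersion,
      Map<String, String> segmentVersions // from AppendCommitSketch.snapshot()
  )
  {
    final Set<String> upgradedSegmentIds = new HashSet<>();
    for (Map.Entry<String, String> entry : segmentVersions.entrySet()) {
      if (replaceLockVersion.equals(entry.getValue())) {
        // A real implementation would allocate a proper new segment id here.
        upgradedSegmentIds.add(entry.getKey() + "@" + newReplacingVersion);
      }
    }
    return upgradedSegmentIds;
  }
}
```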

Utilize the new task actions with previously added lock types to facilitate concurrent compaction with batch ingestion

Add new test Task types: AppendTask and ReplaceTask to help simulate various orders of events

Release note - TODO



This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@imply-cheddar (Contributor) left a comment:

Leaving some comments, will come back to it.

Comment on lines 193 to 207
public SegmentIdWithShardSpec allocateOrGetSegmentForTimestamp(String timestamp)
{
  final DateTime time = DateTime.parse(timestamp);
  for (SegmentIdWithShardSpec pendingSegment : pendingSegments) {
    if (pendingSegment.getInterval().contains(time)) {
      return pendingSegment;
    }
  }
  return allocateNewSegmentForDate(time);
}

public SegmentIdWithShardSpec allocateNewSegmentForTimestamp(String timestamp)
{
  return allocateNewSegmentForDate(DateTime.parse(timestamp));
}
Contributor:

These are the methods that you are using to tell the task what to do from the test thread. But they are also doing all of their work on the actual test thread, not on the task's thread. This means that if you ever have one of these block, it's going to block your test thread instead of the task thread, meaning that your test cannot make progress.

In order to fix this, you will need to make the actual run() part of the task basically just sit and wait on a queue of work for it to do. Then these calls would add new Runnables to that queue. Once you do that, you will likely find that it's incredibly simple to control the behavior of the tasks from the test itself.
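A minimal sketch of the suggested pattern in plain Java (the class and method names are ours, not the PR's test code): run() drains a queue on the task's own thread, while the test thread only enqueues work and receives futures, so it never blocks on the task's work.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

public class QueueDrivenTask
{
  private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
  private volatile boolean stopped = false;

  // Runs on the task's own thread: just sit and drain the queue of work.
  public void run() throws InterruptedException
  {
    while (!stopped) {
      workQueue.take().run();
    }
  }

  // Called from the test thread: enqueue work and return immediately.
  public <T> CompletableFuture<T> submit(Supplier<T> work)
  {
    final CompletableFuture<T> future = new CompletableFuture<>();
    workQueue.add(() -> {
      try {
        future.complete(work.get());
      }
      catch (RuntimeException e) {
        future.completeExceptionally(e);
      }
    });
    return future;
  }

  public void stop()
  {
    stopped = true;
    workQueue.add(() -> {}); // wake the run loop so it observes the stop flag
  }
}
```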

Comment on lines 118 to 121
if (lockType.equals(TaskLockType.APPEND) && preferredVersion == null) {
  return "1970-01-01T00:00:00.000Z";
}
return preferredVersion == null ? DateTimes.nowUtc().toString() : preferredVersion;
Contributor:

The defaulting happening in this getter is a bit weird; let's make the things that build the Request do the right thing and make this getter less intelligent.
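One hedged reading of this suggestion (hypothetical helper; DateTimes.nowUtc() is taken from the snippet above): resolve the default where the Request is built, so the getter becomes a plain accessor.

```java
// Hypothetical builder-side helper; the PR's actual Request construction may differ.
private static String resolveVersion(TaskLockType lockType, String preferredVersion)
{
  if (preferredVersion != null) {
    return preferredVersion;
  }
  // APPEND requests default to the epoch version; everything else defaults to "now".
  return lockType == TaskLockType.APPEND
         ? "1970-01-01T00:00:00.000Z"
         : DateTimes.nowUtc().toString();
}
```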

Contributor (Author):

Done

@kfaraz (Contributor) left a comment:

There are some changes related to realtime tasks in this PR, primarily because the parameter useSharedLock has been removed. I think we should retain the useSharedLock parameter for the time being and only deprecate it, to preserve backward compatibility.

Also, the realtime task related changes should be reverted from this PR.

@@ -297,15 +297,14 @@ public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<
       Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
   );
   IngestionMode ingestionMode = getIngestionMode();
-  final boolean useSharedLock = ingestionMode == IngestionMode.APPEND
-      && getContextValue(Tasks.USE_SHARED_LOCK, false);
+  final TaskLockType taskLockType = TaskLockType.valueOf(getContextValue(Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name()));
Contributor:

The default value of the task lock type should be a constant somewhere.
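A minimal sketch of what is being asked for (the constant's name and location are hypothetical):

```java
// Hypothetical constant; the name and the class it lives in are assumptions.
public static final TaskLockType DEFAULT_TASK_LOCK_TYPE = TaskLockType.EXCLUSIVE;

final TaskLockType taskLockType = TaskLockType.valueOf(
    getContextValue(Tasks.TASK_LOCK_TYPE, DEFAULT_TASK_LOCK_TYPE.name())
);
```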

@kfaraz kfaraz marked this pull request as ready for review August 3, 2023 06:09
kfaraz added a commit that referenced this pull request Aug 21, 2023
Motivation:
- There is no usage of the `SegmentTransactionInsertAction` which passes a
non-null non-empty value of `segmentsToBeDropped`.
- This is not really needed either as overshadowed segments are marked as unused
by the Coordinator and need not be done in the same transaction as committing segments.
- It will also help simplify the changes being made in #14407 

Changes:
- Remove `segmentsToBeDropped` from the task action and all intermediate methods
- Remove related tests which are not needed anymore
kfaraz added a commit that referenced this pull request Sep 21, 2023
This commit pulls out some changes from #14407 to simplify that PR.

Changes:
- Rename `IndexerMetadataStorageCoordinator.announceHistoricalSegments` to `commitSegments`
- Rename the overloaded method to `commitSegmentsAndMetadata`
- Fix some typos
@kfaraz (Contributor) commented Sep 25, 2023

Merging as the IT failure is unrelated.

@kfaraz kfaraz merged commit c62193c into apache:master Sep 25, 2023
Comment on lines +182 to +185
() -> SegmentPublishResult.fail(
    "Invalid task locks. Maybe they are revoked by a higher priority task."
    + " Please check the overlord log for details."
)
Contributor:

We should have logged the intervals though.
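A hedged sketch of what including the intervals might look like (the segments variable and the exact message are our assumptions; StringUtils.format is used elsewhere in Druid):

```java
// Sketch only: assumes a `segments` collection is in scope at the failure site.
() -> SegmentPublishResult.fail(
    StringUtils.format(
        "Invalid task locks for intervals[%s]. Maybe they are revoked by a higher"
        + " priority task. Please check the overlord log for details.",
        segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet())
    )
)
```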

Comment on lines +40 to +41
* Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
Contributor:

This javadoc is not very clear. Versions of "what" segments? The ones being replaced? Also, what does "your" task mean here? Some verbosity could be helpful.
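One possible clarified javadoc, sketching what the reviewer is asking for (our wording, not the PR's):

```java
/**
 * Replaces segments in the metadata storage. The versions of the segments being
 * committed must all be less than or equal to the version of the REPLACE lock
 * held by the committing task on the corresponding segment intervals.
 */
```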

@@ -96,7 +112,8 @@ public static boolean isLockCoversSegments(
   final TimeChunkLock timeChunkLock = (TimeChunkLock) lock;
   return timeChunkLock.getInterval().contains(segment.getInterval())
          && timeChunkLock.getDataSource().equals(segment.getDataSource())
-         && timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0;
+         && (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
+             || TaskLockType.APPEND.equals(timeChunkLock.getType()));
Contributor:

Please leave some comments here, e.g. that an APPEND lock, by definition, covers all segment versions, unlike other lock types.
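A sketch of the kind of inline comment being requested (the wording is ours):

```java
return timeChunkLock.getInterval().contains(segment.getInterval())
       && timeChunkLock.getDataSource().equals(segment.getDataSource())
       // An APPEND lock covers a segment regardless of its version: appended
       // segments may later be upgraded to any replacing version, so the usual
       // version comparison does not apply.
       && (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
           || TaskLockType.APPEND.equals(timeChunkLock.getType()));
```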

Comment on lines +133 to +135
* This method should be de-duplicated with {@link AbstractBatchIndexTask#determineLockType}
* by passing the ParallelIndexSupervisorTask instance into the
* SinglePhaseParallelIndexTaskRunner.
Contributor:

This passage is not clear. What kind of de-duplication does it refer to?

Comment on lines +226 to +239
final Set<Map<String, Object>> usedSegmentLoadSpecs = toolbox
    .getTaskActionClient()
    .submit(new RetrieveUsedSegmentsAction(getDataSource(), getInterval(), null, Segments.INCLUDING_OVERSHADOWED))
    .stream()
    .map(DataSegment::getLoadSpec)
    .collect(Collectors.toSet());

// Kill segments from the deep storage only if their load specs are not being used by any used segments
final List<DataSegment> segmentsToBeKilled = unusedSegments
    .stream()
    .filter(unusedSegment -> !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec()))
    .collect(Collectors.toList());

toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
Contributor:

This action is very expensive, especially if the interval is eternity, which is often the case with the killUnusedSegments task.

Contributor:

This was already happening; this PR has just reduced the number of segments that are being killed by the DataSegmentKiller.

Contributor:

How is that, Kashif? Were we getting the list of used segments before too?

Contributor:

Ah, sorry, I misinterpreted the comment to be just for the last line, rather than this whole block of code.

* there would be some used segments in the DB with versions higher than these
* append segments.
*/
private Set<DataSegment> getSegmentsToUpgradeOnAppend(
Contributor:

could be renamed to getExtraVersionsForAppendSegments

Contributor:

"extra" versions is probably still confusing. How about get createUpgradedVersionsOfAppendSegments?

Contributor:

or newVersionsOfAppendSegments

Contributor:

Yeah, that works too. But I just want to avoid confusion between words like "new", "upgrade" or "extra" when all of them mean the same thing in our context.

Comment on lines +1082 to +1083
final Map<String, Set<Interval>> committedVersionToIntervals = new HashMap<>();
final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new HashMap<>();
Contributor:

variable naming - committed -> overlapping

final Set<DataSegment> upgradedSegments = new HashSet<>();
for (Map.Entry<String, Set<Interval>> entry : committedVersionToIntervals.entrySet()) {
  final String upgradeVersion = entry.getKey();
  Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(
Contributor:

naming: segmentsToUpgrade --> extraSegmentVersions

Contributor:

This method is not returning extra versions, it's returning the segments that need to be upgraded.

Contributor:

I think when I hear upgrade, I see something going from V0 to V1, but here, we leave V0 as it is and also create V1.

Contributor:

Yes, that's correct. In this line, we are just identifying the segments of V0 that need to go to V1.

final Set<DataSegment> upgradedSegments = new HashSet<>();
for (Map.Entry<String, Set<Interval>> entry : committedVersionToIntervals.entrySet()) {
  final String upgradeVersion = entry.getKey();
  Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(
Contributor:

just for my own understanding, there should never be an empty value for a function such as getSegmentsWithVersionHigherThan with the same arguments?

Contributor:

Do you mean that the returned value should not be empty?

Contributor:

Never mind. I was asking whether versions in the DB will always be higher than or equal to the append segment version.

* Computes new Segment IDs for the {@code segmentsToUpgrade} being upgraded
* to the given {@code upgradeVersion}.
*/
private Set<DataSegment> upgradeSegmentsToVersion(
Contributor:

the name is a bit confusing because it doesn't actually upgrade anything yet. Just creating extra versions.

Contributor:

How about createUpgradedVersionOfSegments?

Contributor:

that works too.

@kfaraz (Contributor) commented Oct 8, 2023

@abhishekagarwal87 , I have replied to your comments. The changes will be included in #15097 .
