Add support for concurrent batch Append and Replace #14407
Conversation
Leaving some comments, will come back to it.
Resolved review threads on:
...src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppendTask.java
public SegmentIdWithShardSpec allocateOrGetSegmentForTimestamp(String timestamp)
{
  final DateTime time = DateTime.parse(timestamp);
  for (SegmentIdWithShardSpec pendingSegment : pendingSegments) {
    if (pendingSegment.getInterval().contains(time)) {
      return pendingSegment;
    }
  }
  return allocateNewSegmentForDate(time);
}

public SegmentIdWithShardSpec allocateNewSegmentForTimestamp(String timestamp)
{
  return allocateNewSegmentForDate(DateTime.parse(timestamp));
}
These are the methods that you are using to tell the task what to do from the test thread. But they are also doing all of their work on the actual test thread, not on the task's thread. This means that if one of these ever blocks, it's going to block your test thread instead of the task thread, meaning that your test cannot make progress.
In order to fix this, you will need to make the actual run() part of the task basically just sit and wait on a queue of work for it to do. Then these calls would add new Runnables to that queue. Once you do that, you will likely find that it's incredibly simple to control the behavior of the tasks from the test itself.
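A minimal sketch of that pattern, assuming a hypothetical ControllableTestTask (this is not the actual AppendTask code, just an illustration of the queue-driven run loop):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: run() just drains a work queue, so any blocking call happens on
// the task's own thread rather than the test thread.
class ControllableTestTask
{
  private static final Runnable STOP = () -> {};

  private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

  // Runs on the task's own thread (e.g. invoked by the task runner).
  public void run() throws InterruptedException
  {
    while (true) {
      final Runnable work = workQueue.take(); // blocks only the task thread
      if (work == STOP) {
        return;
      }
      work.run();
    }
  }

  // Called from the test thread: enqueues work and returns immediately.
  public void submit(Runnable work)
  {
    workQueue.add(work);
  }

  // Called from the test thread to let run() exit cleanly.
  public void finish()
  {
    workQueue.add(STOP);
  }
}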
Resolved review threads on:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
indexing-service/src/main/java/org/apache/druid/indexing/common/task/ReplaceTask.java
indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
if (lockType.equals(TaskLockType.APPEND) && preferredVersion == null) {
  return "1970-01-01T00:00:00.000Z";
}
return preferredVersion == null ? DateTimes.nowUtc().toString() : preferredVersion;
The defaulting happening in this getter is a bit weird; let's try to make the things that build the Request do the right thing and make this getter less intelligent.
Done
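For reference, a minimal sketch of the kind of refactor being suggested (all class and method names here are hypothetical, not the committed code):

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

// Sketch: resolve the default version when the request is built, so the
// getter simply returns the stored value.
class VersionedLockRequest
{
  enum TaskLockType { APPEND, EXCLUSIVE }

  private final String version;

  private VersionedLockRequest(String version)
  {
    this.version = version;
  }

  static VersionedLockRequest create(TaskLockType lockType, String preferredVersion)
  {
    final String resolved;
    if (preferredVersion != null) {
      resolved = preferredVersion;
    } else if (lockType == TaskLockType.APPEND) {
      // APPEND locks default to the epoch, matching "1970-01-01T00:00:00.000Z".
      resolved = new DateTime(0, DateTimeZone.UTC).toString();
    } else {
      resolved = DateTime.now(DateTimeZone.UTC).toString();
    }
    return new VersionedLockRequest(resolved);
  }

  String getVersion()
  {
    return version; // no defaulting logic here anymore
  }
}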
Resolved review threads on:
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
.../main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
There are some changes related to realtime tasks in this PR, primarily because the parameter useSharedLock has been removed. I think we should retain the useSharedLock parameter for the time being and only deprecate it for now, to retain backward compatibility.
Also, the realtime task related changes should be reverted from this PR.
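A sketch of what retaining and deprecating the parameter could look like (assuming the constant lives in Tasks, as the diff below suggests; the literal value is an assumption):

// Hypothetical sketch: keep the old context key but mark it deprecated.
public class Tasks
{
  /**
   * @deprecated Use the task lock type context parameter (Tasks.TASK_LOCK_TYPE)
   * with APPEND instead.
   */
  @Deprecated
  public static final String USE_SHARED_LOCK = "useSharedLock"; // assumed literal
}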
Resolved review threads on:
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
...xing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -297,15 +297,14 @@ public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<
       Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
   );
   IngestionMode ingestionMode = getIngestionMode();
-  final boolean useSharedLock = ingestionMode == IngestionMode.APPEND
-      && getContextValue(Tasks.USE_SHARED_LOCK, false);
+  final TaskLockType taskLockType = TaskLockType.valueOf(getContextValue(Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name()));
The default value of the task lock type should be a constant somewhere.
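For example, a sketch of hoisting the default into a named constant (the placement in Tasks and the constant name are assumptions; getContextValue mirrors the call site in the diff above):

// In Tasks (hypothetical placement):
public static final TaskLockType DEFAULT_TASK_LOCK_TYPE = TaskLockType.EXCLUSIVE;

// The call site then reads:
final TaskLockType taskLockType = TaskLockType.valueOf(
    getContextValue(Tasks.TASK_LOCK_TYPE, Tasks.DEFAULT_TASK_LOCK_TYPE.name())
);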
Resolved review threads on:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
… into overlordSimulator
Motivation:
- There is no usage of the `SegmentTransactionInsertAction` which passes a non-null, non-empty value of `segmentsToBeDropped`.
- This is not really needed either, as overshadowed segments are marked as unused by the Coordinator and need not be done in the same transaction as committing segments.
- It will also help simplify the changes being made in #14407.

Changes:
- Remove `segmentsToBeDropped` from the task action and all intermediate methods
- Remove related tests which are not needed anymore
Fixed: server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
This commit pulls out some changes from #14407 to simplify that PR.

Changes:
- Rename `IndexerMetadataStorageCoordinator.announceHistoricalSegments` to `commitSegments`
- Rename the overloaded method to `commitSegmentsAndMetadata`
- Fix some typos
Merging as the IT failure is unrelated.
() -> SegmentPublishResult.fail(
    "Invalid task locks. Maybe they are revoked by a higher priority task."
    + " Please check the overlord log for details."
)
We should have logged the intervals though.
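For instance, the failure message might include them like this (the intervals variable and formatting are illustrative, not the committed code):

() -> SegmentPublishResult.fail(
    String.format(
        "Invalid task locks for intervals[%s]. Maybe they are revoked by a higher priority task."
        + " Please check the overlord log for details.",
        intervals // hypothetical: the intervals the task attempted to lock
    )
)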
 * Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
 * your task for the segment intervals.
This javadoc is not very clear. Versions of "what" segments? The ones being replaced? Also, what does it mean here by "your" task? Some verbosity here could be helpful.
Resolved review thread on:
...rc/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -96,7 +112,8 @@ public static boolean isLockCoversSegments(
   final TimeChunkLock timeChunkLock = (TimeChunkLock) lock;
   return timeChunkLock.getInterval().contains(segment.getInterval())
          && timeChunkLock.getDataSource().equals(segment.getDataSource())
-         && timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0;
+         && (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
+             || TaskLockType.APPEND.equals(timeChunkLock.getType()));
Please leave some comments here, e.g. that APPEND, by definition, covers all versions, unlike other lock types.
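For illustration, the commented condition might read something like this (the comment wording is a suggestion, mirroring the diff above):

return timeChunkLock.getInterval().contains(segment.getInterval())
       && timeChunkLock.getDataSource().equals(segment.getDataSource())
       // An APPEND lock covers segments of all versions within its interval,
       // unlike other lock types, which only cover segments at or below the
       // lock's own version.
       && (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
           || TaskLockType.APPEND.equals(timeChunkLock.getType()));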
 * This method should be de-duplicated with {@link AbstractBatchIndexTask#determineLockType}
 * by passing the ParallelIndexSupervisorTask instance into the
 * SinglePhaseParallelIndexTaskRunner.
This passage is not clear. What kind of de-duplication does it refer to?
Resolved review threads on:
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
...xing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
final Set<Map<String, Object>> usedSegmentLoadSpecs = toolbox
    .getTaskActionClient()
    .submit(new RetrieveUsedSegmentsAction(getDataSource(), getInterval(), null, Segments.INCLUDING_OVERSHADOWED))
    .stream()
    .map(DataSegment::getLoadSpec)
    .collect(Collectors.toSet());

// Kill segments from the deep storage only if their load specs are not being used by any used segments
final List<DataSegment> segmentsToBeKilled = unusedSegments
    .stream()
    .filter(unusedSegment -> !usedSegmentLoadSpecs.contains(unusedSegment.getLoadSpec()))
    .collect(Collectors.toList());

toolbox.getDataSegmentKiller().kill(segmentsToBeKilled);
This action is very expensive, especially if the interval is eternity, which is often the case with the killUnusedSegments task.
This was already happening; this PR has just reduced the number of segments that are being killed by the DataSegmentKiller.
How is that, Kashif? Were we getting the list of used segments before too?
Ah, sorry, I misinterpreted the comment to be just for the last line, rather than this whole block of code.
Resolved review thread on:
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 * there would be some used segments in the DB with versions higher than these
 * append segments.
 */
private Set<DataSegment> getSegmentsToUpgradeOnAppend(
Could be renamed to getExtraVersionsForAppendSegments.
"extra" versions is probably still confusing. How about get createUpgradedVersionsOfAppendSegments
?
or newVersionsOfAppendSegments
Yeah, that works too. But I just want to avoid confusion between words like "new", "upgrade" or "extra" when all of them mean the same thing in our context.
final Map<String, Set<Interval>> committedVersionToIntervals = new HashMap<>();
final Map<Interval, Set<DataSegment>> committedIntervalToSegments = new HashMap<>();
variable naming - committed -> overlapping
final Set<DataSegment> upgradedSegments = new HashSet<>();
for (Map.Entry<String, Set<Interval>> entry : committedVersionToIntervals.entrySet()) {
  final String upgradeVersion = entry.getKey();
  Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(
naming: segmentsToUpgrade --> extraSegmentVersions
This method is not returning extra versions, it's returning the segments that need to be upgraded.
I think when I hear upgrade, I see something going from V0 to V1, but here, we leave V0 as it is and also create V1.
Yes, that's correct. In this line, we are just identifying the segments of V0 that need to go to V1.
final Set<DataSegment> upgradedSegments = new HashSet<>();
for (Map.Entry<String, Set<Interval>> entry : committedVersionToIntervals.entrySet()) {
  final String upgradeVersion = entry.getKey();
  Map<Interval, Set<DataSegment>> segmentsToUpgrade = getSegmentsWithVersionLowerThan(
Just for my own understanding: there should never be an empty value for a function such as getSegmentsWithVersionHigherThan with the same arguments?
Do you mean that the returned value should not be empty?
Never mind. I was asking whether versions in the DB will always be higher than or equal to the append segment version.
 * Computes new Segment IDs for the {@code segmentsToUpgrade} being upgraded
 * to the given {@code upgradeVersion}.
 */
private Set<DataSegment> upgradeSegmentsToVersion(
The name is a bit confusing because it doesn't actually upgrade anything yet; it's just creating extra versions.
How about createUpgradedVersionOfSegments?
That works too.
@abhishekagarwal87, I have replied to your comments. The changes will be included in #15097.
Allows multiple Appending batch ingestion jobs to run concurrently with at most one Replacing job for an enclosing interval.
Description
This PR utilizes the new task lock types, APPEND and REPLACE, and builds on them to allow concurrent compaction with batch ingestion.
TODO - Describe the problems with segment locking, and briefly explain the mechanism used in this patch
Add new task actions: SegmentTransactionalAppendAction and SegmentTransactionalReplaceAction
The SegmentTransactionalAppendAction is used by appending tasks holding an APPEND lock. When committing segments to the metadata store, these tasks also transactionally commit, to the druid_segmentVersions table, the metadata corresponding to each segment id and the version of the REPLACE lock (if any) that was held on its interval.
The SegmentTransactionalReplaceAction is used by replacing tasks that hold a REPLACE lock. When committing the core partitions for a given interval, these tasks also carry forward the previously appended segments committed under the same lock (utilizing the metadata committed by the append action) to every version for which used segments exist.
Utilize the new task actions with the previously added lock types to facilitate concurrent compaction with batch ingestion (see the context sketch after this list)
Add new test Task types: AppendTask and ReplaceTask to help simulate various orders of events
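Based on the Tasks.TASK_LOCK_TYPE lookup shown earlier in this conversation, a hedged sketch of how a task's context might opt into the new lock types (the literal key "taskLockType" is an assumption, not confirmed by this PR text):

import java.util.HashMap;
import java.util.Map;

public class ConcurrentLockContextExample
{
  // Builds the task context for an appending ingestion task; a replacing task
  // (e.g. compaction) would use "REPLACE" instead.
  public static Map<String, Object> appendTaskContext()
  {
    final Map<String, Object> context = new HashMap<>();
    context.put("taskLockType", "APPEND"); // assumed literal for Tasks.TASK_LOCK_TYPE
    return context;
  }
}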
Release note - TODO
This PR has: