diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 9a1dd089cfc5..f31f66c5ef3a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -69,7 +69,7 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.LockReleaseAction; import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; @@ -1233,10 +1233,15 @@ private DataSegmentTimelineView makeDataSegmentTimelineView() // any segment created after the lock was acquired for its interval will not be considered. final Collection publishedUsedSegments; try { - publishedUsedSegments = context.taskActionClient().submit(new RetrieveSegmentsToReplaceAction( - dataSource, - intervals - )); + // Additional check as the task action does not accept empty intervals + if (intervals.isEmpty()) { + publishedUsedSegments = Collections.emptySet(); + } else { + publishedUsedSegments = context.taskActionClient().submit(new RetrieveUsedSegmentsAction( + dataSource, + intervals + )); + } } catch (IOException e) { throw new MSQException(e, UnknownFault.forException(e)); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index f8100dc8a8f3..ea7adc866ee0 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -38,6 +38,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.DimensionRangeShardSpec; +import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.Test; import org.junit.runner.RunWith; @@ -98,6 +99,28 @@ public void testReplaceOnFooWithAll() .add("m1", ColumnType.FLOAT) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + DataSegment existingDataSegment1 = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2001-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doCallRealMethod() + .doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.ETERNITY)) + )); + testIngestQuery().setSql(" REPLACE INTO foo OVERWRITE ALL " + "SELECT __time, m1 " + "FROM foo " @@ -418,6 +441,28 @@ public void testReplaceSegmentsRepartitionTable() .add("m1", ColumnType.FLOAT) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + DataSegment existingDataSegment1 = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2001-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doCallRealMethod() + .doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.ETERNITY)) + )); + + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE ALL " + "SELECT __time, m1 " @@ -479,6 +524,20 @@ public void testReplaceWithWhereClause() .add("m1", ColumnType.FLOAT) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment0)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2000-03-01"))) + )); + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-03-01' " + "SELECT __time, m1 " @@ -538,6 +597,28 @@ public void testReplaceWhereClauseLargerThanData() .add("m1", ColumnType.FLOAT) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + DataSegment existingDataSegment1 = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2001-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2002-01-01"))) + )); + + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2002-01-01' " + "SELECT __time, m1 " @@ -625,6 +706,19 @@ public void testReplaceTimeChunks() .add("m1", ColumnType.FLOAT) .build(); + final DataSegment existingDataSegment = DataSegment.builder() + .dataSource("foo") + .interval(Intervals.of("2000-01-01/2000-01-04")) + .version(MSQTestTaskActionClient.VERSION) + .size(1) + .build(); + Mockito.doReturn(ImmutableSet.of(existingDataSegment)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.of("2000-01-01/2000-03-01"))) + )); + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2000-03-01'" + "SELECT __time, m1 " @@ -659,6 +753,26 @@ public void testReplaceTimeChunksLargerThanData() .add("m1", ColumnType.FLOAT) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + DataSegment existingDataSegment1 = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2001-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.of("2000/2002"))) + )); + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2002-01-01'" + "SELECT __time, m1 " @@ -939,6 +1053,26 @@ public void testReplaceUnnestSegmentWithTimeFilter() .add("d", ColumnType.STRING) .build(); + DataSegment existingDataSegment0 = DataSegment.builder() + .interval(Intervals.of("2000-01-01T/2000-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + DataSegment existingDataSegment1 = DataSegment.builder() + .interval(Intervals.of("2001-01-01T/2001-01-04T")) + .size(50) + .version(MSQTestTaskActionClient.VERSION) + .dataSource("foo") + .build(); + + Mockito.doReturn(ImmutableSet.of(existingDataSegment0, existingDataSegment1)) + .when(testTaskActionClient) + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo"), + EasyMock.eq(ImmutableList.of(Intervals.of("1999/2002"))) + )); + testIngestQuery().setSql(" REPLACE INTO foo " + "OVERWRITE WHERE __time >= TIMESTAMP '1999-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'" + "SELECT __time, d " @@ -1003,7 +1137,10 @@ public void testReplaceTombstonesOverPartiallyOverlappingSegments() Mockito.doReturn(ImmutableSet.of(existingDataSegment)) .when(testTaskActionClient) - .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class)); + .submit(new RetrieveUsedSegmentsAction( + EasyMock.eq("foo1"), + EasyMock.eq(ImmutableList.of(Intervals.of("2000/2002"))) + )); List expectedResults; if (NullHandling.sqlCompatible()) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java index 5192aafccdc5..fd0452cce7ca 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java @@ -21,13 +21,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.inject.Injector; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.LockListAction; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; @@ -47,8 +44,6 @@ import org.joda.time.Interval; import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -60,10 +55,6 @@ public class MSQTestTaskActionClient implements TaskActionClient public static final String VERSION = "test"; private final ObjectMapper mapper; private final ConcurrentHashMap segmentIdPartitionIdMap = new ConcurrentHashMap<>(); - private final Map> usedIntervals = ImmutableMap.of( - "foo", ImmutableList.of(Intervals.of("2001-01-01/2001-01-04"), Intervals.of("2000-01-01/2000-01-04")), - "foo2", ImmutableList.of(Intervals.of("2000-01-01/P1D")) - ); private final Set publishedSegments = new HashSet<>(); private final Injector injector; @@ -115,21 +106,6 @@ public RetType submit(TaskAction taskAction) )); } else if (taskAction instanceof RetrieveUsedSegmentsAction) { String dataSource = ((RetrieveUsedSegmentsAction) taskAction).getDataSource(); - if (!usedIntervals.containsKey(dataSource)) { - return (RetType) ImmutableSet.of(); - } else { - return (RetType) usedIntervals.get(dataSource) - .stream() - .map(interval -> DataSegment.builder() - .dataSource(dataSource) - .interval(interval) - .version(VERSION) - .size(1) - .build() - ).collect(Collectors.toSet()); - } - } else if (taskAction instanceof RetrieveSegmentsToReplaceAction) { - String dataSource = ((RetrieveSegmentsToReplaceAction) taskAction).getDataSource(); return (RetType) injector.getInstance(SpecificSegmentsQuerySegmentWalker.class) .getSegments() .stream() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java index 984058895e3c..3a33bc80d68f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java @@ -22,7 +22,6 @@ import com.google.common.collect.Iterables; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker; @@ -66,7 +65,7 @@ public Set findUsedSegments(Set segmentIds) ); final Collection usedSegmentsForIntervals = taskActionClient - .submit(new RetrieveUsedSegmentsAction(dataSource, null, intervals, Segments.ONLY_VISIBLE)); + .submit(new RetrieveUsedSegmentsAction(dataSource, intervals)); for (DataSegment segment : usedSegmentsForIntervals) { if (segmentIdsInDataSource.contains(segment.getId())) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java deleted file mode 100644 index 7fec3369a824..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsToReplaceAction.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.actions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask; -import org.apache.druid.indexing.overlord.Segments; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.metadata.ReplaceTaskLock; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.Partitions; -import org.apache.druid.timeline.SegmentTimeline; -import org.joda.time.Interval; - -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * This action exists in addition to retrieveUsedSegmentsAction because that action suffers - * from a race condition described by the following sequence of events: - * - * -Segments S1, S2, S3 exist - * -Compact acquires a replace lock - * -A concurrent appending job publishes a segment S4 which needs to be upgraded to the replace lock's version - * -Compact task processes S1-S4 to create new segments - * -Compact task publishes new segments and carries S4 forward to the new version - * - * This can lead to the data in S4 being duplicated - * - * This TaskAction returns a collection of segments which have data within the specified interval and are marked as - * used, and have been created before a REPLACE lock, if any, was acquired. - * This ensures that a consistent set of segments is returned each time this action is called - */ -public class RetrieveSegmentsToReplaceAction implements TaskAction> -{ - private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class); - - private final String dataSource; - - private final List intervals; - - @JsonCreator - public RetrieveSegmentsToReplaceAction( - @JsonProperty("dataSource") String dataSource, - @JsonProperty("intervals") List intervals - ) - { - this.dataSource = dataSource; - this.intervals = intervals; - } - - @JsonProperty - public String getDataSource() - { - return dataSource; - } - - @JsonProperty - public List getIntervals() - { - return intervals; - } - - @Override - public TypeReference> getReturnTypeReference() - { - return new TypeReference>() {}; - } - - @Override - public Collection perform(Task task, TaskActionToolbox toolbox) - { - // The DruidInputSource can be used to read from one datasource and write to another. - // In such a case, the race condition described in the class-level docs cannot occur, - // and the action can simply fetch all visible segments for the datasource and interval - if (!task.getDataSource().equals(dataSource)) { - return retrieveAllVisibleSegments(toolbox); - } - - final String supervisorId; - if (task instanceof AbstractBatchSubtask) { - supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId(); - } else { - supervisorId = task.getId(); - } - - final Set replaceLocksForTask = toolbox - .getTaskLockbox() - .getAllReplaceLocksForDatasource(task.getDataSource()) - .stream() - .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId())) - .collect(Collectors.toSet()); - - // If there are no replace locks for the task, simply fetch all visible segments for the interval - if (replaceLocksForTask.isEmpty()) { - return retrieveAllVisibleSegments(toolbox); - } - - Map>> intervalToCreatedToSegments = new HashMap<>(); - for (Pair segmentAndCreatedDate : - toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) { - final DataSegment segment = segmentAndCreatedDate.lhs; - final String created = segmentAndCreatedDate.rhs; - intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>()) - .computeIfAbsent(created, c -> new HashSet<>()) - .add(segment); - } - - Set allSegmentsToBeReplaced = new HashSet<>(); - for (final Map.Entry>> entry : intervalToCreatedToSegments.entrySet()) { - final Interval segmentInterval = entry.getKey(); - String lockVersion = null; - for (ReplaceTaskLock replaceLock : replaceLocksForTask) { - if (replaceLock.getInterval().contains(segmentInterval)) { - lockVersion = replaceLock.getVersion(); - } - } - final Map> createdToSegmentsMap = entry.getValue(); - for (Map.Entry> createdAndSegments : createdToSegmentsMap.entrySet()) { - if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) { - allSegmentsToBeReplaced.addAll(createdAndSegments.getValue()); - } else { - for (DataSegment segment : createdAndSegments.getValue()) { - log.info("Ignoring segment[%s] as it has created_date[%s] greater than the REPLACE lock version[%s]", - segment.getId(), createdAndSegments.getKey(), lockVersion); - } - } - } - } - - return SegmentTimeline.forSegments(allSegmentsToBeReplaced) - .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE); - } - - private Collection retrieveAllVisibleSegments(TaskActionToolbox toolbox) - { - return toolbox.getIndexerMetadataStorageCoordinator() - .retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE); - } - - @Override - public boolean isAudited() - { - return false; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o; - return Objects.equals(dataSource, that.dataSource) && Objects.equals(intervals, that.intervals); - } - - @Override - public int hashCode() - { - return Objects.hash(dataSource, intervals); - } - @Override - public String toString() - { - return "RetrieveSegmentsToReplaceAction{" + - "dataSource='" + dataSource + '\'' + - ", intervals=" + intervals + - '}'; - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java index fab8c4846894..29986eeba555 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java @@ -26,29 +26,47 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.batch.parallel.AbstractBatchSubtask; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.Partitions; +import org.apache.druid.timeline.SegmentTimeline; import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; /** * This TaskAction returns a collection of segments which have data within the specified intervals and are marked as * used. + * If the task holds REPLACE locks and is writing back to the same datasource, + * only segments that were created before the REPLACE lock was acquired are returned for an interval. + * This ensures that the input set of segments for this replace task remains consistent + * even when new data is appended by other concurrent tasks. * * The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in * the collection only once. * - * @implNote This action doesn't produce a {@link java.util.Set} because it's implemented via {@link + * @implNote This action doesn't produce a {@link Set} because it's implemented via {@link * org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#retrieveUsedSegmentsForIntervals} which returns - * a collection. Producing a {@link java.util.Set} would require an unnecessary copy of segments collection. + * a collection. Producing a {@link Set} would require an unnecessary copy of segments collection. */ public class RetrieveUsedSegmentsAction implements TaskAction> { + private static final Logger log = new Logger(RetrieveUsedSegmentsAction.class); + @JsonIgnore private final String dataSource; @@ -87,6 +105,11 @@ public RetrieveUsedSegmentsAction( this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE; } + public RetrieveUsedSegmentsAction(String dataSource, Collection intervals) + { + this(dataSource, null, intervals, Segments.ONLY_VISIBLE); + } + @JsonProperty public String getDataSource() { @@ -113,6 +136,75 @@ public TypeReference> getReturnTypeReference() @Override public Collection perform(Task task, TaskActionToolbox toolbox) + { + // When fetching segments for a datasource other than the one this task is writing to, + // just return all segments with the needed visibility. + // This is because we can't ensure that the set of returned segments is consistent throughout the task's lifecycle + if (!task.getDataSource().equals(dataSource)) { + return retrieveUsedSegments(toolbox); + } + + final String supervisorId; + if (task instanceof AbstractBatchSubtask) { + supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId(); + } else { + supervisorId = task.getId(); + } + + final Set replaceLocksForTask = toolbox + .getTaskLockbox() + .getAllReplaceLocksForDatasource(task.getDataSource()) + .stream() + .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId())) + .collect(Collectors.toSet()); + + // If there are no replace locks for the task, simply fetch all visible segments for the interval + if (replaceLocksForTask.isEmpty()) { + return retrieveUsedSegments(toolbox); + } + + Map>> intervalToCreatedToSegments = new HashMap<>(); + for (Pair segmentAndCreatedDate : + toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) { + final DataSegment segment = segmentAndCreatedDate.lhs; + final String createdDate = segmentAndCreatedDate.rhs; + intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>()) + .computeIfAbsent(createdDate, c -> new HashSet<>()) + .add(segment); + } + + Set allSegmentsToBeReplaced = new HashSet<>(); + for (final Map.Entry>> entry : intervalToCreatedToSegments.entrySet()) { + final Interval segmentInterval = entry.getKey(); + String lockVersion = null; + for (ReplaceTaskLock replaceLock : replaceLocksForTask) { + if (replaceLock.getInterval().contains(segmentInterval)) { + lockVersion = replaceLock.getVersion(); + break; + } + } + final Map> createdToSegmentsMap = entry.getValue(); + for (Map.Entry> createdAndSegments : createdToSegmentsMap.entrySet()) { + if (lockVersion == null || lockVersion.compareTo(createdAndSegments.getKey()) > 0) { + allSegmentsToBeReplaced.addAll(createdAndSegments.getValue()); + } else { + for (DataSegment segment : createdAndSegments.getValue()) { + log.info("Ignoring segment[%s] as it has created_date[%s] greater than the REPLACE lock version[%s]", + segment.getId(), createdAndSegments.getKey(), lockVersion); + } + } + } + } + + if (visibility == Segments.ONLY_VISIBLE) { + return SegmentTimeline.forSegments(allSegmentsToBeReplaced) + .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE); + } else { + return allSegmentsToBeReplaced; + } + } + + private Collection retrieveUsedSegments(TaskActionToolbox toolbox) { return toolbox.getIndexerMetadataStorageCoordinator() .retrieveUsedSegmentsForIntervals(dataSource, intervals, visibility); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java index e251626f8690..171d53b9cdd6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java @@ -38,7 +38,6 @@ @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class), @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class), @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class), - @JsonSubTypes.Type(name = "retrieveSegmentsToReplace", value = RetrieveSegmentsToReplaceAction.class), // Type name doesn't correspond to the name of the class for backward compatibility. @JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class), // Type name doesn't correspond to the name of the class for backward compatibility. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index 3352735b9e0e..db48d6f07f7d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -79,7 +79,6 @@ public enum BatchProcessingMode private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M"); private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true; private static final long DEFAULT_TMP_STORAGE_BYTES_PER_TASK = -1; - private static final boolean DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE = false; @JsonProperty private final String baseDir; @@ -126,9 +125,6 @@ public enum BatchProcessingMode @JsonProperty private final long tmpStorageBytesPerTask; - @JsonProperty - private final boolean enableConcurrentAppendAndReplace; - @JsonCreator public TaskConfig( @JsonProperty("baseDir") String baseDir, @@ -146,8 +142,7 @@ public TaskConfig( @JsonProperty("batchProcessingMode") String batchProcessingMode, @JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns, @JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush, - @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask, - @JsonProperty("enableConcurrentAppendAndReplace") @Nullable Boolean enableConcurrentAppendAndReplace + @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask ) { this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir")); @@ -198,10 +193,6 @@ public TaskConfig( this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS); this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK); - this.enableConcurrentAppendAndReplace = Configs.valueOrDefault( - enableConcurrentAppendAndReplace, - DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE - ); } private TaskConfig( @@ -219,8 +210,7 @@ private TaskConfig( BatchProcessingMode batchProcessingMode, boolean storeEmptyColumns, boolean encapsulatedTask, - long tmpStorageBytesPerTask, - boolean enableConcurrentAppendAndReplace + long tmpStorageBytesPerTask ) { this.baseDir = baseDir; @@ -238,7 +228,6 @@ private TaskConfig( this.storeEmptyColumns = storeEmptyColumns; this.encapsulatedTask = encapsulatedTask; this.tmpStorageBytesPerTask = tmpStorageBytesPerTask; - this.enableConcurrentAppendAndReplace = enableConcurrentAppendAndReplace; } @JsonProperty @@ -355,12 +344,6 @@ public long getTmpStorageBytesPerTask() return tmpStorageBytesPerTask; } - @JsonProperty("enableConcurrentAppendAndReplace") - public boolean isConcurrentAppendAndReplaceEnabled() - { - return enableConcurrentAppendAndReplace; - } - private String defaultDir(@Nullable String configParameter, final String defaultVal) { if (configParameter == null) { @@ -387,8 +370,7 @@ public TaskConfig withBaseTaskDir(File baseTaskDir) batchProcessingMode, storeEmptyColumns, encapsulatedTask, - tmpStorageBytesPerTask, - enableConcurrentAppendAndReplace + tmpStorageBytesPerTask ); } @@ -409,8 +391,7 @@ public TaskConfig withTmpStorageBytesPerTask(long tmpStorageBytesPerTask) batchProcessingMode, storeEmptyColumns, encapsulatedTask, - tmpStorageBytesPerTask, - enableConcurrentAppendAndReplace + tmpStorageBytesPerTask ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 94110e167e3e..4a76e688fb7a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -48,7 +48,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.InputRowSchemas; import org.apache.druid.indexing.overlord.SegmentPublishResult; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; @@ -655,7 +654,7 @@ protected static List findInputSegments( { return ImmutableList.copyOf( actionClient.submit( - new RetrieveUsedSegmentsAction(dataSource, null, intervalsToRead, Segments.ONLY_VISIBLE) + new RetrieveUsedSegmentsAction(dataSource, intervalsToRead) ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 1a0e5c971fda..6959de2809de 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -60,7 +60,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.DruidInputSource; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -428,7 +427,7 @@ public List findSegmentsToLock(TaskActionClient taskActionClient, L throws IOException { return ImmutableList.copyOf( - taskActionClient.submit(new RetrieveUsedSegmentsAction(getDataSource(), null, intervals, Segments.ONLY_VISIBLE)) + taskActionClient.submit(new RetrieveUsedSegmentsAction(getDataSource(), intervals)) ); } @@ -1163,7 +1162,9 @@ static class SegmentProvider List findSegments(TaskActionClient actionClient) throws IOException { return new ArrayList<>( - actionClient.submit(new RetrieveUsedSegmentsAction(dataSource, interval, null, Segments.ONLY_VISIBLE)) + actionClient.submit( + new RetrieveUsedSegmentsAction(dataSource, ImmutableList.of(interval)) + ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java index 6b3c684e7947..15ba6788307e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java @@ -26,7 +26,6 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.batch.TooManyBucketsException; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; @@ -309,9 +308,7 @@ private List getExistingNonEmptyIntervalsOfDatasource( Collection usedSegmentsInInputInterval = taskActionClient.submit(new RetrieveUsedSegmentsAction( dataSource, - null, - condensedInputIntervals, - Segments.ONLY_VISIBLE + condensedInputIntervals )); for (DataSegment usedSegment : usedSegmentsInInputInterval) { for (Interval condensedInputInterval : condensedInputIntervals) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 85617728e5e6..890a7c313fa4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -49,7 +49,7 @@ import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.firehose.WindowedSegmentId; import org.apache.druid.java.util.common.CloseableIterators; @@ -546,7 +546,7 @@ public static List> getTimelineForInte Preconditions.checkNotNull(interval); final Collection usedSegments; - if (toolbox == null || !toolbox.getConfig().isConcurrentAppendAndReplaceEnabled()) { + if (toolbox == null) { usedSegments = FutureUtils.getUnchecked( coordinatorClient.fetchUsedSegments(dataSource, Collections.singletonList(interval)), true @@ -554,7 +554,10 @@ public static List> getTimelineForInte } else { try { usedSegments = toolbox.getTaskActionClient() - .submit(new RetrieveSegmentsToReplaceAction(dataSource, Collections.singletonList(interval))); + .submit(new RetrieveUsedSegmentsAction( + dataSource, + Collections.singletonList(interval) + )); } catch (IOException e) { LOG.error(e, "Error retrieving the used segments for interval[%s].", interval); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java index 35a28be6cbdf..c339a103b2d3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentCheckerTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker; @@ -44,7 +43,7 @@ public void testBasic() throws IOException final TaskActionClient taskActionClient = EasyMock.createMock(TaskActionClient.class); EasyMock.expect( taskActionClient.submit( - new RetrieveUsedSegmentsAction("bar", Intervals.of("2002/P1D"), null, Segments.ONLY_VISIBLE) + new RetrieveUsedSegmentsAction("bar", ImmutableList.of(Intervals.of("2002/P1D"))) ) ).andReturn( ImmutableList.of( @@ -68,9 +67,7 @@ public void testBasic() throws IOException taskActionClient.submit( new RetrieveUsedSegmentsAction( "foo", - null, - ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D")), - Segments.ONLY_VISIBLE + ImmutableList.of(Intervals.of("2000/P1D"), Intervals.of("2001/P1D")) ) ) ).andReturn( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 24d2f0a90431..09f724bcfc1c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -96,7 +95,7 @@ private static DataSegment createSegment(Interval interval, String version) public void testRetrieveUsedSegmentsAction() { final RetrieveUsedSegmentsAction action = - new RetrieveUsedSegmentsAction(task.getDataSource(), INTERVAL, null, Segments.ONLY_VISIBLE); + new RetrieveUsedSegmentsAction(task.getDataSource(), ImmutableList.of(INTERVAL)); final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); Assert.assertEquals(expectedUsedSegments, resultSegments); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java index 0876092cac11..99675fd57bb1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsActionSerdeTest.java @@ -56,9 +56,7 @@ public void testMultiIntervalSerde() throws Exception List intervals = ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017")); RetrieveUsedSegmentsAction expected = new RetrieveUsedSegmentsAction( "dataSource", - null, - intervals, - Segments.ONLY_VISIBLE + intervals ); RetrieveUsedSegmentsAction actual = diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java index 8b488fff8093..af920ebbeb73 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java @@ -41,7 +41,6 @@ public class TaskConfigBuilder private Boolean storeEmptyColumns; private boolean enableTaskLevelLogPush; private Long tmpStorageBytesPerTask; - private Boolean enableConcurrentAppendAndReplace; public TaskConfigBuilder setBaseDir(String baseDir) { @@ -133,18 +132,6 @@ public TaskConfigBuilder setTmpStorageBytesPerTask(Long tmpStorageBytesPerTask) return this; } - public TaskConfigBuilder enableConcurrentAppendAndReplace() - { - this.enableConcurrentAppendAndReplace = true; - return this; - } - - public TaskConfigBuilder disableConcurrentAppendAndReplace() - { - this.enableConcurrentAppendAndReplace = false; - return this; - } - public TaskConfig build() { return new TaskConfig( @@ -162,8 +149,7 @@ public TaskConfig build() batchProcessingMode, storeEmptyColumns, enableTaskLevelLogPush, - tmpStorageBytesPerTask, - enableConcurrentAppendAndReplace + tmpStorageBytesPerTask ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 7f83a8f02332..3fda953a4541 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -27,7 +27,6 @@ import org.apache.druid.indexing.common.TaskStorageDirTracker; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TaskToolboxFactory; -import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; @@ -955,7 +954,7 @@ private void verifyInputSegments(Task task, Interval interval, DataSegment... ex try { final TaskActionClient taskActionClient = taskActionClientFactory.create(task); Collection allUsedSegments = taskActionClient.submit( - new RetrieveSegmentsToReplaceAction( + new RetrieveUsedSegmentsAction( WIKI, Collections.singletonList(interval) ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 4f0aacd1cec3..94d21c144dc1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -616,7 +616,6 @@ private TaskToolboxFactory setUpTaskToolboxFactory( .setDefaultRowFlushBoundary(50000) .setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()) .setTmpStorageBytesPerTask(-1L) - .enableConcurrentAppendAndReplace() .build(); return new TaskToolboxFactory(