Skip to content

Commit

Permalink
Improve run time of coordinator duty MarkAsUnusedOvershadowedSegments (
Browse files Browse the repository at this point in the history
…#13287)

In clusters with a large number of segments, the duty `MarkAsUnusedOvershadowedSegments`
can take a very long time to finish. This is because of the costly invocation of
`timeline.isOvershadowed`, which is done for every used segment in every coordinator run.

Changes
- Use `DataSourceSnapshot.getOvershadowedSegments` to get all overshadowed segments
- Iterate over this set instead of all used segments to identify segments that can be marked as unused
- Mark segments as unused in the DB in batches rather than one at a time
- Refactor: Add class `SegmentTimeline` for ease of use and readability while using a
`VersionedIntervalTimeline` of segments.
  • Loading branch information
kfaraz authored Nov 1, 2022
1 parent 0d03ce4 commit fd7864a
Show file tree
Hide file tree
Showing 34 changed files with 281 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -82,7 +82,7 @@ public class NewestSegmentFirstPolicyBenchmark
private int numCompactionTaskSlots;

private Map<String, DataSourceCompactionConfig> compactionConfigs;
private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
private Map<String, SegmentTimeline> dataSources;

@Setup(Level.Trial)
public void setup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class VersionedIntervalTimelineBenchmark

private List<Interval> intervals;
private List<DataSegment> segments;
private VersionedIntervalTimeline<String, DataSegment> timeline;
private SegmentTimeline timeline;
private List<DataSegment> newSegments;

@Setup
Expand Down Expand Up @@ -143,7 +143,7 @@ public void setup()
nextMinorVersions.put(interval, (short) (numNonRootGenerations + 1));
}

timeline = VersionedIntervalTimeline.forSegments(segments);
timeline = SegmentTimeline.forSegments(segments);

newSegments = new ArrayList<>(200);

Expand Down Expand Up @@ -206,7 +206,7 @@ public void setup()
@Benchmark
public void benchAdd(Blackhole blackhole)
{
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segments);
final SegmentTimeline timeline = SegmentTimeline.forSegments(segments);
for (DataSegment newSegment : newSegments) {
timeline.add(
newSegment.getInterval(),
Expand All @@ -220,7 +220,7 @@ public void benchAdd(Blackhole blackhole)
public void benchRemove(Blackhole blackhole)
{
final List<DataSegment> segmentsCopy = new ArrayList<>(segments);
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(segmentsCopy);
final SegmentTimeline timeline = SegmentTimeline.forSegments(segmentsCopy);
final int numTests = (int) (segmentsCopy.size() * 0.1);
for (int i = 0; i < numTests; i++) {
final DataSegment segment = segmentsCopy.remove(ThreadLocalRandom.current().nextInt(segmentsCopy.size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* An Overshadowable overshadows another if its root partition range contains that of another
* and has a higher minorVersion. For more details, check https://github.com/apache/druid/issues/7491.
*/
public interface Overshadowable<T extends Overshadowable>
public interface Overshadowable<T extends Overshadowable<T>>
{
/**
* Returns true if this overshadowable overshadows the other.
Expand Down
52 changes: 52 additions & 0 deletions core/src/main/java/org/apache/druid/timeline/SegmentTimeline.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline;

import java.util.Comparator;
import java.util.Iterator;

/**
 * A {@link VersionedIntervalTimeline} specialized for {@link DataSegment} objects
 * with {@code String} versions.
 */
public class SegmentTimeline extends VersionedIntervalTimeline<String, DataSegment>
{
  /**
   * Creates an empty timeline that compares versions using {@link Comparator#naturalOrder()}.
   */
  public SegmentTimeline()
  {
    super(Comparator.naturalOrder());
  }

  /**
   * Creates a new timeline from the given segments. Delegates to
   * {@link #forSegments(Iterator)} using the iterator of the given iterable.
   *
   * @param segments segments to add to the new timeline
   * @return a newly created timeline
   */
  public static SegmentTimeline forSegments(Iterable<DataSegment> segments)
  {
    final Iterator<DataSegment> segmentIterator = segments.iterator();
    return forSegments(segmentIterator);
  }

  /**
   * Creates a new timeline and hands it, together with the given iterator, to
   * {@code VersionedIntervalTimeline.addSegments}.
   *
   * @param segments iterator over the segments to add to the new timeline
   * @return a newly created timeline
   */
  public static SegmentTimeline forSegments(Iterator<DataSegment> segments)
  {
    final SegmentTimeline newTimeline = new SegmentTimeline();
    VersionedIntervalTimeline.addSegments(newTimeline, segments);
    return newTimeline;
  }

  /**
   * Convenience overload that checks whether the given segment is overshadowed,
   * using the segment's own interval and version for the lookup.
   *
   * @param segment segment to check
   * @return result of {@code isOvershadowed(interval, version, segment)} for this segment
   */
  public boolean isOvershadowed(DataSegment segment)
  {
    return isOvershadowed(
        segment.getInterval(),
        segment.getVersion(),
        segment
    );
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,6 @@
public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshadowable<ObjectType>>
implements TimelineLookup<VersionType, ObjectType>
{
public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterable<DataSegment> segments)
{
return forSegments(segments.iterator());
}

public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterator<DataSegment> segments)
{
final VersionedIntervalTimeline<String, DataSegment> timeline =
new VersionedIntervalTimeline<>(Comparator.naturalOrder());
addSegments(timeline, segments);
return timeline;
}

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

// Below timelines stores only *visible* timelineEntries
Expand All @@ -106,16 +93,16 @@ public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterato
private final Comparator<? super VersionType> versionComparator;

// Set this to true if the client needs to skip tombstones upon lookup (like the broker)
private boolean skipObjectsWithNoData = false;
private final boolean skipObjectsWithNoData;

public VersionedIntervalTimeline(Comparator<? super VersionType> versionComparator)
{
this.versionComparator = versionComparator;
this(versionComparator, false);
}

public VersionedIntervalTimeline(Comparator<? super VersionType> versionComparator, boolean skipObjectsWithNoData)
{
this(versionComparator);
this.versionComparator = versionComparator;
this.skipObjectsWithNoData = skipObjectsWithNoData;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline;

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;

/**
 * Tests for the single-argument {@code SegmentTimeline.isOvershadowed(DataSegment)}.
 */
public class SegmentTimelineTest
{
  @Test
  public void testIsOvershadowed()
  {
    final String firstDay = "2022-01-01/2022-01-02";
    final String secondDay = "2022-01-02/2022-01-03";
    final String v1 = "v1";
    final String v2 = "v2";

    // Timeline contents: three v1 partitions on the first day, two v2 partitions on the second day
    final SegmentTimeline timeline = SegmentTimeline.forSegments(
        Arrays.asList(
            createSegment(firstDay, v1, 0, 3),
            createSegment(firstDay, v1, 1, 3),
            createSegment(firstDay, v1, 2, 3),
            createSegment(secondDay, v2, 0, 2),
            createSegment(secondDay, v2, 1, 2)
        )
    );

    // v1 segments within the first day are expected to not be overshadowed
    final DataSegment firstDayPartition1 = createSegment(firstDay, v1, 1, 3);
    Assert.assertFalse(timeline.isOvershadowed(firstDayPartition1));

    final DataSegment firstDayPartition2 = createSegment(firstDay, v1, 2, 3);
    Assert.assertFalse(timeline.isOvershadowed(firstDayPartition2));

    final DataSegment firstDayOtherPartitionCount = createSegment(firstDay, v1, 1, 4);
    Assert.assertFalse(timeline.isOvershadowed(firstDayOtherPartitionCount));

    final DataSegment firstDaySubInterval =
        createSegment("2022-01-01T00:00:00/2022-01-01T06:00:00", v1, 1, 4);
    Assert.assertFalse(timeline.isOvershadowed(firstDaySubInterval));

    // v1 segments on the second day, where the timeline holds v2, are expected to be overshadowed
    final DataSegment secondDayOldPartition2 = createSegment(secondDay, v1, 2, 4);
    Assert.assertTrue(timeline.isOvershadowed(secondDayOldPartition2));

    final DataSegment secondDayOldSinglePartition = createSegment(secondDay, v1, 0, 1);
    Assert.assertTrue(timeline.isOvershadowed(secondDayOldSinglePartition));
  }

  /**
   * Creates a segment of datasource "wiki" with the given interval and version and a
   * {@link NumberedShardSpec} built from the given partition number and partition count.
   */
  private DataSegment createSegment(String interval, String version, int partitionNum, int totalNumPartitions)
  {
    final NumberedShardSpec shardSpec = new NumberedShardSpec(partitionNum, totalNumPartitions);
    return new DataSegment(
        "wiki",
        Intervals.of(interval),
        version,
        Collections.emptyMap(),
        Collections.emptyList(),
        Collections.emptyList(),
        shardSpec,
        0x9,
        1L
    );
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
Expand Down Expand Up @@ -957,7 +957,7 @@ private DataSegmentTimelineView makeDataSegmentTimelineView()
if (dataSegments.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(VersionedIntervalTimeline.forSegments(dataSegments));
return Optional.of(SegmentTimeline.forSegments(dataSegments));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -79,15 +79,14 @@ public class TableInputSpecSlicerTest extends InitializedNullHandlingTest
BYTES_PER_SEGMENT
);

private VersionedIntervalTimeline<String, DataSegment> timeline;
private DataSegmentTimelineView timelineView;
private SegmentTimeline timeline;
private TableInputSpecSlicer slicer;

@Before
public void setUp()
{
timeline = VersionedIntervalTimeline.forSegments(ImmutableList.of(SEGMENT1, SEGMENT2));
timelineView = (dataSource, intervals) -> {
timeline = SegmentTimeline.forSegments(ImmutableList.of(SEGMENT1, SEGMENT2));
DataSegmentTimelineView timelineView = (dataSource, intervals) -> {
if (DATASOURCE.equals(dataSource)) {
return Optional.of(timeline);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.IngestionSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -199,8 +198,7 @@ public static void updateSegmentListIfDatasourcePathSpecIsUsed(
}
}

final VersionedIntervalTimeline<String, DataSegment> timeline =
VersionedIntervalTimeline.forSegments(usedVisibleSegments);
final SegmentTimeline timeline = SegmentTimeline.forSegments(usedVisibleSegments);
final List<WindowedDataSegment> windowedSegments = new ArrayList<>();
for (Interval interval : ingestionSpecObj.getIntervals()) {
final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = timeline.lookup(interval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -483,7 +483,7 @@ private LockGranularityDetermineResult determineSegmentGranularity(List<DataSegm
} else {
// Use segment lock
// Create a timeline to find latest segments only
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(
final SegmentTimeline timeline = SegmentTimeline.forSegments(
segments
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Duration;
Expand Down Expand Up @@ -735,7 +735,7 @@ private static NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<Str
segmentProvider.findSegments(toolbox.getTaskActionClient());
segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = VersionedIntervalTimeline
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = SegmentTimeline
.forSegments(usedSegments)
.lookup(segmentProvider.interval);
return new NonnullPair<>(segmentFileMap, timelineSegments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Interval;
Expand Down Expand Up @@ -267,7 +267,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
// Find inputSegments overshadowed by pushedSegments
final Set<DataSegment> allSegments = new HashSet<>(getTaskLockHelper().getLockedExistingSegments());
allSegments.addAll(pushedSegments);
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(allSegments);
final SegmentTimeline timeline = SegmentTimeline.forSegments(allSegments);
final Set<DataSegment> oldSegments = FluentIterable.from(timeline.findFullyOvershadowed())
.transformAndConcat(TimelineObjectHolder::getObject)
.transform(PartitionChunk::getObject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.utils.Streams;
Expand Down Expand Up @@ -509,7 +509,7 @@ public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForInte
}
}

return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval);
return SegmentTimeline.forSegments(usedSegments).lookup(interval);
}

public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegmentIds(
Expand Down
Loading

0 comments on commit fd7864a

Please sign in to comment.