Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run compaction as a supervisor on Overlord #16768

Merged
merged 35 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d89802a
Add Compaction Scheduler
kfaraz Jul 22, 2024
bea1078
Fix checkstyle, forbidden API
kfaraz Jul 22, 2024
9455547
Add some tests
kfaraz Jul 22, 2024
26d4bdc
Add CompactionDutySimulator
kfaraz Jul 24, 2024
caefb27
Fix tests and checkstyle
kfaraz Jul 24, 2024
bbc2eb6
More changes for simulator
kfaraz Jul 27, 2024
8be913e
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Jul 27, 2024
23899b8
Use compaction simulator on both coordinator and overlord
kfaraz Jul 30, 2024
9c06504
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Jul 30, 2024
f972ea2
Handle master changes
kfaraz Jul 30, 2024
fdcfdf1
Add CompactionSupervisor
kfaraz Aug 8, 2024
4b81779
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 8, 2024
4942a08
Add test for OverlordCompactionScheduler
kfaraz Aug 9, 2024
b2ae560
Add more tests
kfaraz Aug 9, 2024
ad98383
Fix flow
kfaraz Aug 10, 2024
2e676be
Remove extra sout
kfaraz Aug 10, 2024
e793f44
Fix CompactSegmentsTest
kfaraz Aug 10, 2024
8ece3f0
Do not call taskStatus API with empty taskIds
kfaraz Aug 10, 2024
2f152e2
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 23, 2024
382bfdf
Add javadocs, cleanup, etc.
kfaraz Aug 23, 2024
e111d71
Fix checkstyle, add tests
kfaraz Aug 23, 2024
93284b3
Handle concurrency, validation, cleanup stale entries
kfaraz Aug 28, 2024
273822b
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 28, 2024
117b0ab
Fix test, avoid metadata store calls
kfaraz Aug 28, 2024
0a584d4
Redirect compaction progress APIs to Overlord
kfaraz Aug 30, 2024
bfc9ceb
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 30, 2024
ab711ed
Bind CompactionSupervisorConfig in SupervisorModule
kfaraz Aug 30, 2024
e3012f3
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 30, 2024
bca9810
Minor cleanup and tests
kfaraz Aug 31, 2024
6d84927
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Aug 31, 2024
b439935
Remove unused methods, cleanup tests
kfaraz Sep 1, 2024
517f9ad
Rename SegmentsToCompact to CompactionCandidate
kfaraz Sep 1, 2024
eceb767
Fix tests
kfaraz Sep 1, 2024
9d04d9b
Fix checkstyle
kfaraz Sep 1, 2024
3d2e58f
Merge branch 'master' of github.com:apache/druid into compact_scheduler
kfaraz Sep 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.compaction.CompactionSegmentIterator;
import org.apache.druid.server.compaction.CompactionSegmentSearchPolicy;
import org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class NewestSegmentFirstPolicyBenchmark
{
private static final String DATA_SOURCE_PREFIX = "dataSource_";

private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper());
private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(null);

@Param("100")
private int numDataSources;
Expand Down Expand Up @@ -132,7 +133,12 @@ public void setup()
@Benchmark
public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
{
final CompactionSegmentIterator iterator = policy.createIterator(compactionConfigs, dataSources, Collections.emptyMap());
final CompactionSegmentIterator iterator = policy.createIterator(
compactionConfigs,
dataSources,
Collections.emptyMap(),
new CompactionStatusTracker(new DefaultObjectMapper())
);
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.compact;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/**
* Dummy Overlord client used by the {@link OverlordCompactionScheduler} to fetch
* task related info. This client simply redirects all queries to the
* {@link TaskQueryTool} and all updates to the {@link TaskQueue}.
*/
class LocalOverlordClient implements OverlordClient
{
private static final Logger log = new Logger(LocalOverlordClient.class);

private final TaskMaster taskMaster;
private final TaskQueryTool taskQueryTool;
private final ObjectMapper objectMapper;

LocalOverlordClient(TaskMaster taskMaster, TaskQueryTool taskQueryTool, ObjectMapper objectMapper)
{
this.taskMaster = taskMaster;
this.taskQueryTool = taskQueryTool;
this.objectMapper = objectMapper;
}

@Override
public ListenableFuture<Void> runTask(String taskId, Object clientTaskQuery)
{
return futureOf(() -> {
getValidTaskQueue().add(
convertTask(clientTaskQuery, ClientCompactionTaskQuery.class, CompactionTask.class)
);
return null;
});
}

@Override
public ListenableFuture<Void> cancelTask(String taskId)
{
return futureOf(() -> {
getValidTaskQueue().shutdown(taskId, "Shutdown by Compaction Scheduler");
return null;
});
}

@Override
public ListenableFuture<TaskPayloadResponse> taskPayload(String taskId)
{
ClientCompactionTaskQuery taskPayload = taskQueryTool.getTask(taskId).transform(
task -> convertTask(task, CompactionTask.class, ClientCompactionTaskQuery.class)
).orNull();
return futureOf(() -> new TaskPayloadResponse(taskId, taskPayload));
}

@Override
public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
@Nullable String state,
@Nullable String dataSource,
@Nullable Integer maxCompletedTasks
)
{
final ListenableFuture<List<TaskStatusPlus>> tasksFuture
= futureOf(taskQueryTool::getAllActiveTasks);
return Futures.transform(
tasksFuture,
taskList -> CloseableIterators.withEmptyBaggage(taskList.iterator()),
Execs.directExecutor()
);
}

@Override
public ListenableFuture<Map<String, TaskStatus>> taskStatuses(Set<String> taskIds)
{
return futureOf(() -> taskQueryTool.getMultipleTaskStatuses(taskIds));
}

@Override
public ListenableFuture<Map<String, List<Interval>>> findLockedIntervals(
List<LockFilterPolicy> lockFilterPolicies
)
{
return futureOf(() -> taskQueryTool.getLockedIntervals(lockFilterPolicies));
}

@Override
public ListenableFuture<IndexingTotalWorkerCapacityInfo> getTotalWorkerCapacity()
{
return futureOf(() -> convert(taskQueryTool.getTotalWorkerCapacity()));
}

private TaskQueue getValidTaskQueue()
{
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
return taskQueue.get();
} else {
throw DruidException.defensive("No TaskQueue. Cannot proceed.");
}
}

private <T> ListenableFuture<T> futureOf(Supplier<T> supplier)
{
try {
return Futures.immediateFuture(supplier.get());
}
catch (Exception e) {
return Futures.immediateFailedFuture(e);
}
}

private IndexingTotalWorkerCapacityInfo convert(TotalWorkerCapacityResponse capacity)
{
if (capacity == null) {
return null;
} else {
return new IndexingTotalWorkerCapacityInfo(
capacity.getCurrentClusterCapacity(),
capacity.getMaximumCapacityWithAutoScale()
);
}
}

private <U, V> V convertTask(Object taskPayload, Class<U> inputType, Class<V> outputType)
{
if (taskPayload == null) {
return null;
} else if (!inputType.isInstance(taskPayload)) {
throw DruidException.defensive(
"Unknown type[%s] for compaction task. Expected type[%s].",
taskPayload.getClass().getSimpleName(), inputType.getSimpleName()
);
}

try {
return objectMapper.readValue(
objectMapper.writeValueAsBytes(taskPayload),
outputType
);
}
catch (IOException e) {
log.warn(e, "Could not convert task[%s] to client compatible object", taskPayload);
throw DruidException.defensive(
"Could not convert task[%s] to compatible object.",
taskPayload
);
}
}

// Unsupported methods as these are not used by the CompactionScheduler / CompactSegments duty

@Override
public ListenableFuture<URI> findCurrentLeader()
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String taskId)
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses()
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<Integer> killPendingSegments(String dataSource, Interval interval)
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<List<IndexingWorkerInfo>> getWorkers()
{
throw new UnsupportedOperationException();
}

@Override
public OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
return this;
}
}
Loading
Loading