Skip to content

Commit

Permalink
Move segment update APIs from Coordinator to Overlord (#17545)
Browse files Browse the repository at this point in the history
Summary of changes
---------------------
- Add `OverlordDataSourcesResource` with APIs to mark segments used/unused
- Add corresponding methods to `OverlordClient`
- Deprecate Coordinator APIs to update segments
- Use `OverlordClient` in `DataSourcesResource` so that Coordinator APIs internally
call the corresponding Overlord APIs
- If the API call fails, fall back to updating the metadata store directly
- Audit these actions only on the Overlord

Other minor changes
---------------------
- Do not perform null check on `OverlordClient` on the coordinator side `DataSourcesResource`.
`OverlordClient` is always non-null in production.
- Add new tests, fix existing ones
- Complete the implementation of `TestSegmentsMetadataManager`

New Overlord APIs
------------------
- Mark all segments of a datasource as unused:
`POST /druid/indexer/v1/datasources/{dataSourceName}`
- Mark all (non-overshadowed) segments of a datasource as used:
`DELETE /druid/indexer/v1/datasources/{dataSourceName}`
- Mark multiple segments as used
`POST /druid/indexer/v1/datasources/{dataSourceName}/markUsed`
- Mark multiple (non-overshadowed) segments as unused
`POST /druid/indexer/v1/datasources/{dataSourceName}/markUnused`
- Mark a single segment as used:
`POST /druid/indexer/v1/datasources/{dataSourceName}/segments/{segmentId}`
- Mark a single segment as unused:
`DELETE /druid/indexer/v1/datasources/{dataSourceName}/segments/{segmentId}`
  • Loading branch information
kfaraz authored Dec 19, 2024
1 parent f7c2c0a commit d9a58a7
Show file tree
Hide file tree
Showing 50 changed files with 1,979 additions and 696 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.RunnerTaskState;
Expand All @@ -48,6 +47,7 @@
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.utils.CollectionUtils;
import org.junit.Assert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.inject.Injector;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.RunnerTaskState;
Expand Down Expand Up @@ -67,6 +66,7 @@
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.DruidNode;
import org.mockito.ArgumentMatchers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.inject.Injector;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -48,6 +47,7 @@
import org.apache.druid.msq.indexing.report.MSQStatusReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,25 @@
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueryTool;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.compaction.CompactionProgressResponse;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.rpc.indexing.NoopOverlordClient;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -62,7 +54,7 @@
* task related info. This client simply redirects all queries to the
* {@link TaskQueryTool} and all updates to the {@link TaskQueue}.
*/
class LocalOverlordClient implements OverlordClient
class LocalOverlordClient extends NoopOverlordClient
{
private static final Logger log = new Logger(LocalOverlordClient.class);

Expand Down Expand Up @@ -199,66 +191,4 @@ private <U, V> V convertTask(Object taskPayload, Class<U> inputType, Class<V> ou
);
}
}

// Unsupported methods as these are not used by the CompactionScheduler / CompactSegments duty

@Override
public ListenableFuture<URI> findCurrentLeader()
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String taskId)
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<CloseableIterator<SupervisorStatus>> supervisorStatuses()
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<Integer> killPendingSegments(String dataSource, Interval interval)
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<List<IndexingWorkerInfo>> getWorkers()
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<CompactionStatusResponse> getCompactionSnapshots(@Nullable String dataSource)
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<CompactionProgressResponse> getBytesAwaitingCompaction(String dataSource)
{
throw new UnsupportedOperationException();
}

@Override
public ListenableFuture<Boolean> isCompactionSupervisorEnabled()
{
throw new UnsupportedOperationException();
}

@Override
public OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
return this;
}
}
Loading

0 comments on commit d9a58a7

Please sign in to comment.