Add coordinator API for unused segments #14846

Merged
Changes from 2 commits
@@ -616,6 +616,12 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
);
}
}

taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_IS_REINDEX,
MSQControllerTask.isReindexTask(task)
Contributor: Let's rename this to isReplaceTask?

Contributor (author): It's only for a replace task that reads from the datasource it is replacing, so wouldn't reindex be better?

Contributor: replaceInputDataSourceTask? How does that sound?

);

this.workerTaskLauncher = new MSQWorkerTaskLauncher(
id(),
task.getDataSource(),
@@ -72,7 +72,8 @@ public TaskDataSegmentProvider(
@Override
public Supplier<ResourceHolder<Segment>> fetchSegment(
final SegmentId segmentId,
final ChannelCounters channelCounters
final ChannelCounters channelCounters,
final boolean isReindex
)
{
// Returns Supplier<ResourceHolder> instead of ResourceHolder, so the Coordinator calls and segment downloads happen
@@ -84,7 +85,7 @@ public Supplier<ResourceHolder<Segment>> fetchSegment(
holder = holders.computeIfAbsent(
segmentId,
k -> new SegmentHolder(
() -> fetchSegmentInternal(segmentId, channelCounters),
() -> fetchSegmentInternal(segmentId, channelCounters, isReindex),
() -> holders.remove(segmentId)
)
).get();
@@ -95,20 +96,22 @@
}

/**
* Helper used by {@link #fetchSegment(SegmentId, ChannelCounters)}. Does the actual fetching of a segment, once it
* Helper used by {@link #fetchSegment(SegmentId, ChannelCounters, boolean)}. Does the actual fetching of a segment, once it
* is determined that we definitely need to go out and get one.
*/
private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
final SegmentId segmentId,
final ChannelCounters channelCounters
final ChannelCounters channelCounters,
final boolean isReindex
)
{
final DataSegment dataSegment;
try {
dataSegment = FutureUtils.get(
coordinatorClient.fetchSegment(
segmentId.getDataSource(),
segmentId.toString()
segmentId.toString(),
!isReindex
),
true
);
@@ -1099,7 +1099,12 @@ private void makeInputSliceReader()
.put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir()))
.put(InlineInputSlice.class, new InlineInputSliceReader(frameContext.segmentWrangler()))
.put(LookupInputSlice.class, new LookupInputSliceReader(frameContext.segmentWrangler()))
.put(SegmentsInputSlice.class, new SegmentsInputSliceReader(frameContext.dataSegmentProvider()))
.put(SegmentsInputSlice.class,
new SegmentsInputSliceReader(
frameContext.dataSegmentProvider(),
MultiStageQueryContext.isReindex(QueryContext.of(task().getContext()))
)
)
.build()
);
}
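Read together with the controller-side override earlier in this diff, this change gives the flag one round trip through the task context. A condensed sketch of the flow, using only names introduced in this PR (not runnable on its own):

```java
// Controller: stamp the classification into the context inherited by
// every worker task.
taskContextOverridesBuilder.put(
    MultiStageQueryContext.CTX_IS_REINDEX,
    MSQControllerTask.isReindexTask(task)
);

// Worker: read the flag back and hand it to the slice reader, which
// forwards it to DataSegmentProvider.fetchSegment for each segment.
final boolean isReindex =
    MultiStageQueryContext.isReindex(QueryContext.of(task().getContext()));
```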
@@ -267,6 +267,16 @@ public static boolean isIngestion(final MSQSpec querySpec)
return querySpec.getDestination() instanceof DataSourceMSQDestination;
}

public static boolean isReindexTask(MSQControllerTask task)
{
return task.getQuerySpec()
.getQuery()
.getDataSource()
.getTableNames()
.stream()
.anyMatch(datasource -> task.getDataSource().equals(datasource));
}
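In other words, a task counts as a reindex exactly when the REPLACE destination also appears among the query's input tables. A minimal sketch of the intended classification; the datasource names are hypothetical:

```java
// Sketch: expected behavior of isReindexTask (names illustrative).
//
// REPLACE INTO "wikipedia" ... SELECT ... FROM "wikipedia"
//   -> destination appears in the input tables: isReindexTask(task) == true
//
// REPLACE INTO "wikipedia" ... SELECT ... FROM "wikipedia_raw"
//   -> destination is not read by the query:    isReindexTask(task) == false
final boolean reindex = MSQControllerTask.isReindexTask(task);
```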

public static boolean writeResultsToDurableStorage(final MSQSpec querySpec)
{
return querySpec.getDestination() instanceof DurableStorageMSQDestination;
@@ -40,10 +40,12 @@
public class SegmentsInputSliceReader implements InputSliceReader
{
private final DataSegmentProvider dataSegmentProvider;
private final boolean isReindex;

public SegmentsInputSliceReader(final DataSegmentProvider dataSegmentProvider)
public SegmentsInputSliceReader(final DataSegmentProvider dataSegmentProvider, final boolean isReindex)
{
this.dataSegmentProvider = dataSegmentProvider;
this.isReindex = isReindex;
}

@Override
@@ -91,7 +93,7 @@ private Iterator<SegmentWithDescriptor> dataSegmentIterator(
);

return new SegmentWithDescriptor(
dataSegmentProvider.fetchSegment(segmentId, channelCounters),
dataSegmentProvider.fetchSegment(segmentId, channelCounters, isReindex),
descriptor
);
}
@@ -37,6 +37,7 @@ public interface DataSegmentProvider
*/
Supplier<ResourceHolder<Segment>> fetchSegment(
SegmentId segmentId,
ChannelCounters channelCounters
ChannelCounters channelCounters,
boolean isReindex
);
}
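Because DataSegmentProvider remains a single-method interface, existing lambda bindings only need the widened arity. The MSQ test modules further down this diff do exactly that; a sketch, assuming the getSupplierForSegment helper those tests already define:

```java
// Sketch: a test binding with the three-argument shape, ignoring the
// isReindex flag entirely.
binder.bind(DataSegmentProvider.class)
      .toInstance((segmentId, channelCounters, isReindex) -> getSupplierForSegment(segmentId));
```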
@@ -37,6 +37,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
Expand All @@ -59,6 +60,7 @@ public class MSQTaskSqlEngine implements SqlEngine
ImmutableSet.<String>builder()
.addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS)
.add(QueryKitUtils.CTX_TIME_COLUMN_NAME)
.add(MultiStageQueryContext.CTX_IS_REINDEX)
.build();

public static final List<String> TASK_STRUCT_FIELD_NAMES = ImmutableList.of("TASK");
@@ -110,6 +110,8 @@ public class MultiStageQueryContext
// OnheapIncrementalIndex. For example: overheads related to creating bitmaps during persist.
static final int DEFAULT_ROWS_IN_MEMORY = 100000;

public static final String CTX_IS_REINDEX = "isReindex";

/**
* Controls sort order within segments. Normally, this is the same as the overall order of the query (from the
* CLUSTERED BY clause) but it can be overridden.
@@ -146,6 +148,14 @@ public static boolean isFaultToleranceEnabled(final QueryContext queryContext)
);
}

public static boolean isReindex(final QueryContext queryContext)
{
return queryContext.getBoolean(
CTX_IS_REINDEX,
true
);
}
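A small sketch of the method's behavior on raw context maps (values illustrative). Note that in this PR the key is set by the controller, not by users: MSQTaskSqlEngine registers CTX_IS_REINDEX as a system context parameter above.

```java
// Sketch: with no key present, isReindex defaults to true, and
// TaskDataSegmentProvider passes !isReindex as includeUnused.
final QueryContext defaults = QueryContext.of(ImmutableMap.of());
MultiStageQueryContext.isReindex(defaults);   // true

final QueryContext overridden =
    QueryContext.of(ImmutableMap.of(MultiStageQueryContext.CTX_IS_REINDEX, false));
MultiStageQueryContext.isReindex(overridden); // false
```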

public static long getMaxInputBytesPerWorker(final QueryContext queryContext)
{
return queryContext.getLong(
@@ -179,7 +179,7 @@ public void testConcurrency()
final int expectedSegmentNumber = i % NUM_SEGMENTS;
final DataSegment segment = segments.get(expectedSegmentNumber);
final ListenableFuture<Supplier<ResourceHolder<Segment>>> f =
exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters()));
exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters(), false));

testFutures.add(
FutureUtils.transform(
@@ -231,7 +231,7 @@ public void testConcurrency()
private class TestCoordinatorClientImpl extends NoopCoordinatorClient
{
@Override
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId)
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId, boolean includeUnused)
{
for (final DataSegment segment : segments) {
if (segment.getDataSource().equals(dataSource) && segment.getId().toString().equals(segmentId)) {
@@ -161,7 +161,7 @@ public String getFormatString()
));
binder.bind(DataSegmentAnnouncer.class).toInstance(new NoopDataSegmentAnnouncer());
binder.bind(DataSegmentProvider.class)
.toInstance((dataSegment, channelCounters) -> getSupplierForSegment(dataSegment));
.toInstance((segmentId, channelCounters, isReindex) -> getSupplierForSegment(segmentId));

GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig();
binder.bind(GroupByStrategySelector.class)
@@ -400,7 +400,7 @@ public String getFormatString()
binder.bind(QueryProcessingPool.class)
.toInstance(new ForwardingQueryProcessingPool(Execs.singleThreaded("Test-runner-processing-pool")));
binder.bind(DataSegmentProvider.class)
.toInstance((dataSegment, channelCounters) -> getSupplierForSegment(dataSegment));
.toInstance((dataSegment, channelCounters, isReindex) -> getSupplierForSegment(dataSegment));
binder.bind(IndexIO.class).toInstance(indexIO);
binder.bind(SpecificSegmentsQuerySegmentWalker.class).toInstance(qf.walker());

@@ -581,7 +581,7 @@ public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegm
);
for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds, "segmentIds")) {
final DataSegment segment = FutureUtils.getUnchecked(
coordinatorClient.fetchUsedSegment(dataSource, windowedSegmentId.getSegmentId()),
coordinatorClient.fetchSegment(dataSource, windowedSegmentId.getSegmentId(), false),
true
);
for (Interval interval : windowedSegmentId.getIntervals()) {
@@ -1038,7 +1038,7 @@ public ListenableFuture<List<DataSegment>> fetchUsedSegments(
}

@Override
public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId, boolean includeUnused)
{
ImmutableDruidDataSource druidDataSource;
try {
@@ -230,17 +230,11 @@ public void updateSegmentMetadata(Set<DataSegment> segments)
}

@Override
public DataSegment retrieveUsedSegmentForId(final String id)
public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
{
return null;
}

@Override
public DataSegment retrieveSegmentForId(final String id)
{
throw new UnsupportedOperationException();
}

public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);
@@ -35,14 +35,10 @@ public interface CoordinatorClient
ListenableFuture<Boolean> isHandoffComplete(String dataSource, SegmentDescriptor descriptor);

/**
* Fetches segment metadata for the given dataSource and segmentId.
* Fetches segment metadata for the given dataSource and segmentId. If includeUnused is set to false, the segment is
* not returned if it is marked as unused.
*/
ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId);

/**
* Fetches segment metadata for the given dataSource and segmentId.
*/
ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId);
ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId, boolean includeUnused);
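A sketch of a caller against the new signature, mirroring how TaskDataSegmentProvider uses it in this PR; the datasource value is hypothetical:

```java
// Sketch: fetch metadata for a segment that may already be marked unused.
// The trailing true to FutureUtils.get matches its usage elsewhere in this PR.
final DataSegment segment = FutureUtils.get(
    coordinatorClient.fetchSegment(
        "wikipedia",           // dataSource (hypothetical)
        segmentId.toString(),  // segment id
        true                   // includeUnused
    ),
    true
);
```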

/**
* Fetches segment metadata for the given dataSource and intervals.
@@ -71,30 +71,13 @@ public ListenableFuture<Boolean> isHandoffComplete(String dataSource, SegmentDes
}

@Override
public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource, String segmentId)
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId, boolean includeUnused)
{
final String path = StringUtils.format(
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s",
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s%s",
Contributor: This looks better to read. We can set the %s to true/false based on the flag passed. Wdyt?

Suggested change:
-    "/druid/coordinator/v1/metadata/datasources/%s/segments/%s%s",
+    "/druid/coordinator/v1/metadata/datasources/%s/segments/%s?includeUnused=%s",

Contributor (author): I added it this way for consistency with the other APIs. The current pattern is to add a flag without a value and check whether it has been passed ("includeOvershadowedStatus", "full", etc.). Do we want to start moving away from this?

Contributor: Something like "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s" is more readable.

Contributor (author): I agree with that. Changing to this pattern.

StringUtils.urlEncode(dataSource),
StringUtils.urlEncode(segmentId)
);

return FutureUtils.transform(
client.asyncRequest(
new RequestBuilder(HttpMethod.GET, path),
new BytesFullResponseHandler()
),
holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), DataSegment.class)
);
}

@Override
public ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId)
{
final String path = StringUtils.format(
"/druid/coordinator/v1/metadata/datasources/%s/segments/%s?includeUnused",
StringUtils.urlEncode(dataSource),
StringUtils.urlEncode(segmentId)
StringUtils.urlEncode(segmentId),
includeUnused ? "?includeUnused" : ""
);

return FutureUtils.transform(
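As of this commit, the flag toggles a presence-only query parameter; the review thread above settles on moving to an explicit key=value form later. The paths the format string produces, with hypothetical values:

```java
// Sketch: URLs built by the StringUtils.format call in this commit.
//   fetchSegment("wikipedia", "abc", false)
//     -> /druid/coordinator/v1/metadata/datasources/wikipedia/segments/abc
//   fetchSegment("wikipedia", "abc", true)
//     -> /druid/coordinator/v1/metadata/datasources/wikipedia/segments/abc?includeUnused
```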
@@ -353,26 +353,17 @@ SegmentPublishResult commitMetadataOnly(

void deleteSegments(Set<DataSegment> segments);

/**
* Retrieve the used segment for a given id from the metadata store. Return null if no such used segment exists
*
* @param id The segment id
*
* @return DataSegment used segment corresponding to given id
*/
DataSegment retrieveUsedSegmentForId(String id);

/**
* Retrieve the segment for a given id from the metadata store. Return null if no such segment exists
* <br>
* This also returns unused segments. If only used segments are needed, use {@link #retrieveUsedSegmentForId(String)}
* instead. Unused segments could be deleted by a kill task at any time. This exists mainly to provice a consistent
* view of the metadata, for example, in calls from MSQ controller and worker.
* If includeUnused is set, this also returns unused segments. Unused segments could be deleted by a kill task at any
* time, which might lead to unexpected behaviour. This option exists mainly to provide a consistent view of the metadata,
* for example, in calls from the MSQ controller and workers, and would generally not be required.
*
* @param id The segment id
*
* @return DataSegment segment corresponding to given id
* @return the DataSegment corresponding to the given id
*/
DataSegment retrieveSegmentForId(String id);
DataSegment retrieveSegmentForId(String id, boolean includeUnused);

}
@@ -1884,24 +1884,18 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set<String
}

@Override
public DataSegment retrieveUsedSegmentForId(final String id)
public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
{
return connector.retryTransaction(
(handle, status) ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegmentForId(id),
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
}

@Override
public DataSegment retrieveSegmentForId(final String id)
{
return connector.retryTransaction(
(handle, status) ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveSegmentForId(id),
(handle, status) -> {
if (includeUnused) {
return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveSegmentForId(id);
} else {
return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegmentForId(id);
}
},
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
@@ -297,12 +297,7 @@ public Response getSegment(
}
}
// fallback to db
DataSegment segment;
if (includeUnused != null) {
segment = metadataStorageCoordinator.retrieveSegmentForId(segmentId);
} else {
segment = metadataStorageCoordinator.retrieveUsedSegmentForId(segmentId);
}
DataSegment segment = metadataStorageCoordinator.retrieveSegmentForId(segmentId, includeUnused != null);
if (segment != null) {
return Response.status(Response.Status.OK).entity(segment).build();
}
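On the resource side, only the parameter's presence matters in this commit: the null check collapses it to the new boolean. A sketch of the two request shapes the handler now serves, with hypothetical datasource and segment id:

```java
// Sketch: request shapes and the metadata-store fallback each one triggers.
//
// GET /druid/coordinator/v1/metadata/datasources/wikipedia/segments/abc
//   -> retrieveSegmentForId("abc", false)  // used segments only
//
// GET /druid/coordinator/v1/metadata/datasources/wikipedia/segments/abc?includeUnused
//   -> retrieveSegmentForId("abc", true)   // may also return an unused segment
```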
@@ -113,7 +113,7 @@ public void test_fetchUsedSegment() throws Exception

Assert.assertEquals(
segment,
coordinatorClient.fetchUsedSegment("xyz", "def").get()
coordinatorClient.fetchSegment("xyz", "def", false).get()
);
}

@@ -138,7 +138,7 @@ public void test_fetchSegment() throws Exception

Assert.assertEquals(
segment,
coordinatorClient.fetchSegment("xyz", "def").get()
coordinatorClient.fetchSegment("xyz", "def", true).get()
);
}
