Skip to content

Commit

Permalink
Extend unused segment metadata api response to include created date a…
Browse files Browse the repository at this point in the history
…nd last used updated time (apache#15738)

### Description

The unusedSegment api response was extended to include the original DataSegment object with the creation time and last used update time added to it. A new object `DataSegmentPlus` was created for this purpose, and the metadata queries used were updated as needed. 

example response:

```
[
  {
    "dataSegment": {
      "dataSource": "inline_data",
      "interval": "2023-01-02T00:00:00.000Z/2023-01-03T00:00:00.000Z",
      "version": "2024-01-25T16:06:42.835Z",
      "loadSpec": {
        "type": "local",
        "path": "/Users/zachsherman/projects/opensrc-druid/distribution/target/apache-druid-30.0.0-SNAPSHOT/var/druid/segments/inline_data/2023-01-02T00:00:00.000Z_2023-01-03T00:00:00.000Z/2024-01-25T16:06:42.835Z/0/index/"
      },
      "dimensions": "str_dim,double_measure1,double_measure2",
      "metrics": "",
      "shardSpec": {
        "type": "numbered",
        "partitionNum": 0,
        "partitions": 1
      },
      "binaryVersion": 9,
      "size": 1402,
      "identifier": "inline_data_2023-01-02T00:00:00.000Z_2023-01-03T00:00:00.000Z_2024-01-25T16:06:42.835Z"
    },
    "createdDate": "2024-01-25T16:06:44.196Z",
    "usedStatusLastUpdatedDate": "2024-01-25T16:07:34.909Z"
  }
]
```
  • Loading branch information
zachjsh authored Jan 26, 2024
1 parent a7918be commit ae6afc0
Show file tree
Hide file tree
Showing 9 changed files with 748 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Optional;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -126,11 +127,11 @@ Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasour
);

/**
* Returns an iterable to go over un-used segments for a given datasource over an optional interval.
* The order in which segments are iterated is from earliest start-time, with ties being broken with earliest end-time
* first. Note: the iteration may not be as trivially cheap as,
* for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it
* iterates the returned iterable only once rather than several times.
* Returns an iterable to go over un-used segments and their associated metadata for a given datasource over an
* optional interval. The order in which segments are iterated is from earliest start-time, with ties being broken
* with earliest end-time first. Note: the iteration may not be as trivially cheap as for example, iteration over an
* ArrayList. Try (to some reasonable extent) to organize the code so that it iterates the returned iterable only
* once rather than several times.
*
* @param datasource the name of the datasource.
* @param interval an optional interval to search over. If none is specified, {@link org.apache.druid.java.util.common.Intervals#ETERNITY}
Expand All @@ -141,7 +142,7 @@ Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasour
* @param sortOrder an optional order with which to return the matching segments by id, start time, end time.
* If none is specified, the order of the results is not guarenteed.
*/
Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
String datasource,
@Nullable Interval interval,
@Nullable Integer limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentId;
Expand Down Expand Up @@ -957,8 +958,9 @@ public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForD
}

/**
* Retrieves segments for a given datasource that are marked unused and that are *fully contained by* an optionally
* specified interval. If the interval specified is null, this method will retrieve all unused segments.
* Retrieves segments and their associated metadata for a given datasource that are marked unused and that are
* *fully contained by* an optionally specified interval. If the interval specified is null, this method will
* retrieve all unused segments.
*
* This call does not return any information about realtime segments.
*
Expand All @@ -976,7 +978,7 @@ public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForD
* Returns an iterable.
*/
@Override
public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
public Iterable<DataSegmentPlus> iterateAllUnusedSegmentsForDatasource(
final String datasource,
@Nullable final Interval interval,
@Nullable final Integer limit,
Expand All @@ -993,8 +995,8 @@ public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
interval == null
? Intervals.ONLY_ETERNITY
: Collections.singletonList(interval);
try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder, null)) {
try (final CloseableIterator<DataSegmentPlus> iterator =
queryTool.retrieveUnusedSegmentsPlus(datasource, intervals, limit, lastSegmentId, sortOrder, null)) {
return ImmutableList.copyOf(iterator);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -173,6 +174,49 @@ public CloseableIterator<DataSegment> retrieveUnusedSegments(
);
}

/**
* Similar to {@link #retrieveUnusedSegments}, but also retrieves associated metadata for the segments for a given
* datasource that are marked unused and that are *fully contained by* any interval in a particular collection of
* intervals. If the collection of intervals is empty, this method will retrieve all unused segments.
*
* This call does not return any information about realtime segments.
*
* @param dataSource The name of the datasource
* @param intervals The intervals to search over
* @param limit The limit of segments to return
* @param lastSegmentId the last segment id from which to search for results. All segments returned are >
* this segment lexigraphically if sortOrder is null or ASC, or < this segment
* lexigraphically if sortOrder is DESC.
* @param sortOrder Specifies the order with which to return the matching segments by start time, end time.
* A null value indicates that order does not matter.
* @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code intervals}
* with {@code used_status_last_updated} no later than this time will be included in the
* iterator. Segments without {@code used_status_last_updated} time (due to an upgrade
* from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored
* Returns a closeable iterator. You should close it when you are done.
*/
public CloseableIterator<DataSegmentPlus> retrieveUnusedSegmentsPlus(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
return retrieveSegmentsPlus(
dataSource,
intervals,
IntervalMode.CONTAINS,
false,
limit,
lastSegmentId,
sortOrder,
maxUsedStatusLastUpdatedTime
);
}

/**
* Marks the provided segments as either used or unused.
*
Expand Down Expand Up @@ -445,6 +489,54 @@ private CloseableIterator<DataSegment> retrieveSegments(
}
}

private CloseableIterator<DataSegmentPlus> retrieveSegmentsPlus(
final String dataSource,
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
return CloseableIterators.withEmptyBaggage(
retrieveSegmentsPlusInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
);
} else {
final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
final List<Iterator<DataSegmentPlus>> resultingIterators = new ArrayList<>();
Integer limitPerBatch = limit;

for (final List<Interval> intervalList : intervalsLists) {
final UnmodifiableIterator<DataSegmentPlus> iterator = retrieveSegmentsPlusInIntervalsBatch(
dataSource,
intervalList,
matchMode,
used,
limitPerBatch,
lastSegmentId,
sortOrder,
maxUsedStatusLastUpdatedTime
);
if (limitPerBatch != null) {
// If limit is provided, we need to shrink the limit for subsequent batches or circuit break if
// we have reached what was requested for.
final List<DataSegmentPlus> dataSegments = ImmutableList.copyOf(iterator);
resultingIterators.add(dataSegments.iterator());
if (dataSegments.size() >= limitPerBatch) {
break;
}
limitPerBatch -= dataSegments.size();
} else {
resultingIterators.add(iterator);
}
}
return CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
}
}

private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
final String dataSource,
final Collection<Interval> intervals,
Expand All @@ -455,12 +547,73 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
dataSource,
intervals,
matchMode,
used,
limit,
lastSegmentId,
sortOrder,
maxUsedStatusLastUpdatedTime,
false
);

final ResultIterator<DataSegment> resultIterator = getDataSegmentResultIterator(sql);

return filterDataSegmentIteratorByInterval(resultIterator, intervals, matchMode);
}

private UnmodifiableIterator<DataSegmentPlus> retrieveSegmentsPlusInIntervalsBatch(
final String dataSource,
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{

final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
dataSource,
intervals,
matchMode,
used,
limit,
lastSegmentId,
sortOrder,
maxUsedStatusLastUpdatedTime,
true
);

final ResultIterator<DataSegmentPlus> resultIterator = getDataSegmentPlusResultIterator(sql);

return filterDataSegmentPlusIteratorByInterval(resultIterator, intervals, matchMode);
}

private Query<Map<String, Object>> buildSegmentsTableQuery(
final String dataSource,
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder,
@Nullable final DateTime maxUsedStatusLastUpdatedTime,
final boolean includeExtraInfo
)
{
// Check if the intervals all support comparing as strings. If so, bake them into the SQL.
final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);

final StringBuilder sb = new StringBuilder();
sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = :dataSource");
if (includeExtraInfo) {
sb.append("SELECT payload, created_date, used_status_last_updated FROM %s WHERE used = :used AND dataSource = :dataSource");
} else {
sb.append("SELECT payload FROM %s WHERE used = :used AND dataSource = :dataSource");
}

if (compareAsString) {
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
Expand Down Expand Up @@ -513,10 +666,31 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
bindQueryIntervals(sql, intervals);
}

final ResultIterator<DataSegment> resultIterator =
sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
.iterator();
return sql;
}

private ResultIterator<DataSegment> getDataSegmentResultIterator(Query<Map<String, Object>> sql)
{
return sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
.iterator();
}

private ResultIterator<DataSegmentPlus> getDataSegmentPlusResultIterator(Query<Map<String, Object>> sql)
{
return sql.map((index, r, ctx) -> new DataSegmentPlus(
JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class),
DateTimes.of(r.getString(2)),
DateTimes.of(r.getString(3))
))
.iterator();
}

private UnmodifiableIterator<DataSegment> filterDataSegmentIteratorByInterval(
ResultIterator<DataSegment> resultIterator,
final Collection<Interval> intervals,
final IntervalMode matchMode
)
{
return Iterators.filter(
resultIterator,
dataSegment -> {
Expand All @@ -538,6 +712,33 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
);
}

private UnmodifiableIterator<DataSegmentPlus> filterDataSegmentPlusIteratorByInterval(
ResultIterator<DataSegmentPlus> resultIterator,
final Collection<Interval> intervals,
final IntervalMode matchMode
)
{
return Iterators.filter(
resultIterator,
dataSegment -> {
if (intervals.isEmpty()) {
return true;
} else {
// Must re-check that the interval matches, even if comparing as string, because the *segment interval*
// might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
// segment interval like "20010/20011".)
for (Interval interval : intervals) {
if (matchMode.apply(interval, dataSegment.getDataSegment().getInterval())) {
return true;
}
}

return false;
}
}
);
}

private static int computeNumChangedSegments(List<String> segmentIds, int[] segmentChanges)
{
int numChangedSegments = 0;
Expand Down
Loading

1 comment on commit ae6afc0

@vercel
Copy link

@vercel vercel bot commented on ae6afc0 Jan 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

druid – ./

druid-git-master-317brian.vercel.app
druid-phi.vercel.app
druid-317brian.vercel.app

Please sign in to comment.