Make tombstones ingestible by having them return an empty result set. (#12392)

* Make tombstones ingestible by having them return an empty result set.

* Spotbug

* Coverage

* Coverage

* Remove unnecessary exception (checkstyle)

* Fix integration test and add one more to test dropExisting set to false over tombstones

* Force dropExisting to true in auto-compaction when the interval contains only tombstones

* Checkstyle, fix unit test

* Changed flag by mistake, fixing it

* Remove method from interface since this method is specific only to DruidSegmentInputEntity

* Fix typo

* Adapt to latest code

* Update comments when only tombstones to compact

* Move empty iterator to a new DruidTombstoneSegmentReader

* Code review feedback

* Checkstyle

* Review feedback

* Coverage
Agustin Gonzalez authored Apr 15, 2022
1 parent a22d413 commit 0460d45
Showing 14 changed files with 844 additions and 104 deletions.
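The core idea of the change: a tombstone covers an interval but has no data files on deep storage, so when compaction or re-ingestion reads it, it should yield an empty result set instead of failing. A minimal, self-contained sketch of that behavior (hypothetical names; not the actual DruidTombstoneSegmentReader introduced in this commit):

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

class TombstoneReaderSketch
{
  // A tombstone has no file image, so reading it yields zero rows; ingestion then simply
  // produces no data for the tombstone's interval rather than erroring out.
  Iterator<Map<String, Object>> read()
  {
    return Collections.emptyIterator();
  }
}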
@@ -71,7 +71,7 @@ public static <T extends Enum<T>> T getEnumIfPresent(final Class<T> enumClass, f

/**
* If first argument is not null, return it, else return the other argument. Sort of like
* {@link com.google.common.base.Objects#firstNonNull(Object, Object)} except will not explode if both arguments are
* {@link com.google.common.base.Objects#firstNonNull(T, T)} except will not explode if both arguments are
* null.
*/
@Nullable
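As a quick illustration of the behavior described in that javadoc, a standalone equivalent (not the Druid utility itself) would be:

// Unlike com.google.common.base.Objects#firstNonNull, this returns null rather than throwing
// when both arguments are null.
static <T> T firstNonNullOrNull(T arg1, T arg2)
{
  return arg1 != null ? arg1 : arg2;
}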
@@ -85,7 +85,8 @@ public static <T> T firstNonNull(@Nullable T arg1, @Nullable T arg2)

/**
* Cancel futures manually, because sometime we can't cancel all futures in {@link com.google.common.util.concurrent.Futures.CombinedFuture}
* automatically. Especially when we call {@link com.google.common.util.concurrent.Futures#allAsList(Iterable)} to create a batch of
* automatically. Especially when we call
* {@link static <V> ListenableFuture<List<V>> com.google.common.util.concurrent.Futures#allAsList(Iterable<? extends ListenableFuture <? extends V>> futures)} to create a batch of
* future.
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
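A hedged sketch of the manual cancellation that comment describes (a generic helper under assumed names, not necessarily Druid's actual method):

import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;

static <T> void cancelAll(boolean mayInterruptIfRunning, List<ListenableFuture<T>> futures)
{
  for (ListenableFuture<T> future : futures) {
    // Cancel each future individually; cancelling only the combined future returned by
    // Futures.allAsList is not guaranteed to cancel every input future.
    future.cancel(mayInterruptIfRunning);
  }
}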
@@ -136,4 +136,5 @@ default Predicate<Throwable> getRetryCondition()
{
return Predicates.alwaysFalse();
}

}
@@ -69,6 +69,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -111,6 +112,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -550,16 +552,18 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
segmentProvider,
lockGranularityInUse
);

final Map<DataSegment, File> segmentFileMap = pair.lhs;
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;

if (timelineSegments.size() == 0) {
return Collections.emptyList();
}

// find metadata for interval
// find metadata for intervals with real data segments
// queryableIndexAndSegments is sorted by the interval of the dataSegment
final List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
// Note that this list will contain null QueryableIndex values for tombstones
final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
timelineSegments,
segmentFileMap,
toolbox.getIndexIO()
@@ -568,8 +572,10 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
final CompactionTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();

if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) {
final List<ParallelIndexIngestionSpec> specs = new ArrayList<>();

// original granularity
final Map<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
queryableIndexAndSegments.forEach(
@@ -578,11 +584,11 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
);

// unify overlapping intervals to ensure overlapping segments compacting in the same indexSpec
List<NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified =
List<NonnullPair<Interval, List<Pair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified =
new ArrayList<>();
Interval union = null;
List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
for (Entry<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
Interval cur = entry.getKey();
if (union == null) {
union = cur;
@@ -596,12 +602,12 @@ static List<ParallelIndexIngestionSpec> createIngestionSchema(
segments = new ArrayList<>(entry.getValue());
}
}

intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));

final List<ParallelIndexIngestionSpec> specs = new ArrayList<>(intervalToSegmentsUnified.size());
for (NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
for (NonnullPair<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
final Interval interval = entry.lhs;
final List<NonnullPair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
// If granularitySpec is not null, then set segmentGranularity. Otherwise,
// creates new granularitySpec and set segmentGranularity
Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
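The overlap-unification loop above can be illustrated with a small Joda-Time-only sketch (it mirrors the intent, not the exact Druid code): intervals that overlap are merged so their segments are compacted in the same ingestion spec, and each unified interval then receives a default segment granularity derived from its period.

import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.List;

static List<Interval> unifyOverlapping(List<Interval> sortedByStart)
{
  final List<Interval> unified = new ArrayList<>();
  Interval union = null;
  for (Interval cur : sortedByStart) {
    if (union == null) {
      union = cur;
    } else if (union.overlaps(cur)) {
      // extend the running union to also cover the overlapping interval
      union = new Interval(union.getStart(), union.getEnd().isAfter(cur.getEnd()) ? union.getEnd() : cur.getEnd());
    } else {
      unified.add(union);
      union = cur;
    }
  }
  if (union != null) {
    unified.add(union);
  }
  // e.g. [01-01/01-03, 01-02/01-05, 01-07/01-08] -> [01-01/01-05, 01-07/01-08]
  return unified;
}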
@@ -700,22 +706,19 @@ private static NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<Str
LockGranularity lockGranularityInUse
) throws IOException, SegmentLoadingException
{
final List<DataSegment> usedSegmentsMinusTombstones =
segmentProvider.findSegments(toolbox.getTaskActionClient())
.stream()
.filter(dataSegment -> !dataSegment.isTombstone()) // skip tombstones
.collect(Collectors.toList());
segmentProvider.checkSegments(lockGranularityInUse, usedSegmentsMinusTombstones);
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegmentsMinusTombstones);
final List<DataSegment> usedSegments =
segmentProvider.findSegments(toolbox.getTaskActionClient());
segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = VersionedIntervalTimeline
.forSegments(usedSegmentsMinusTombstones)
.forSegments(usedSegments)
.lookup(segmentProvider.interval);
return new NonnullPair<>(segmentFileMap, timelineSegments);
}

private static DataSchema createDataSchema(
String dataSource,
List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
@@ -781,34 +784,36 @@ private static DataSchema createDataSchema(
private static void decideRollupAndQueryGranularityCarryOver(
SettableSupplier<Boolean> rollup,
SettableSupplier<Granularity> queryGranularity,
List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
)
{
final SettableSupplier<Boolean> rollupIsValid = new SettableSupplier<>(true);
for (NonnullPair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
final QueryableIndex index = pair.lhs;
if (index.getMetadata() == null) {
throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
}
// carry-overs (i.e. query granularity & rollup) are valid iff they are the same in every segment:

// Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
if (rollupIsValid.get()) {
Boolean isRollup = index.getMetadata().isRollup();
if (isRollup == null) {
rollupIsValid.set(false);
rollup.set(false);
} else if (rollup.get() == null) {
rollup.set(isRollup);
} else if (!rollup.get().equals(isRollup.booleanValue())) {
rollupIsValid.set(false);
rollup.set(false);
if (index != null) { // avoid tombstones
if (index.getMetadata() == null) {
throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
}
// carry-overs (i.e. query granularity & rollup) are valid iff they are the same in every segment:

// Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
if (rollupIsValid.get()) {
Boolean isRollup = index.getMetadata().isRollup();
if (isRollup == null) {
rollupIsValid.set(false);
rollup.set(false);
} else if (rollup.get() == null) {
rollup.set(isRollup);
} else if (!rollup.get().equals(isRollup.booleanValue())) {
rollupIsValid.set(false);
rollup.set(false);
}
}
}

// Pick the finer, non-null, of the query granularities of the segments being compacted
Granularity current = index.getMetadata().getQueryGranularity();
queryGranularity.set(compareWithCurrent(queryGranularity.get(), current));
// Pick the finer, non-null, of the query granularities of the segments being compacted
Granularity current = index.getMetadata().getQueryGranularity();
queryGranularity.set(compareWithCurrent(queryGranularity.get(), current));
}
}
}
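Taken together, the tombstone-aware carry-over above follows a simple rule; here it is restated with plain Booleans (an illustrative stand-in, not the Druid types): rollup is carried over only when every real segment reports the same non-null value, and tombstones contribute nothing to the decision.

import java.util.List;

static boolean carryOverRollup(List<Boolean> rollupPerRealSegment)
{
  Boolean carried = null;
  for (Boolean isRollup : rollupPerRealSegment) {
    if (isRollup == null) {
      return false;            // unknown rollup in any segment invalidates the carry-over
    }
    if (carried == null) {
      carried = isRollup;      // first value seen becomes the candidate
    } else if (!carried.equals(isRollup)) {
      return false;            // disagreement between segments invalidates the carry-over
    }
  }
  return carried != null && carried;
}

The query-granularity carry-over is analogous: the finest non-null granularity reported by any real segment wins.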

@@ -828,22 +833,28 @@ static Granularity compareWithCurrent(Granularity queryGranularity, Granularity
}

private static AggregatorFactory[] createMetricsSpec(
List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
)
{
final List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
.stream()
.filter(pair -> pair.lhs != null) // avoid tombstones
.map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata()
.collect(Collectors.toList());
final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories);

if (mergedAggregators == null) {
throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
Optional<Pair<QueryableIndex, DataSegment>> pair =
queryableIndexAndSegments.stream().filter(p -> !p.rhs.isTombstone()).findFirst();
if (pair.isPresent()) {
// this means that there are true data segments, so something went wrong
throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
}
}
return mergedAggregators;
}
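For clarity, the new aggregator-merge behavior boils down to the following (a hedged restatement reusing the names from the method above, not a verbatim copy): tombstones are skipped because they carry no QueryableIndex, and a failed merge is only an error when at least one real data segment is present.

List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
    .stream()
    .filter(pair -> pair.lhs != null)                            // skip tombstones
    .map(pair -> pair.lhs.getMetadata().getAggregators())
    .collect(Collectors.toList());
AggregatorFactory[] merged = AggregatorFactory.mergeAggregators(aggregatorFactories);
if (merged == null && queryableIndexAndSegments.stream().anyMatch(p -> !p.rhs.isTombstone())) {
  // real data segments were present, so the failed merge is a genuine error;
  // an interval containing only tombstones legitimately yields no aggregators.
  throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
}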

private static DimensionsSpec createDimensionsSpec(List<NonnullPair<QueryableIndex, DataSegment>> queryableIndices)
private static DimensionsSpec createDimensionsSpec(List<Pair<QueryableIndex, DataSegment>> queryableIndices)
{
final BiMap<String, Integer> uniqueDims = HashBiMap.create();
final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
@@ -859,33 +870,35 @@ private static DimensionsSpec createDimensionsSpec(List<NonnullPair<QueryableInd
);

int index = 0;
for (NonnullPair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
for (Pair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
final QueryableIndex queryableIndex = pair.lhs;
final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
if (queryableIndex != null) { // avoid tombstones
final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();

for (String dimension : queryableIndex.getAvailableDimensions()) {
final ColumnHolder columnHolder = Preconditions.checkNotNull(
queryableIndex.getColumnHolder(dimension),
"Cannot find column for dimension[%s]",
dimension
);

if (!uniqueDims.containsKey(dimension)) {
final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
dimensionHandlerMap.get(dimension),
"Cannot find dimensionHandler for dimension[%s]",
for (String dimension : queryableIndex.getAvailableDimensions()) {
final ColumnHolder columnHolder = Preconditions.checkNotNull(
queryableIndex.getColumnHolder(dimension),
"Cannot find column for dimension[%s]",
dimension
);

uniqueDims.put(dimension, index++);
dimensionSchemaMap.put(
dimension,
createDimensionSchema(
dimension,
columnHolder.getCapabilities(),
dimensionHandler.getMultivalueHandling()
)
);
if (!uniqueDims.containsKey(dimension)) {
final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
dimensionHandlerMap.get(dimension),
"Cannot find dimensionHandler for dimension[%s]",
dimension
);

uniqueDims.put(dimension, index++);
dimensionSchemaMap.put(
dimension,
createDimensionSchema(
dimension,
columnHolder.getCapabilities(),
dimensionHandler.getMultivalueHandling()
)
);
}
}
}
}
@@ -905,25 +918,33 @@ private static DimensionsSpec createDimensionsSpec(List<NonnullPair<QueryableInd
return new DimensionsSpec(dimensionSchemas);
}

private static List<NonnullPair<QueryableIndex, DataSegment>> loadSegments(
/**
 * Does not load or create QueryableIndices for tombstones, since tombstones have no file image and
 * are never pushed to deep storage. For a tombstone, the returned list therefore contains a null in
 * the QueryableIndex slot of the pair (lhs).
 */
private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
Map<DataSegment, File> segmentFileMap,
IndexIO indexIO
) throws IOException
{
final List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();

for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timelineObjectHolders) {
final PartitionHolder<DataSegment> partitionHolder = timelineObjectHolder.getObject();
for (PartitionChunk<DataSegment> chunk : partitionHolder) {
QueryableIndex queryableIndex = null;
final DataSegment segment = chunk.getObject();
final QueryableIndex queryableIndex = indexIO.loadIndex(
Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId())
);
segments.add(new NonnullPair<>(queryableIndex, segment));
if (!chunk.getObject().isTombstone()) {
queryableIndex = indexIO.loadIndex(
Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId())
);
}
segments.add(new Pair<>(queryableIndex, segment));
}
}

return segments;
}

@@ -237,7 +237,6 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu
//noinspection ConstantConditions
return FluentIterable
.from(partitionHolder)
.filter(chunk -> !chunk.getObject().isTombstone())
.transform(chunk -> new DruidSegmentInputEntity(segmentCacheManager, chunk.getObject(), holder.getInterval()));
}).iterator();

@@ -91,4 +91,10 @@ public void close()
}
};
}

public boolean isFromTombstone()
{
return segment.isTombstone();
}

}
@@ -19,6 +19,7 @@

package org.apache.druid.indexing.input;

import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
@@ -55,14 +56,28 @@ public InputEntityReader createReader(
File temporaryDirectory
)
{
return new DruidSegmentReader(
source,
indexIO,
inputRowSchema.getTimestampSpec(),
inputRowSchema.getDimensionsSpec(),
inputRowSchema.getColumnsFilter(),
dimFilter,
temporaryDirectory
// this method handles the case when the entity comes from a tombstone or from a regular segment
Preconditions.checkArgument(
source instanceof DruidSegmentInputEntity,
DruidSegmentInputEntity.class.getName() + " required, but "
+ source.getClass().getName() + " provided."
);

final InputEntityReader retVal;
// Cast is safe here because the precondition above has already been checked
if (((DruidSegmentInputEntity) source).isFromTombstone()) {
retVal = new DruidTombstoneSegmentReader(source);
} else {
retVal = new DruidSegmentReader(
source,
indexIO,
inputRowSchema.getTimestampSpec(),
inputRowSchema.getDimensionsSpec(),
inputRowSchema.getColumnsFilter(),
dimFilter,
temporaryDirectory
);
}
return retVal;
}
}
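From the caller's side (a hypothetical usage sketch; format, inputRowSchema, segmentEntity, tmpDir, and process are placeholder names), the dispatch above means a tombstone entity comes back with a reader that produces no rows:

InputEntityReader reader = format.createReader(inputRowSchema, segmentEntity, tmpDir);
try (CloseableIterator<InputRow> rows = reader.read()) {
  // For a DruidSegmentInputEntity backed by a tombstone this loop body never executes;
  // for a regular segment the rows stream through DruidSegmentReader as before.
  while (rows.hasNext()) {
    process(rows.next());
  }
}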
