Fix bug in auto compaction preserveExistingMetrics feature (#12438)
* fix bug

* fix test

* fix IT
maytasm authored Apr 15, 2022
1 parent 0460d45 commit c25a556
Showing 9 changed files with 323 additions and 32 deletions.
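
The fix below addresses a runtime type mismatch in the preserveExistingMetrics path: getMetricHelper can return the result of AggregatorFactory.combine(), and for some factories the boxed type of that result differs from the primitive the metric getter was asked for (as the new tests note, FloatSumAggregator's combine takes two Floats but returns a Double), so the old unchecked cast to the generic type failed at runtime. A minimal, self-contained sketch of the failure mode and of the Number-based conversion the patch adopts (the class name and values are illustrative, not from the Druid source):

```java
public class PreserveMetricsCastSketch
{
  public static void main(String[] args)
  {
    // What a combining factory may hand back: a boxed Double, even though the
    // caller asked for the metric as a float.
    Object combined = Double.valueOf(62.0 + 31.0);

    // Pre-fix behavior was equivalent to an unchecked cast to the requested type,
    // which fails at runtime when the value is actually a Double:
    // float broken = (Float) combined;   // ClassCastException

    // Post-fix behavior: treat the combined value as a Number and convert explicitly.
    float fixed = ((Number) combined).floatValue();
    System.out.println(fixed);            // 93.0
  }
}
```

The same conversion is applied to the float, long, and double metric getters in the diff below.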
@@ -45,6 +45,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
@@ -121,6 +122,101 @@ public void setup() throws Exception
fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
}

@Test
public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetricsUsingAggregatorWithDifferentReturnType() throws Exception
{
// added = null, count = 2, sum_added = 62, quantilesDoublesSketch = 2, thetaSketch = 2, HLLSketchBuild = 2
loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
// added = 31, count = null, sum_added = null, quantilesDoublesSketch = null, thetaSketch = null, HLLSketchBuild = null
loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 2 segments across 1 day...
verifySegmentsCount(2);
ArrayList<Object> nullList = new ArrayList<Object>();
nullList.add(null);
Map<String, Object> queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "count",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(nullList)))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "sum_added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(nullList)))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%QUANTILESRESULT%%", 2,
"%%THETARESULT%%", 2.0,
"%%HLLRESULT%%", 2
);
verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);

submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(null, null, true),
new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
null,
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
// FloatSumAggregator's combine method takes in two Floats but returns a Double
new FloatSumAggregatorFactory("sum_added", "added"),
new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null),
new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L)
},
false
);
// should now only have 1 row after compaction
// added = null, count = 3, sum_added = 93.0
forceTriggerAutoCompaction(1);

queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "count",
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(3))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "sum_added",
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(93.0f))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%QUANTILESRESULT%%", 3,
"%%THETARESULT%%", 3.0,
"%%HLLRESULT%%", 3
);
verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);

verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);

List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
// Verify rollup segments do not get compacted again
forceTriggerAutoCompaction(1);
List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
}
}

@Test
public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetrics() throws Exception
{
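As a hedged aside, the post-compaction values asserted in the integration test above follow from combining the pre-aggregated segment with the raw one; a small illustrative sketch of that arithmetic (the class name is not from the source):

```java
public class ExpectedCompactionResultSketch
{
  public static void main(String[] args)
  {
    // Segment loaded with rollup: pre-aggregated metrics count = 2, sum_added = 62.
    // Segment loaded without rollup: one raw row with added = 31 and no metrics.
    int expectedCount = 2 + 1;          // the raw row contributes 1 to the preserved count
    float expectedSumAdded = 62 + 31;   // 93.0f, returned as a float once the Number conversion is in place
    System.out.println(expectedCount + ", " + expectedSumAdded);  // 3, 93.0
  }
}
```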
@@ -452,13 +452,13 @@ protected Aggregator[] getAggsForRow(int rowOffset)
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat);
return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat)).floatValue();
}

@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong);
return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong)).longValue();
}

@Override
@@ -470,7 +470,7 @@ public Object getMetricObjectValue(int rowOffset, int aggOffset)
@Override
protected double getMetricDoubleValue(int rowOffset, int aggOffset)
{
return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble);
return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble)).doubleValue();
}

@Override
@@ -544,7 +544,7 @@ public Iterable<Row> iterableWithPostAggregations(
* If the preserveExistingMetrics flag is set, then this method combines values from two aggregators as needed: the
* aggregator that aggregates from input into the output field and the aggregator that combines the already aggregated field
*/
private <T> T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
{
if (preserveExistingMetrics) {
// Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated values
@@ -564,7 +564,7 @@ private <T> T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, in
AggregatorFactory aggregatorFactory = metrics[aggOffset];
T aggregatedFromSource = getMetricTypeFunction.apply(aggs[aggOffset]);
T aggregatedFromCombined = getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]);
return (T) aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
return aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
}
} else {
// If preserveExistingMetrics flag is not set then we simply get metrics from the list of Aggregator, aggs, using the
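The javadoc on getMetricHelper above describes the combining path: with preserveExistingMetrics set, the value aggregated from raw input rows is merged with the value re-aggregated from already-rolled-up rows via AggregatorFactory.combine. A hedged sketch of that pattern, using the same FloatSumAggregatorFactory the new tests exercise, assuming the druid-processing module is on the classpath; the values are illustrative, and combine's boxing behavior (a Double for a float sum) is exactly what the fix accommodates:

```java
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;

public class CombinePathSketch
{
  public static void main(String[] args)
  {
    AggregatorFactory factory = new FloatSumAggregatorFactory("sum_added", "added");

    // Value from the aggregator that aggregates raw input rows.
    Object aggregatedFromSource = 31.0f;
    // Value from the aggregator that combines already-aggregated rows.
    Object aggregatedFromCombined = 62.0f;

    // combine() may box its result as a Double even for a float sum, which is why
    // the callers now go through Number instead of casting to the generic type.
    Object merged = factory.combine(aggregatedFromSource, aggregatedFromCombined);
    System.out.println(((Number) merged).floatValue());  // 93.0
  }
}
```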
@@ -50,6 +50,7 @@
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
@@ -799,6 +800,74 @@ public void testSchemaRollupWithRowWithExistingMetricsAndWithoutMetric() throws
}
}

@Test
public void testSchemaRollupWithRowWithExistingMetricsAndWithoutMetricUsingAggregatorWithDifferentReturnType() throws IndexSizeExceededException
{
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
new CountAggregatorFactory("count"),
// FloatSumAggregator's combine method takes in two Floats but returns a Double
new FloatSumAggregatorFactory("sum_of_x", "x")
};
final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
index.add(
new MapBasedInputRow(
1481871600000L,
Arrays.asList("name", "host"),
ImmutableMap.of("name", "name1", "host", "host", "x", 2)
)
);
index.add(
new MapBasedInputRow(
1481871600000L,
Arrays.asList("name", "host"),
ImmutableMap.of("name", "name1", "host", "host", "x", 3)
)
);
index.add(
new MapBasedInputRow(
1481871600000L,
Arrays.asList("name", "host"),
ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4)
)
);
index.add(
new MapBasedInputRow(
1481871600000L,
Arrays.asList("name", "host"),
ImmutableMap.of("name", "name1", "host", "host", "count", 3, "sum_of_x", 5)
)
);

Assert.assertEquals(index.isRollup() ? 1 : 4, index.size());
Iterator<Row> iterator = index.iterator();
int rowCount = 0;
while (iterator.hasNext()) {
rowCount++;
Row row = iterator.next();
Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
if (index.isRollup()) {
// All rows are rolled up into one row
Assert.assertEquals(isPreserveExistingMetrics ? 7 : 4, row.getMetric("count").intValue());
Assert.assertEquals(isPreserveExistingMetrics ? 14 : 5, row.getMetric("sum_of_x").intValue());
} else {
// We still have 4 rows
if (rowCount == 1 || rowCount == 2) {
Assert.assertEquals(1, row.getMetric("count").intValue());
Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
} else {
if (isPreserveExistingMetrics) {
Assert.assertEquals(rowCount - 1, row.getMetric("count").intValue());
Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
} else {
Assert.assertEquals(1, row.getMetric("count").intValue());
// These rows do not have the dim "x", hence the metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0.0f, row.getMetric("sum_of_x"));
}
}
}
}
}

@Test
public void testSchemaRollupWithRowWithOnlyExistingMetrics() throws IndexSizeExceededException
{
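The assertions in the unit test above reduce to the following arithmetic, shown here as an illustrative sketch (names are not from the source): with preserveExistingMetrics the pre-aggregated count and sum_of_x values are folded in, while without it every input row counts as one and only the raw "x" column is summed.

```java
public class RollupExpectationSketch
{
  public static void main(String[] args)
  {
    // Input rows: two raw rows with x = 2 and x = 3, plus two rows that already carry
    // aggregated metrics (count = 2, sum_of_x = 4) and (count = 3, sum_of_x = 5).
    int countPreserved = 1 + 1 + 2 + 3;   // 7: raw rows count as 1, pre-aggregated counts are combined
    int sumPreserved = 2 + 3 + 4 + 5;     // 14: raw x values plus pre-aggregated sums

    int countIgnored = 4;                  // 4: each input row counts as 1 when existing metrics are ignored
    int sumIgnored = 2 + 3;                // 5: rows 3 and 4 have no "x", so they contribute nothing

    System.out.println(countPreserved + " " + sumPreserved + " " + countIgnored + " " + sumIgnored);
  }
}
```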
@@ -79,13 +79,14 @@ public class ClientCompactionTaskQueryTuningConfig

public static ClientCompactionTaskQueryTuningConfig from(
@Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
@Nullable Integer maxRowsPerSegment
@Nullable Integer maxRowsPerSegment,
@Nullable Boolean preserveExistingMetrics
)
{
if (userCompactionTaskQueryTuningConfig == null) {
return new ClientCompactionTaskQueryTuningConfig(
maxRowsPerSegment,
new OnheapIncrementalIndex.Spec(true),
new OnheapIncrementalIndex.Spec(preserveExistingMetrics),
null,
null,
null,
@@ -107,7 +108,7 @@ public static ClientCompactionTaskQueryTuningConfig from(
} else {
AppendableIndexSpec appendableIndexSpecToUse = userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null
? userCompactionTaskQueryTuningConfig.getAppendableIndexSpec()
: new OnheapIncrementalIndex.Spec(true);
: new OnheapIncrementalIndex.Spec(preserveExistingMetrics);
return new ClientCompactionTaskQueryTuningConfig(
maxRowsPerSegment,
appendableIndexSpecToUse,
@@ -452,7 +452,7 @@ private CoordinatorStats doRun(
"coordinator-issued",
segmentsToCompact,
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null),
granularitySpec,
dimensionsSpec,
config.getMetricsSpec(),
@@ -338,7 +338,7 @@ private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCom
{
Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
final ClientCompactionTaskQueryTuningConfig tuningConfig =
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment());
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null);
final PartitionsSpec partitionsSpecFromConfig = findPartitionsSpecFromConfig(tuningConfig);
final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
if (lastCompactionState == null) {