# Improve memory estimates in Aggregator and DimensionIndexer (#12073)
Fixes #12022  

### Description
The current implementations of memory estimation in `OnHeapIncrementalIndex` and `StringDimensionIndexer` tend to over-estimate memory usage, which leads to more persistence cycles than necessary.

This PR replaces the max-size estimation mechanism with measuring the incremental memory used by the aggregator or indexer at each invocation of `aggregate` or `encode`, respectively.

### Changes
- Add a new flag `useMaxMemoryEstimates` in the task context. This overrides the same flag in `DefaultTaskConfig`, i.e. the `druid.indexer.task.default.context` map.
- Add method `AggregatorFactory.factorizeWithSize()` that returns an `AggregatorAndSize`, which contains the aggregator instance and its estimated initial size.
- Add method `Aggregator.aggregateWithSize()`, which returns the incremental memory used by each aggregation step.
- Update the method `DimensionIndexer.processRowValsToKeyComponent()` to return the encoded key component along with its effective size in bytes.
- Update `OnHeapIncrementalIndex` to use the new estimates only if `useMaxMemoryEstimates = false` (a sketch of the intended flow follows this list).
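
A minimal sketch of the intended flow, not the actual `OnHeapIncrementalIndex` code; the `getAggregator()` accessor on `AggregatorAndSize` and the wrapper class are assumptions made for this example:

```java
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.ColumnSelectorFactory;

class IncrementalEstimationSketch
{
  /** Returns the estimated on-heap size in bytes after aggregating rowCount rows. */
  static long aggregateAndEstimate(AggregatorFactory factory, ColumnSelectorFactory selectors, int rowCount)
  {
    // Create the aggregator along with an estimate of its initial footprint.
    AggregatorAndSize aggregatorAndSize = factory.factorizeWithSize(selectors);
    Aggregator aggregator = aggregatorAndSize.getAggregator();  // accessor assumed
    long totalSizeBytes = aggregatorAndSize.getInitialSizeBytes();

    for (int i = 0; i < rowCount; i++) {
      // Each call returns only the incremental memory used by this step,
      // instead of reserving getMaxIntermediateSize() up front.
      // (Cursor advancement between rows is elided for brevity.)
      totalSizeBytes += aggregator.aggregateWithSize();
    }
    return totalSizeBytes;
  }
}
```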
kfaraz authored Feb 3, 2022
1 parent bc408ba commit e648b01
Showing 60 changed files with 1,093 additions and 177 deletions.
@@ -59,7 +59,7 @@ public class StringDimensionIndexerBenchmark
@Setup
public void setup()
{
indexer = new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false);
indexer = new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false, true);

for (int i = 0; i < cardinality; i++) {
indexer.processRowValsToUnsortedEncodedKeyComponent("abcd-" + i, true);
@@ -21,23 +21,51 @@

import org.apache.datasketches.Family;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;

import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.util.List;

public class SketchAggregator implements Aggregator
{

private final BaseObjectColumnValueSelector selector;
private final int size;

@Nullable
private Union union;

@Nullable
private Sketch sketch;

@Nullable
private static Field SKETCH_FIELD;

/**
* Initializes static fields of the SketchAggregator needed for memory
* estimation.
*/
public static synchronized void initialize()
{
if (SKETCH_FIELD != null) {
return;
}
try {
SKETCH_FIELD = Class.forName("org.apache.datasketches.theta.UnionImpl")
.getDeclaredField("gadget_");
SKETCH_FIELD.setAccessible(true);
}
catch (NoSuchFieldException | ClassNotFoundException e) {
throw new ISE(e, "Could not initialize SketchAggregator");
}
}

public SketchAggregator(BaseObjectColumnValueSelector selector, int size)
{
this.selector = selector;
@@ -49,6 +77,16 @@ private void initUnion()
union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
}

private void initSketch()
{
try {
sketch = (Sketch) SKETCH_FIELD.get(union);
}
catch (IllegalAccessException e) {
throw new ISE(e, "Could not initialize sketch field in SketchAggregator");
}
}

@Override
public void aggregate()
{
@@ -64,6 +102,37 @@ public void aggregate()
}
}

@Override
public long aggregateWithSize()
{
Object update = selector.getObject();
if (update == null) {
return 0;
}
synchronized (this) {
long unionSizeDelta = 0;
if (union == null) {
initUnion();

// Size of UnionImpl = 16B (object header) + 8B (sketch ref) + 2B (short)
// + 8B (long) + 1B (boolean) + 5B (padding) = 40B
unionSizeDelta = 40L;
}

long initialSketchSize = 0;
if (sketch == null) {
initSketch();
} else {
initialSketchSize = sketch.getCurrentBytes();
}

updateUnion(union, update);

long sketchSizeDelta = sketch.getCurrentBytes() - initialSketchSize;
return sketchSizeDelta + unionSizeDelta;
}
}

@Override
public Object get()
{
@@ -133,4 +202,16 @@ static void updateUnion(Union union, Object update)
throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
}
}

/**
* Gets the initial size of this aggregator in bytes.
*/
public long getInitialSizeBytes()
{
// Size = 16B (object header) + 24B (3 refs) + 4B (int size) = 44B
// Due to 8-byte alignment, size = 48B
// (see https://www.baeldung.com/java-memory-layout)
return 48L;
}

}
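
The hand-computed layouts above (40 bytes for `UnionImpl`, 48 bytes for `SketchAggregator`) can be sanity-checked with a layout tool. A minimal sketch using JOL, assuming the `org.openjdk.jol:jol-core` dependency (not part of this change) and the theta-sketch package path:

```java
import org.apache.druid.query.aggregation.datasketches.theta.SketchAggregator;
import org.openjdk.jol.info.ClassLayout;

class LayoutCheck
{
  public static void main(String[] args)
  {
    // Prints field offsets, padding, and total instance size; the total
    // should agree with the 48-byte figure used by getInitialSizeBytes().
    System.out.println(ClassLayout.parseClass(SketchAggregator.class).toPrintable());
  }
}
```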
@@ -29,6 +29,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
@@ -80,6 +81,14 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory)
return new SketchAggregator(selector, size);
}

@Override
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
{
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
final SketchAggregator aggregator = new SketchAggregator(selector, size);
return new AggregatorAndSize(aggregator, aggregator.getInitialSizeBytes());
}

@SuppressWarnings("unchecked")
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
@@ -71,6 +71,7 @@ public void configure(Binder binder)
ThetaSketchApproxCountDistinctSqlAggregator.NAME,
ThetaSketchApproxCountDistinctSqlAggregator.class
);
SketchAggregator.initialize();
}

@Override
@@ -546,6 +546,48 @@ public void testUpdateUnionWithDouble()
Assert.assertEquals(1, ((SketchHolder) agg.get()).getSketch().getEstimate(), 0);
}

@Test
public void testAggregateWithSize()
{
SketchAggregator.initialize();

final String[] columnValues = new String[20];
for (int i = 0; i < columnValues.length; ++i) {
columnValues[i] = "" + i;
}

final TestObjectColumnSelector<String> selector = new TestObjectColumnSelector<>(columnValues);
final SketchAggregator agg = new SketchAggregator(selector, 128);

// Verify initial size of sketch
Assert.assertEquals(48L, agg.getInitialSizeBytes());
Assert.assertEquals(328L, agg.aggregateWithSize());

// Verify that subsequent size deltas are zero
for (int i = 1; i < 16; ++i) {
selector.increment();
long sizeDelta = agg.aggregateWithSize();
Assert.assertEquals(0, sizeDelta);
}

// Verify that size delta is positive when sketch resizes
selector.increment();
long deltaAtResize = agg.aggregateWithSize();
Assert.assertEquals(1792, deltaAtResize);

for (int i = 17; i < columnValues.length; ++i) {
selector.increment();
long sizeDelta = agg.aggregateWithSize();
Assert.assertEquals(0, sizeDelta);
}

// Verify unique count estimate
SketchHolder sketchHolder = (SketchHolder) agg.get();
Assert.assertEquals(columnValues.length, sketchHolder.getEstimate(), 0);
Assert.assertNotNull(sketchHolder.getSketch());
Assert.assertEquals(columnValues.length, sketchHolder.getSketch().getEstimate(), 0);
}

private void assertPostAggregatorSerde(PostAggregator agg) throws Exception
{
Assert.assertEquals(
@@ -22,15 +22,19 @@
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchMergeAggregatorFactory;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

@@ -61,6 +65,21 @@ public void testMaxIntermediateSize()
Assert.assertEquals(524320, AGGREGATOR_32768.getMaxIntermediateSize());
}

@Test
public void testFactorizeSized()
{
ColumnSelectorFactory colSelectorFactory = EasyMock.mock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(EasyMock.anyString()))
.andReturn(EasyMock.createMock(ColumnValueSelector.class)).anyTimes();
EasyMock.replay(colSelectorFactory);

AggregatorAndSize aggregatorAndSize = AGGREGATOR_16384.factorizeWithSize(colSelectorFactory);
Assert.assertEquals(48, aggregatorAndSize.getInitialSizeBytes());

aggregatorAndSize = AGGREGATOR_32768.factorizeWithSize(colSelectorFactory);
Assert.assertEquals(48, aggregatorAndSize.getInitialSizeBytes());
}

@Test
public void testResultArraySignature()
{
@@ -208,4 +208,15 @@ public Map<String, Object> getContext()
{
return context;
}

/**
* Whether maximum memory usage should be considered in estimation for indexing tasks.
*/
protected boolean isUseMaxMemoryEstimates()
{
return getContextValue(
Tasks.USE_MAX_MEMORY_ESTIMATES,
Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
);
}
}
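
The flag can be overridden per task through its context map. A hypothetical illustration, not part of this diff; the string key mirrors the `Tasks.USE_MAX_MEMORY_ESTIMATES` constant defined later in this commit:

```java
import java.util.HashMap;
import java.util.Map;

class TaskContextExample
{
  static Map<String, Object> buildContext()
  {
    Map<String, Object> context = new HashMap<>();
    // false opts in to the new incremental estimates; the default of true
    // keeps the legacy maximum-size estimation.
    context.put("useMaxMemoryEstimates", false);
    return context;
  }
}
```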
@@ -780,7 +780,8 @@ private Appenderator newAppenderator(
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
isUseMaxMemoryEstimates()
);
}

@@ -44,7 +44,8 @@ public static Appenderator newAppenderator(
DataSchema dataSchema,
AppenderatorConfig appenderatorConfig,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean useMaxMemoryEstimates
)
{
return newAppenderator(
Expand All @@ -56,7 +57,8 @@ public static Appenderator newAppenderator(
appenderatorConfig,
toolbox.getSegmentPusher(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
useMaxMemoryEstimates
);
}

@@ -69,7 +71,8 @@ public static Appenderator newAppenderator(
AppenderatorConfig appenderatorConfig,
DataSegmentPusher segmentPusher,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean useMaxMemoryEstimates
)
{
if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.OPEN_SEGMENTS) {
@@ -83,7 +86,8 @@
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
useMaxMemoryEstimates
);
} else if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS) {
return appenderatorsManager.createClosedSegmentsOfflineAppenderatorForTask(
@@ -96,7 +100,8 @@
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
useMaxMemoryEstimates
);
} else if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS_SINKS) {
return appenderatorsManager.createOfflineAppenderatorForTask(
@@ -109,7 +114,8 @@
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
useMaxMemoryEstimates
);
} else {
throw new IAE("Invalid batchProcessingMode[%s]", toolbox.getConfig().getBatchProcessingMode());
@@ -905,7 +905,8 @@ private TaskStatus generateAndPublishSegments(
dataSchema,
tuningConfig,
buildSegmentsMeters,
buildSegmentsParseExceptionHandler
buildSegmentsParseExceptionHandler,
isUseMaxMemoryEstimates()
);
boolean exceptionOccurred = false;
try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
@@ -46,12 +46,23 @@ public class Tasks
public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
public static final boolean DEFAULT_FORCE_TIME_CHUNK_LOCK = true;
public static final boolean DEFAULT_STORE_COMPACTION_STATE = false;
public static final boolean DEFAULT_USE_MAX_MEMORY_ESTIMATES = true;

public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
public static final String USE_SHARED_LOCK = "useSharedLock";

/**
* Context flag denoting whether maximum possible values should be used to
* estimate on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex
* for more details.
*
* The value of this flag is true by default, which corresponds to the legacy
* method of estimation.
*/
public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates";

/**
* This context is used in compaction. When it is set in the context, the segments created by the task
* will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.

