Update Aggregator and AggregatorFactory interfaces to improve mem estimates #12073

Merged · 17 commits · Feb 3, 2022
@@ -59,7 +59,7 @@ public class StringDimensionIndexerBenchmark
@Setup
public void setup()
{
- indexer = new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false);
+ indexer = new StringDimensionIndexer(DimensionSchema.MultiValueHandling.ofDefault(), true, false, true);

for (int i = 0; i < cardinality; i++) {
indexer.processRowValsToUnsortedEncodedKeyComponent("abcd-" + i, true);
@@ -21,23 +21,51 @@

import org.apache.datasketches.Family;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;

import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.util.List;

public class SketchAggregator implements Aggregator
{

private final BaseObjectColumnValueSelector selector;
private final int size;

@Nullable
private Union union;

@Nullable
private Sketch sketch;

@Nullable
private static Field SKETCH_FIELD;

/**
* Initializes static fields of the SketchAggregator needed for memory
* estimation.
*/
public static synchronized void initialize()
{
if (SKETCH_FIELD != null) {
return;
}
try {
SKETCH_FIELD = Class.forName("org.apache.datasketches.theta.UnionImpl")
.getDeclaredField("gadget_");
SKETCH_FIELD.setAccessible(true);
}
catch (NoSuchFieldException | ClassNotFoundException e) {
throw new ISE(e, "Could not initialize SketchAggregator");
}
}

public SketchAggregator(BaseObjectColumnValueSelector selector, int size)
{
this.selector = selector;
@@ -49,6 +77,16 @@ private void initUnion()
union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
}

private void initSketch()
{
try {
sketch = (Sketch) SKETCH_FIELD.get(union);
}
catch (IllegalAccessException e) {
throw new ISE(e, "Could not initialize sketch field in SketchAggregator");
}
}

@Override
public void aggregate()
{
@@ -64,6 +102,37 @@ public void aggregate()
}
}

@Override
public long aggregateWithSize()
{
Object update = selector.getObject();
if (update == null) {
return 0;
}
synchronized (this) {
long unionSizeDelta = 0;
if (union == null) {
initUnion();

// Size of UnionImpl = 16B (object header) + 8B (sketch ref) + 2B (short)
// + 8B (long) + 1B (boolean) + 5B (padding) = 40B
unionSizeDelta = 40L;
}

long initialSketchSize = 0;
if (sketch == null) {
initSketch();
} else {
initialSketchSize = sketch.getCurrentBytes();
}

updateUnion(union, update);

long sketchSizeDelta = sketch.getCurrentBytes() - initialSketchSize;
return sketchSizeDelta + unionSizeDelta;
}
}

@Override
public Object get()
{
@@ -133,4 +202,16 @@ static void updateUnion(Union union, Object update)
throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
}
}

/**
* Gets the initial size of this aggregator in bytes.
*/
public long getInitialSizeBytes()
{
// Size = 16B (object header) + 24B (3 refs) + 4B (int size) = 44B
// Due to 8-byte alignment, size = 48B
// (see https://www.baeldung.com/java-memory-layout)
return 48L;
}

}
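Taken together, the new methods form an incremental accounting contract: getInitialSizeBytes() reports the fixed 48-byte footprint of the aggregator object, and each aggregateWithSize() call returns only the estimated bytes added by that row. Below is a minimal caller-side sketch of that contract; the SizeTrackingExample class and its loop are illustrative only (in real use a cursor advances the selector between rows):

import org.apache.druid.query.aggregation.datasketches.theta.SketchAggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;

final class SizeTrackingExample
{
  static long ingest(BaseObjectColumnValueSelector<String> selector, int rows)
  {
    // One-time setup of the reflection handle used for size estimation.
    SketchAggregator.initialize();

    final SketchAggregator agg = new SketchAggregator(selector, 16384);

    // Start from the fixed 48-byte footprint of the aggregator itself.
    long estimatedBytes = agg.getInitialSizeBytes();

    for (int i = 0; i < rows; i++) {
      // Each call returns only the growth in estimated heap usage caused
      // by this row; the caller accumulates deltas instead of re-measuring.
      estimatedBytes += agg.aggregateWithSize();
    }
    return estimatedBytes;
  }
}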
@@ -29,6 +29,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
@@ -80,6 +81,14 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory)
return new SketchAggregator(selector, size);
}

@Override
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
[inline review] Contributor: this needs some documentation.
[inline review] Author: Added.

{
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
final SketchAggregator aggregator = new SketchAggregator(selector, size);
return new AggregatorAndSize(aggregator, aggregator.getInitialSizeBytes());
}

@SuppressWarnings("unchecked")
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
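Factories that never override factorizeWithSize need a fallback. The base-class default is not shown in this excerpt; a plausible version (an assumption, not quoted from the PR) wraps the plain aggregator with the pre-existing worst-case bound, keeping the new interface backward compatible:

// Assumed default on AggregatorFactory (not visible in this diff): fall back
// to the old worst-case estimate so existing factories keep working unchanged.
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
{
  return new AggregatorAndSize(factorize(metricFactory), getMaxIntermediateSize());
}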
@@ -71,6 +71,7 @@ public void configure(Binder binder)
ThetaSketchApproxCountDistinctSqlAggregator.NAME,
ThetaSketchApproxCountDistinctSqlAggregator.class
);
SketchAggregator.initialize();
}

@Override
@@ -546,6 +546,48 @@ public void testUpdateUnionWithDouble()
Assert.assertEquals(1, ((SketchHolder) agg.get()).getSketch().getEstimate(), 0);
}

@Test
public void testAggregateWithSize()
{
SketchAggregator.initialize();

final String[] columnValues = new String[20];
for (int i = 0; i < columnValues.length; ++i) {
columnValues[i] = "" + i;
}

final TestObjectColumnSelector<String> selector = new TestObjectColumnSelector<>(columnValues);
final SketchAggregator agg = new SketchAggregator(selector, 128);

// Verify initial size of sketch
Assert.assertEquals(48L, agg.getInitialSizeBytes());
Assert.assertEquals(328L, agg.aggregateWithSize());
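// Derived from aggregateWithSize() above: 328 = 40B (new UnionImpl object)
// + 288B reported by getCurrentBytes() on the freshly initialized sketch.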

// Verify that subsequent size deltas are zero
for (int i = 1; i < 16; ++i) {
selector.increment();
long sizeDelta = agg.aggregateWithSize();
Assert.assertEquals(0, sizeDelta);
}

// Verify that size delta is positive when sketch resizes
selector.increment();
long deltaAtResize = agg.aggregateWithSize();
Assert.assertEquals(1792, deltaAtResize);

for (int i = 17; i < columnValues.length; ++i) {
selector.increment();
long sizeDelta = agg.aggregateWithSize();
Assert.assertEquals(0, sizeDelta);
}

// Verify unique count estimate
SketchHolder sketchHolder = (SketchHolder) agg.get();
Assert.assertEquals(columnValues.length, sketchHolder.getEstimate(), 0);
Assert.assertNotNull(sketchHolder.getSketch());
Assert.assertEquals(columnValues.length, sketchHolder.getSketch().getEstimate(), 0);
}

private void assertPostAggregatorSerde(PostAggregator agg) throws Exception
{
Assert.assertEquals(
@@ -22,15 +22,19 @@
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchMergeAggregatorFactory;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

@@ -61,6 +65,21 @@ public void testMaxIntermediateSize()
Assert.assertEquals(524320, AGGREGATOR_32768.getMaxIntermediateSize());
}

@Test
public void testFactorizeSized()
{
ColumnSelectorFactory colSelectorFactory = EasyMock.mock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(EasyMock.anyString()))
.andReturn(EasyMock.createMock(ColumnValueSelector.class)).anyTimes();
EasyMock.replay(colSelectorFactory);

AggregatorAndSize aggregatorAndSize = AGGREGATOR_16384.factorizeWithSize(colSelectorFactory);
Assert.assertEquals(48, aggregatorAndSize.getInitialSizeBytes());

aggregatorAndSize = AGGREGATOR_32768.factorizeWithSize(colSelectorFactory);
Assert.assertEquals(48, aggregatorAndSize.getInitialSizeBytes());
}

@Test
public void testResultArraySignature()
{
@@ -208,4 +208,15 @@ public Map<String, Object> getContext()
{
return context;
}

/**
* Whether maximum memory usage should be considered in estimation for indexing tasks.
*/
protected boolean isUseMaxMemoryEstimates()
{
return getContextValue(
Tasks.USE_MAX_MEMORY_ESTIMATES,
Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
);
}
}
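Since isUseMaxMemoryEstimates() reads a plain context value, callers opt in without any new task field. A minimal sketch from the submitting side (the ContextExample holder class is illustrative; the Tasks import path is assumed from Druid's indexing module):

import java.util.HashMap;
import java.util.Map;
import org.apache.druid.indexing.common.task.Tasks;

final class ContextExample
{
  static Map<String, Object> buildContext()
  {
    final Map<String, Object> context = new HashMap<>();
    // false opts in to the new incremental estimation; omitting the key keeps
    // the default (true), i.e. the old worst-case estimation.
    context.put(Tasks.USE_MAX_MEMORY_ESTIMATES, false);
    return context;
  }
}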
@@ -780,7 +780,8 @@ private Appenderator newAppenderator(
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ isUseMaxMemoryEstimates()
);
}

@@ -44,7 +44,8 @@ public static Appenderator newAppenderator(
DataSchema dataSchema,
AppenderatorConfig appenderatorConfig,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean useMaxMemoryEstimates
)
{
return newAppenderator(
@@ -56,7 +57,8 @@ public static Appenderator newAppenderator(
appenderatorConfig,
toolbox.getSegmentPusher(),
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ useMaxMemoryEstimates
);
}

@@ -69,7 +71,8 @@ public static Appenderator newAppenderator(
AppenderatorConfig appenderatorConfig,
DataSegmentPusher segmentPusher,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean useMaxMemoryEstimates
)
{
if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.OPEN_SEGMENTS) {
@@ -83,7 +86,8 @@
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ useMaxMemoryEstimates
);
} else if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS) {
return appenderatorsManager.createClosedSegmentsOfflineAppenderatorForTask(
@@ -96,7 +100,8 @@
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ useMaxMemoryEstimates
);
} else if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS_SINKS) {
return appenderatorsManager.createOfflineAppenderatorForTask(
@@ -109,7 +114,8 @@
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ useMaxMemoryEstimates
);
} else {
throw new IAE("Invalid batchProcesingMode[%s]", toolbox.getConfig().getBatchProcessingMode());
@@ -905,7 +905,8 @@ private TaskStatus generateAndPublishSegments(
dataSchema,
tuningConfig,
buildSegmentsMeters,
- buildSegmentsParseExceptionHandler
+ buildSegmentsParseExceptionHandler,
+ isUseMaxMemoryEstimates()
);
boolean exceptionOccurred = false;
try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
@@ -46,12 +46,23 @@ public class Tasks
public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
public static final boolean DEFAULT_FORCE_TIME_CHUNK_LOCK = true;
public static final boolean DEFAULT_STORE_COMPACTION_STATE = false;
public static final boolean DEFAULT_USE_MAX_MEMORY_ESTIMATES = true;

public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
public static final String USE_SHARED_LOCK = "useSharedLock";

/**
* Context flag denoting whether maximum possible values should be used to estimate
* on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex for
* more details.
*
* The value of this flag is true by default, which corresponds to the old
* method of estimation.
*/
public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates";

/**
* This context is used in compaction. When it is set in the context, the segments created by the task
* will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
Expand Down