Update Aggregator and AggregatorFactory interfaces to improve mem estimates #12073

Merged (17 commits) on Feb 3, 2022

Changes from 3 commits
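This PR adds size-aware variants of the existing factory and aggregator hooks: `AggregatorFactory.factorizeWithSize()` returns an `AggregatorAndSize` (the aggregator together with an estimate of its initial heap footprint), and `Aggregator.aggregateWithSize()` returns the number of bytes by which the aggregator grew while processing a single row. The diff below only shows the theta-sketch implementation, so here is a minimal sketch of what the `AggregatorAndSize` holder might look like, assuming a plain value object exposing the two accessors used in the changes (the committed class may differ in detail):

```java
package org.apache.druid.query.aggregation;

// Minimal sketch, not the exact committed code: pairs an Aggregator with an
// estimate of its initial on-heap size so callers can track memory per row.
public class AggregatorAndSize
{
  private final Aggregator aggregator;
  private final long initialSizeBytes;

  public AggregatorAndSize(Aggregator aggregator, long initialSizeBytes)
  {
    this.aggregator = aggregator;
    this.initialSizeBytes = initialSizeBytes;
  }

  public Aggregator getAggregator()
  {
    return aggregator;
  }

  /** Initial heap footprint of the aggregator, in bytes. */
  public long getInitialSizeBytes()
  {
    return initialSizeBytes;
  }
}
```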
@@ -21,23 +21,42 @@

import org.apache.datasketches.Family;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;

import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.util.List;

public class SketchAggregator implements Aggregator
{

private final BaseObjectColumnValueSelector selector;
private final int size;

@Nullable
private Union union;

@Nullable
private Sketch sketch;

@Nullable
private static final Field sketchField;
static {
try {
sketchField = Class.forName("org.apache.datasketches.theta.UnionImpl")
.getDeclaredField("gadget_");
sketchField.setAccessible(true);
}
catch (NoSuchFieldException | ClassNotFoundException e) {
throw new ISE(e, "Could not initialize SketchAggregator");
}
}

public SketchAggregator(BaseObjectColumnValueSelector selector, int size)
{
this.selector = selector;
@@ -49,6 +68,16 @@ private void initUnion()
union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
}

private void initSketch()
{
try {
sketch = (Sketch) sketchField.get(union);
}
catch (IllegalAccessException e) {
throw new ISE(e, "Could not initialize sketch field in SketchAggregator");
}
}

@Override
public void aggregate()
{
@@ -64,6 +93,36 @@ public void aggregate()
}
}

@Override
public long aggregateWithSize()
{
Object update = selector.getObject();
if (update == null) {
return 0;
}
synchronized (this) {
long unionSizeDelta = 0;
if (union == null) {
initUnion();

// Fields in UnionImpl: a sketch reference, a short, a long and a boolean
unionSizeDelta = Long.BYTES + Short.BYTES + Long.BYTES + 1;
}

long initialSketchSize = 0;
if (sketch == null) {
initSketch();
} else {
initialSketchSize = sketch.getCurrentBytes();
}

updateUnion(union, update);

long sketchSizeDelta = sketch.getCurrentBytes() - initialSketchSize;
return sketchSizeDelta + unionSizeDelta;
}
}

@Override
public Object get()
{
@@ -133,4 +192,14 @@ static void updateUnion(Union union, Object update)
throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
}
}

/**
* Gets the initial size of this aggregator in bytes.
*/
public long getInitialSizeBytes()
{
// SketchAggregator has 3 references and an int
return 3L * Long.BYTES + Integer.BYTES;
}

}
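For the theta-sketch aggregator above, the accounting works out as follows: `getInitialSizeBytes()` reports the aggregator object itself as three references plus an int, i.e. 3 * 8 + 4 = 28 bytes (the value asserted in the factory test further down); the first call to `aggregateWithSize()` additionally charges 8 + 2 + 8 + 1 = 19 bytes for the fields of the freshly built `UnionImpl`, and every call charges the growth of the backing sketch, measured via `getCurrentBytes()` before and after the update (the sketch is read through the `gadget_` field by reflection). A hypothetical caller, for example an on-heap incremental index enforcing a memory budget, could consume the two hooks roughly as sketched below; the class and budget logic here are illustrative, not part of this PR:

```java
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.ColumnSelectorFactory;

// Illustrative consumer of the size-aware hooks; not part of this PR.
class SizeTrackingRowProcessor
{
  private final Aggregator aggregator;
  private final long maxBytes;
  private long usedBytes;

  SizeTrackingRowProcessor(AggregatorFactory factory, ColumnSelectorFactory selectors, long maxBytes)
  {
    // factorizeWithSize() hands back the aggregator plus its initial footprint
    // (28 bytes for the SketchAggregator above).
    final AggregatorAndSize aggregatorAndSize = factory.factorizeWithSize(selectors);
    this.aggregator = aggregatorAndSize.getAggregator();
    this.usedBytes = aggregatorAndSize.getInitialSizeBytes();
    this.maxBytes = maxBytes;
  }

  /** Aggregates one row and reports whether the memory budget still holds. */
  boolean aggregateRow()
  {
    // aggregateWithSize() returns the incremental heap growth for this row:
    // the one-time union bootstrap cost plus any growth of the theta sketch.
    usedBytes += aggregator.aggregateWithSize();
    return usedBytes <= maxBytes;
  }
}
```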
@@ -32,6 +32,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
@@ -80,6 +81,14 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory)
return new SketchAggregator(selector, size);
}

@Override
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
Contributor: this needs some documentation.

Contributor Author: Added.

{
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
final SketchAggregator aggregator = new SketchAggregator(selector, size);
return new AggregatorAndSize(aggregator, aggregator.getInitialSizeBytes());
}

@SuppressWarnings("unchecked")
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
@@ -23,14 +23,18 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchMergeAggregatorFactory;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

@@ -61,6 +65,21 @@ public void testMaxIntermediateSize()
Assert.assertEquals(524320, AGGREGATOR_32768.getMaxIntermediateSize());
}

@Test
public void testFactorizeSized()
{
ColumnSelectorFactory colSelectorFactory = EasyMock.mock(ColumnSelectorFactory.class);
EasyMock.expect(colSelectorFactory.makeColumnValueSelector(EasyMock.anyString()))
.andReturn(EasyMock.createMock(ColumnValueSelector.class)).anyTimes();
EasyMock.replay(colSelectorFactory);

AggregatorAndSize aggregatorAndSize = AGGREGATOR_16384.factorizeWithSize(colSelectorFactory);
Assert.assertEquals(28, aggregatorAndSize.getInitialSizeBytes());

aggregatorAndSize = AGGREGATOR_32768.factorizeWithSize(colSelectorFactory);
Assert.assertEquals(28, aggregatorAndSize.getInitialSizeBytes());
}

@Test
public void testResultArraySignature()
{
@@ -208,4 +208,15 @@ public Map<String, Object> getContext()
{
return context;
}

/**
* Whether maximum memory usage should be considered in estimation for indexing tasks.
*/
protected boolean isUseMaxMemoryEstimates()
{
return getContextValue(
Tasks.USE_MAX_MEMORY_ESTIMATES,
Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
);
}
}
@@ -780,7 +780,8 @@ private Appenderator newAppenderator(
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
isUseMaxMemoryEstimates()
);
}

@@ -44,7 +44,8 @@ public static Appenderator newAppenderator(
DataSchema dataSchema,
AppenderatorConfig appenderatorConfig,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean useMaxMemoryEstimates
)
{
return newAppenderator(
@@ -56,7 +57,8 @@ public static Appenderator newAppenderator(
appenderatorConfig,
toolbox.getSegmentPusher(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
useMaxMemoryEstimates
);
}

@@ -69,7 +71,8 @@ public static Appenderator newAppenderator(
AppenderatorConfig appenderatorConfig,
DataSegmentPusher segmentPusher,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
ParseExceptionHandler parseExceptionHandler,
boolean useMaxMemoryEstimates
)
{
if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.OPEN_SEGMENTS) {
@@ -83,7 +86,8 @@
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
useMaxMemoryEstimates
);
} else if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS) {
return appenderatorsManager.createClosedSegmentsOfflineAppenderatorForTask(
@@ -96,7 +100,8 @@
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
useMaxMemoryEstimates
);
} else if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS_SINKS) {
return appenderatorsManager.createOfflineAppenderatorForTask(
@@ -109,7 +114,8 @@
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
useMaxMemoryEstimates
);
} else {
throw new IAE("Invalid batchProcesingMode[%s]", toolbox.getConfig().getBatchProcessingMode());
@@ -905,7 +905,8 @@ private TaskStatus generateAndPublishSegments(
dataSchema,
tuningConfig,
buildSegmentsMeters,
buildSegmentsParseExceptionHandler
buildSegmentsParseExceptionHandler,
isUseMaxMemoryEstimates()
);
boolean exceptionOccurred = false;
try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
@@ -46,11 +46,13 @@ public class Tasks
public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
public static final boolean DEFAULT_FORCE_TIME_CHUNK_LOCK = true;
public static final boolean DEFAULT_STORE_COMPACTION_STATE = false;
public static final boolean DEFAULT_USE_MAX_MEMORY_ESTIMATES = true;

public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
public static final String USE_SHARED_LOCK = "useSharedLock";
public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates";

/**
* This context is used in compaction. When it is set in the context, the segments created by the task
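The new `useMaxMemoryEstimates` task context key defaults to `true`, which preserves the existing worst-case accounting based on `getMaxIntermediateSize()`; setting it to `false` presumably opts the task's appenderators into the incremental, per-row estimates added above. A hedged illustration of overriding the flag in a task context map; the surrounding helper is assumed, only the key name and its default come from this diff:

```java
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.task.Tasks;

import java.util.Map;

// Hypothetical helper showing how a task context could override the new flag.
public class TaskContextExample
{
  public static Map<String, Object> memoryEstimateContext()
  {
    return ImmutableMap.of(
        // false = skip worst-case (max) estimates and rely on the incremental
        // size tracking introduced by this PR; the default is true.
        Tasks.USE_MAX_MEMORY_ESTIMATES, false
    );
  }
}
```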
@@ -30,6 +30,7 @@
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SequenceNameFunction;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
@@ -169,6 +170,10 @@ private List<DataSegment> generateSegments(
tuningConfig.getMaxParseExceptions(),
tuningConfig.getMaxSavedParseExceptions()
);
final boolean useMaxMemoryEstimates = getContextValue(
Tasks.USE_MAX_MEMORY_ESTIMATES,
Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
);
final Appenderator appenderator = BatchAppenderators.newAppenderator(
getId(),
toolbox.getAppenderatorsManager(),
Expand All @@ -178,7 +183,8 @@ private List<DataSegment> generateSegments(
tuningConfig,
new ShuffleDataSegmentPusher(supervisorTaskId, getId(), toolbox.getIntermediaryDataManager()),
buildSegmentsMeters,
parseExceptionHandler
parseExceptionHandler,
useMaxMemoryEstimates
);
boolean exceptionOccurred = false;
try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
@@ -393,6 +393,10 @@ private Set<DataSegment> generateAndPushSegments(
useLineageBasedSegmentAllocation
);

final boolean useMaxMemoryEstimates = getContextValue(
Tasks.USE_MAX_MEMORY_ESTIMATES,
Tasks.DEFAULT_USE_MAX_MEMORY_ESTIMATES
);
final Appenderator appenderator = BatchAppenderators.newAppenderator(
getId(),
toolbox.getAppenderatorsManager(),
Expand All @@ -401,7 +405,8 @@ private Set<DataSegment> generateAndPushSegments(
dataSchema,
tuningConfig,
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
useMaxMemoryEstimates
);
boolean exceptionOccurred = false;
try (
@@ -201,7 +201,8 @@ public Appenderator newAppenderator(
toolbox.getCacheConfig(),
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
isUseMaxMemoryEstimates()
);
}

@@ -245,7 +245,8 @@ public Map<String, Object> makeLoadSpec(URI uri)
indexIO,
indexMerger,
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0)
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
true
);
break;
case "CLOSED_SEGMENTS":
@@ -259,7 +260,8 @@
indexIO,
indexMerger,
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0)
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
true
);

break;
@@ -274,7 +276,8 @@
indexIO,
indexMerger,
rowIngestionMeters,
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0)
new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
true
);
break;
default: