Skip to content

Commit

Permalink
Fix estimations, add javadoc
Browse files Browse the repository at this point in the history
  • Loading branch information
kfaraz committed Jan 17, 2022
1 parent 598cfb5 commit 9908c0b
Show file tree
Hide file tree
Showing 20 changed files with 220 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ public class SketchAggregator implements Aggregator
private Sketch sketch;

@Nullable
private static final Field sketchField;
private static final Field SKETCH_FIELD;

static {
try {
sketchField = Class.forName("org.apache.datasketches.theta.UnionImpl")
.getDeclaredField("gadget_");
sketchField.setAccessible(true);
SKETCH_FIELD = Class.forName("org.apache.datasketches.theta.UnionImpl")
.getDeclaredField("gadget_");
SKETCH_FIELD.setAccessible(true);
}
catch (NoSuchFieldException | ClassNotFoundException e) {
throw new ISE(e, "Could not initialize SketchAggregator");
Expand All @@ -71,7 +72,7 @@ private void initUnion()
private void initSketch()
{
try {
sketch = (Sketch) sketchField.get(union);
sketch = (Sketch) SKETCH_FIELD.get(union);
}
catch (IllegalAccessException e) {
throw new ISE(e, "Could not initialize sketch field in SketchAggregator");
Expand Down Expand Up @@ -105,8 +106,9 @@ public long aggregateWithSize()
if (union == null) {
initUnion();

// Fields in UnionImpl: a sketch reference, a short, a long and a boolean
unionSizeDelta = Long.BYTES + Short.BYTES + Long.BYTES + 1;
// Size of UnionImpl = 16B (object header) + 8B (sketch ref) + 2B (short)
// + 8B (long) + 1B (boolean) + 5B (padding) = 40B
unionSizeDelta = 40L;
}

long initialSketchSize = 0;
Expand Down Expand Up @@ -198,8 +200,10 @@ static void updateUnion(Union union, Object update)
*/
public long getInitialSizeBytes()
{
// SketchAggregator has 3 references and an int
return 3L * Long.BYTES + Integer.BYTES;
// Size = 16B (object header) + 24B (3 refs) + 4B (int size) = 44B
// Due to 8-byte alignment, size = 48B
// (see https://www.baeldung.com/java-memory-layout)
return 48L;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchMergeAggregatorFactory;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public class Tasks
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
public static final String USE_SHARED_LOCK = "useSharedLock";

/**
* Context flag denoting whether maximum possible values should be used to
* estimate on-heap memory usage while indexing. Refer to OnHeapIncrementalIndex
* for more details.
* <p>
* The value of this flag is true by default, which corresponds to the old
* method of estimation.
*/
public static final String USE_MAX_MEMORY_ESTIMATES = "useMaxMemoryEstimates";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public SamplerResponse sample(

//keep the index of the row to be added to responseRows for further use
final int rowIndex = responseRows.size();
IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true);
IncrementalIndexAddResult addResult = index.add(new SamplerInputRow(row, rowIndex), true, true);
if (addResult.hasParseException()) {
responseRows.add(new SamplerResponseRow(
rawColumns,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,19 @@
@ExtensionPoint
public interface Aggregator extends Closeable
{
/**
* Performs aggregation.
*/
void aggregate();

/**
* Performs aggregation and returns the increase in required on-heap memory
* caused by this aggregation step.
* <p>
* The default implementation of this method calls {@link #aggregate()} and returns 0.
*
* @return Increase in required on-heap memory caused by this aggregation step.
*/
default long aggregateWithSize()
{
aggregate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
*/
public class AggregatorAndSize
{

// TODO: include default overhead for object sizes

private final Aggregator aggregator;
private final long initialSizeBytes;

/**
* @param aggregator Aggregator
* @param initialSizeBytes Initial size in bytes (including JVM object overheads)
* required by the aggregator.
*/
public AggregatorAndSize(Aggregator aggregator, long initialSizeBytes)
{
this.aggregator = aggregator;
Expand All @@ -42,6 +44,9 @@ public Aggregator getAggregator()
return aggregator;
}

/**
* Initial size of the aggregator in bytes including JVM object overheads.
*/
public long getInitialSizeBytes()
{
return initialSizeBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact

/**
* Creates an {@link Aggregator} based on the provided column selector factory.
* The returned value is a holder object which contains both the aggregator
* and its initial size in bytes. The callers can then invoke
* {@link Aggregator#aggregateWithSize()} to perform aggregation and get back
* the incremental memory required in each aggregate call. Combined with the
* initial size, this gives the total on-heap memory required by the aggregator.
*
* This flow does not require invoking {@link #guessAggregatorHeapFootprint(long)}
* which tends to over-estimate the required memory.
*
* @return AggregatorAndSize which contains the actual aggregator and its initial size.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.segment;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
Expand All @@ -29,6 +30,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexRowHolder;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.Iterator;

/**
Expand All @@ -43,9 +45,25 @@ public abstract class DictionaryEncodedColumnIndexer<KeyType, ActualType extends
@Nullable
protected SortedDimensionDictionary<ActualType> sortedLookup;

/**
* Creates a new DictionaryEncodedColumnIndexer with the default implementation
* of {@link DimensionDictionary}.
* <p>
* Using this constructor may cause incorrect memory estimations of the dictionary size.
*/
public DictionaryEncodedColumnIndexer()
{
this.dimLookup = new DimensionDictionary();
this(new DimensionDictionary<>());
}

/**
 * Creates a new DictionaryEncodedColumnIndexer that uses the given dictionary.
 *
 * @param dimLookup Dimension dictionary used to look up dimension values;
 *                  must not be null.
 * @throws NullPointerException if {@code dimLookup} is null
 */
public DictionaryEncodedColumnIndexer(@NotNull DimensionDictionary<ActualType> dimLookup)
{
  // Name the offending argument so that a failed check is self-explanatory.
  this.dimLookup = Preconditions.checkNotNull(dimLookup, "dimLookup");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -123,9 +122,8 @@ public int add(@Nullable T originalValue)
valueToId.put(originalValue, index);
idToValue.add(originalValue);

long sizeOfString = getObjectSize(originalValue);
long sizeOfReference = Long.BYTES;
sizeInBytes.addAndGet(sizeOfString + 2 * sizeOfReference);
// Add size of new dim value and 2 references (valueToId and idToValue)
sizeInBytes.addAndGet(estimateSizeOfValue(originalValue) + 2 * Long.BYTES);

minValue = minValue == null || minValue.compareTo(originalValue) > 0 ? originalValue : minValue;
maxValue = maxValue == null || maxValue.compareTo(originalValue) < 0 ? originalValue : maxValue;
Expand Down Expand Up @@ -174,15 +172,15 @@ public SortedDimensionDictionary<T> sort()
}
}

private long getObjectSize(@Nonnull T object)
/**
* Estimates the size of a dimension value in bytes. This method is called
* only when a new dimension value is being added to the lookup.
*
* @return 0 by default
*/
public long estimateSizeOfValue(T value)
{
// According to https://www.ibm.com/developerworks/java/library/j-codetoheap/index.html
// String has the following memory usage...
// 28 bytes of data for String metadata (class pointer, flags, locks, hash, count, offset, reference to char array)
// 16 bytes of data for the char array metadata (class pointer, flags, locks, size)
// 2 bytes for every letter of the string
String val = object.toString();
return 28 + 16 + (2L * val.length());
return 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,34 @@
* @param <ActualType> class of a single actual value
*
*/
public interface DimensionIndexer
<EncodedType extends Comparable<EncodedType>, EncodedKeyComponentType, ActualType extends Comparable<ActualType>>
public interface DimensionIndexer<
EncodedType extends Comparable<EncodedType>,
EncodedKeyComponentType,
ActualType extends Comparable<ActualType>>
{

/**
* Given a single row value or list of row values (for multi-valued dimensions), update any internal data structures
* with the ingested values and return the row values as an array to be used within a Row key.
*
* For example, the dictionary-encoded String-type column will return an int[] containing a dictionary ID.
*
* The value within the returned array should be encoded if applicable, i.e. as instances of EncodedType.
*
* NOTE: This function can change the internal state of the DimensionIndexer.
*
* @param dimValues Single row val to process
*
* @param reportParseExceptions
* @return An array containing an encoded representation of the input row value.
* Encodes the given row value(s) of the dimension to be used within a row key.
* It also updates the internal state of the DimensionIndexer, e.g. the dimLookup.
* <p>
* For example, the dictionary-encoded String-type column will return an int[]
* containing dictionary IDs.
*
* @param dimValues Value(s) of the dimension in a row. This can
* either be a single value or a list of values
* (for multi-valued dimensions)
* @param reportParseExceptions true if parse exceptions should be reported,
* false otherwise
* @return Encoded dimension value(s) to be used as a component for the row key.
* Contains an object of the {@code EncodedKeyComponentType} and the increase
* in size of the DimensionIndexer due to any newly added dimension value.
*/
@Nullable
EncodedDimensionValue<EncodedKeyComponentType> processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions);
EncodedKeyComponent<EncodedKeyComponentType> processRowValsToUnsortedEncodedKeyComponent(
@Nullable Object dimValues,
boolean reportParseExceptions
);

/**
* This method will be called while building an {@link IncrementalIndex} whenever a known dimension column (either
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class DoubleDimensionIndexer implements DimensionIndexer<Double, Double,

@Nullable
@Override
public EncodedDimensionValue<Double> processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions)
public EncodedKeyComponent<Double> processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions)
{
if (dimValues instanceof List) {
throw new UnsupportedOperationException("Numeric columns do not support multivalue rows.");
Expand All @@ -54,7 +54,7 @@ public EncodedDimensionValue<Double> processRowValsToUnsortedEncodedKeyComponent
if (d == null) {
hasNulls = NullHandling.sqlCompatible();
}
return new EncodedDimensionValue<>(d, Double.BYTES);
return new EncodedKeyComponent<>(d, Double.BYTES);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment;

import javax.annotation.Nullable;

/**
 * Holder for the part of a row key that belongs to one dimension. A row key
 * is made up of one such component per dimension.
 * <p>
 * Each instance carries two things:
 * <ul>
 * <li>the encoded value(s) of the dimension</li>
 * <li>the number of bytes by which the size grew when the dimension value(s)
 * were added</li>
 * </ul>
 *
 * @param <K> Type of the encoded key component
 */
public class EncodedKeyComponent<K>
{
  private final long sizeDeltaBytes;

  @Nullable
  private final K encoded;

  /**
   * Creates a new EncodedKeyComponent.
   *
   * @param component            Encoded dimension value(s), may be null
   * @param incrementalSizeBytes Increase in size (in bytes) caused by adding
   *                             the dimension value(s)
   */
  EncodedKeyComponent(@Nullable K component, long incrementalSizeBytes)
  {
    this.encoded = component;
    this.sizeDeltaBytes = incrementalSizeBytes;
  }

  /**
   * Returns the encoded dimension value(s) to be used as a component of a row key.
   *
   * @return the encoded component, possibly null
   */
  @Nullable
  public K getComponent()
  {
    return this.encoded;
  }

  /**
   * Returns the increase in size (in bytes) caused by adding the dimension value(s).
   *
   * @return the incremental size in bytes
   */
  public long getIncrementalSizeBytes()
  {
    return this.sizeDeltaBytes;
  }
}
Loading

0 comments on commit 9908c0b

Please sign in to comment.