Skip to content

Commit

Permalink
[Iceberg] Add histogram statistic support
Browse files Browse the repository at this point in the history
Utilizes the sketch_kll function to generate histograms and store them
into the Iceberg table's puffin files for table-level statistic storage.

Histograms are always collected by ANALYZE, but they are not used by the
cost calculator unless enabled via optimizer.use-histograms
  • Loading branch information
ZacBlanco committed Jun 4, 2024
1 parent 865c8ba commit 74a475c
Show file tree
Hide file tree
Showing 9 changed files with 729 additions and 25 deletions.
16 changes: 7 additions & 9 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -511,23 +511,17 @@
<artifactId>presto-cache</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-parser</artifactId>
<scope>test</scope>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-parser</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-analyzer</artifactId>
<scope>test</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -598,7 +592,7 @@
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>1.5.0</version>
<version>${dep.iceberg.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
<exclusions>
Expand Down Expand Up @@ -628,6 +622,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.KllSketchType;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.cost.DisjointRangeDomainHistogram;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.iceberg.statistics.KllHistogram;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
Expand All @@ -31,6 +34,9 @@
import com.facebook.presto.spi.statistics.TableStatistics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.iceberg.ContentFile;
Expand Down Expand Up @@ -58,6 +64,8 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
Expand All @@ -81,17 +89,22 @@
import static com.facebook.presto.common.type.TypeUtils.isNumericType;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.cost.StatisticRange.fromRange;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getStatisticSnapshotRecordDifferenceWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getIdentityPartitions;
import static com.facebook.presto.iceberg.Partition.toMap;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.iceberg.statistics.KllHistogram.isKllHistogramSupportedType;
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateAndSetTableSize;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.HISTOGRAM;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Iterators.getOnlyElement;
import static java.lang.Long.parseLong;
import static java.lang.Math.abs;
import static java.lang.String.format;
Expand All @@ -104,6 +117,7 @@ public class TableStatisticsMaker
private static final Logger log = Logger.get(TableStatisticsMaker.class);
private static final String ICEBERG_THETA_SKETCH_BLOB_TYPE_ID = "apache-datasketches-theta-v1";
private static final String ICEBERG_DATA_SIZE_BLOB_TYPE_ID = "presto-sum-data-size-bytes-v1";
private static final String ICEBERG_KLL_SKETCH_BLOB_TYPE_ID = "presto-kll-sketch-bytes-v1";
private static final String ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY = "ndv";
private static final String ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY = "data_size";
private final Table icebergTable;
Expand All @@ -120,11 +134,13 @@ private TableStatisticsMaker(Table icebergTable, ConnectorSession session, TypeM
private static final Map<ColumnStatisticType, PuffinBlobGenerator> puffinStatWriters = ImmutableMap.<ColumnStatisticType, PuffinBlobGenerator>builder()
.put(NUMBER_OF_DISTINCT_VALUES, TableStatisticsMaker::generateNDVBlob)
.put(TOTAL_SIZE_IN_BYTES, TableStatisticsMaker::generateStatSizeBlob)
.put(HISTOGRAM, TableStatisticsMaker::generateKllSketchBlob)
.build();

private static final Map<String, PuffinBlobReader> puffinStatReaders = ImmutableMap.<String, PuffinBlobReader>builder()
.put(ICEBERG_THETA_SKETCH_BLOB_TYPE_ID, TableStatisticsMaker::readNDVBlob)
.put(ICEBERG_DATA_SIZE_BLOB_TYPE_ID, TableStatisticsMaker::readDataSizeBlob)
.put(ICEBERG_KLL_SKETCH_BLOB_TYPE_ID, TableStatisticsMaker::readKllSketchBlob)
.build();

public static TableStatistics getTableStatistics(ConnectorSession session, TypeManager typeManager, Optional<TupleDomain<IcebergColumnHandle>> currentPredicate, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List<IcebergColumnHandle> columns)
Expand Down Expand Up @@ -206,7 +222,15 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Opti
Object min = summary.getMinValues().get(fieldId);
Object max = summary.getMaxValues().get(fieldId);
if (min instanceof Number && max instanceof Number) {
columnBuilder.setRange(Optional.of(new DoubleRange(((Number) min).doubleValue(), ((Number) max).doubleValue())));
DoubleRange range = new DoubleRange(((Number) min).doubleValue(), ((Number) max).doubleValue());
columnBuilder.setRange(Optional.of(range));
// the histogram is generated by scanning the entire dataset. It is possible that
// the constraint prevents scanning portions of the table. Given that we know the
// range that the scan provides for a particular column, bound the histogram to the
// scanned range.
columnBuilder.setHistogram(columnBuilder.getHistogram()
.map(histogram -> DisjointRangeDomainHistogram
.addConjunction(histogram, fromRange(Range.closed(range.getMin(), range.getMax())))));
}
result.setColumnStatistics(columnHandle, columnBuilder.build());
}
Expand Down Expand Up @@ -306,9 +330,8 @@ private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle ta
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
.forEach((key, value) -> {
Optional.ofNullable(puffinStatWriters.get(key.getStatisticType()))
.ifPresent(generator -> {
writer.add(generator.generate(key, value, icebergTable, snapshot));
});
.flatMap(generator -> Optional.ofNullable(generator.generate(key, value, icebergTable, snapshot, typeManager)))
.ifPresent(writer::add);
});
writer.finish();
icebergTable.updateStatistics().setStatistics(
Expand All @@ -333,7 +356,8 @@ private void writeTableStatistics(NodeVersion nodeVersion, IcebergTableHandle ta
@FunctionalInterface
private interface PuffinBlobGenerator
{
Blob generate(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot);
@Nullable
Blob generate(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager);
}

@FunctionalInterface
Expand All @@ -342,12 +366,12 @@ private interface PuffinBlobReader
/**
* Reads the stats from the blob and then updates the stats builder argument.
*/
void read(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder stats);
void read(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder stats, Table icebergTable, TypeManager typeManager);
}

private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot)
private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager)
{
int id = getFieldId(metadata, icebergTable);
int id = getField(metadata, icebergTable, snapshot).fieldId();
ByteBuffer raw = VARBINARY.getSlice(value, 0).toByteBuffer();
CompactSketch sketch = CompactSketch.wrap(Memory.wrap(raw, ByteOrder.nativeOrder()));
return new Blob(
Expand All @@ -360,9 +384,9 @@ private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block valu
ImmutableMap.of(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY, Long.toString((long) sketch.getEstimate())));
}

private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot)
private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager)
{
int id = getFieldId(metadata, icebergTable);
int id = getField(metadata, icebergTable, snapshot).fieldId();
long size = BIGINT.getLong(value, 0);
return new Blob(
ICEBERG_DATA_SIZE_BLOB_TYPE_ID,
Expand All @@ -374,7 +398,26 @@ private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block
ImmutableMap.of(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY, Long.toString(size)));
}

private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics)
private static Blob generateKllSketchBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager)
{
Types.NestedField field = getField(metadata, icebergTable, snapshot);
KllSketchType sketchType = new KllSketchType(toPrestoType(field.type(), typeManager));
Slice sketchSlice = sketchType.getSlice(value, 0);
if (value.isNull(0)) {
// this can occur when all inputs to the sketch are null
return null;
}
return new Blob(
ICEBERG_KLL_SKETCH_BLOB_TYPE_ID,
ImmutableList.of(field.fieldId()),
snapshot.snapshotId(),
snapshot.sequenceNumber(),
sketchSlice.toByteBuffer(),
null,
ImmutableMap.of());
}

private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager)
{
Optional.ofNullable(metadata.properties().get(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY))
.ifPresent(ndvProp -> {
Expand All @@ -389,7 +432,7 @@ private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnSt
});
}

private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics)
private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager)
{
Optional.ofNullable(metadata.properties().get(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY))
.ifPresent(sizeProp -> {
Expand All @@ -404,9 +447,17 @@ private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, Col
});
}

private static int getFieldId(ColumnStatisticMetadata metadata, Table icebergTable)
private static void readKllSketchBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager)
{
return Optional.ofNullable(icebergTable.schema().findField(metadata.getColumnName())).map(Types.NestedField::fieldId)
statistics.setHistogram(Optional.ofNullable(icebergTable.schemas().get(icebergTable.snapshot(metadata.snapshotId()).schemaId()))
.map(schema -> toPrestoType(schema.findType(getOnlyElement(metadata.inputFields().iterator())), typeManager))
.map(prestoType -> new KllHistogram(Slices.wrappedBuffer(blob), prestoType)));
}

private static Types.NestedField getField(ColumnStatisticMetadata metadata, Table icebergTable, Snapshot snapshot)
{
return Optional.ofNullable(icebergTable.schemas().get(snapshot.schemaId()))
.map(schema -> schema.findField(metadata.getColumnName()))
.orElseThrow(() -> {
log.warn("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name());
return new PrestoException(ICEBERG_INVALID_METADATA, format("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name()));
Expand Down Expand Up @@ -510,7 +561,7 @@ private Map<Integer, ColumnStatistics.Builder> loadStatisticsFile(StatisticsFile
if (value == null) {
value = ColumnStatistics.builder();
}
statReader.read(metadata, blob, value);
statReader.read(metadata, blob, value, icebergTable, typeManager);
return value;
});
});
Expand All @@ -534,6 +585,10 @@ public static List<ColumnStatisticMetadata> getSupportedColumnStatistics(String
supportedStatistics.add(NUMBER_OF_DISTINCT_VALUES.getColumnStatisticMetadataWithCustomFunction(columnName, "sketch_theta"));
}

if (isKllHistogramSupportedType(type)) {
supportedStatistics.add(HISTOGRAM.getColumnStatisticMetadataWithCustomFunction(columnName, "sketch_kll"));
}

if (!(type instanceof FixedWidthType)) {
supportedStatistics.add(TOTAL_SIZE_IN_BYTES.getColumnStatisticMetadata(columnName));
}
Expand Down
Loading

0 comments on commit 74a475c

Please sign in to comment.