[Iceberg] Add native NDV read and write support
This PR introduces the changes required to read and write the
distinct value count (NDV) statistics described by Iceberg's Puffin
file specification [[1]].

The change can be broken down into three main parts.

    - Updates to the SPI to allow connectors to define the
    function used to calculate a specific statistic.
    - The addition of three new functions: sketch_theta,
    sketch_theta_estimate, and sketch_theta_summary.
    - Plumbing and implementation in the Iceberg connector to
    support reading and writing of NDVs.

[1]: https://iceberg.apache.org/puffin-spec/
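
As context, here is a minimal, self-contained sketch of the Apache
DataSketches theta API that the new functions wrap (the dependency is added
to the root pom below). The class name and data volumes are illustrative,
not part of this PR; the serialized compact form is what a Puffin
`apache-datasketches-theta-v1` blob stores:

```java
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.UpdateSketch;

public class ThetaNdvExample
{
    public static void main(String[] args)
    {
        // Accumulate a column's values into a theta sketch, as sketch_theta does.
        UpdateSketch sketch = UpdateSketch.builder().build();
        for (long value = 0; value < 1_000_000; value++) {
            sketch.update(value % 250_000); // 250,000 distinct values
        }

        // Serialize the compact form -- the bytes a Puffin statistics file stores.
        byte[] serialized = sketch.compact().toByteArray();

        // Deserialize and estimate the NDV, as sketch_theta_estimate does.
        Sketch deserialized = Sketch.wrap(Memory.wrap(serialized));
        System.out.printf("estimated NDV: %.0f%n", deserialized.getEstimate());
    }
}
```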
ZacBlanco authored and tdcmeehan committed Nov 15, 2023
1 parent fb85091 commit 2229fc4
Showing 37 changed files with 1,302 additions and 140 deletions.
12 changes: 12 additions & 0 deletions pom.xml
@@ -2197,6 +2197,18 @@
<artifactId>opentelemetry-semconv</artifactId>
<version>1.19.0-alpha</version>
</dependency>

<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-memory</artifactId>
<version>2.2.0</version>
</dependency>

<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<version>4.2.0</version>
</dependency>
</dependencies>
</dependencyManagement>

1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/functions.rst
@@ -35,3 +35,4 @@ Functions and Operators
functions/teradata
functions/internationalization
functions/setdigest
functions/sketch
32 changes: 32 additions & 0 deletions presto-docs/src/main/sphinx/functions/sketch.rst
@@ -0,0 +1,32 @@
===========================
Sketch Functions
===========================

Sketches are data structures that can approximately answer particular questions
about a dataset when exact answers are not required. Approximate answers are
often much faster and cheaper to compute than their exact counterparts.

Presto provides support for computing some sketches available in the `Apache
DataSketches`_ library.

.. function:: sketch_theta(data) -> varbinary

Computes a `theta sketch`_ from an input dataset. The output of this
function can be used as input to any of the other functions in the
``sketch_theta_*`` family.

.. function:: sketch_theta_estimate(sketch) -> double

Returns the estimate of distinct values from the input sketch.

.. function:: sketch_theta_summary(sketch) -> row(estimate double, theta double, upper_bound_std double, lower_bound_std double, retained_entries int)

Returns a summary of the input sketch which includes the distinct values
estimate alongside other useful information such as the sketch theta
parameter, current error bounds corresponding to 1 standard deviation, and
the number of retained entries in the sketch.


.. _Apache DataSketches: https://datasketches.apache.org/
.. _theta sketch: https://datasketches.apache.org/docs/Theta/ThetaSketchFramework.html
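
As a rough guide, the ``row`` fields returned by ``sketch_theta_summary`` line
up with the underlying DataSketches accessors as in this hedged sketch (the
class name and inputs are illustrative):

```java
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.UpdateSketch;

public class ThetaSummaryExample
{
    public static void main(String[] args)
    {
        UpdateSketch updateSketch = UpdateSketch.builder().build();
        for (long i = 0; i < 500_000; i++) {
            updateSketch.update(i);
        }
        Sketch sketch = updateSketch.compact();
        double estimate = sketch.getEstimate();      // estimate
        double theta = sketch.getTheta();            // theta
        double upper = sketch.getUpperBound(1);      // upper_bound_std (1 std dev)
        double lower = sketch.getLowerBound(1);      // lower_bound_std (1 std dev)
        int retained = sketch.getRetainedEntries();  // retained_entries
        System.out.printf("%.0f in [%.0f, %.0f], theta=%.4f, retained=%d%n",
                estimate, lower, upper, theta, retained);
    }
}
```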
@@ -3201,7 +3201,7 @@ private List<ColumnStatisticMetadata> getColumnStatisticMetadataForTemporaryTabl
private List<ColumnStatisticMetadata> getColumnStatisticMetadata(String columnName, Set<ColumnStatisticType> statisticTypes)
{
return statisticTypes.stream()
.map(type -> new ColumnStatisticMetadata(columnName, type))
.map(type -> type.getColumnStatisticMetadata(columnName))
.collect(toImmutableList());
}

9 changes: 9 additions & 0 deletions presto-iceberg/pom.xml
@@ -453,6 +453,15 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-memory</artifactId>
</dependency>

<!-- for testing -->
<dependency>
<groupId>com.facebook.presto</groupId>
@@ -21,6 +21,7 @@
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PrestoTableType;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.StorageFormat;
@@ -305,7 +306,11 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
metastore.createTable(metastoreContext, table, privileges);
}
else {
PartitionStatistics tableStats = metastore.getTableStatistics(metastoreContext, database, tableName);
metastore.replaceTable(metastoreContext, database, tableName, table, privileges);

// attempt to put back previous table statistics
metastore.updateTableStatistics(metastoreContext, database, tableName, oldStats -> tableStats);
}
}
finally {
@@ -18,6 +18,7 @@
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HiveWrittenPartitions;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
@@ -37,11 +38,15 @@
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.statistics.TableStatisticType;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
@@ -65,6 +70,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -89,11 +95,13 @@
import static com.facebook.presto.iceberg.PartitionFields.getPartitionColumnName;
import static com.facebook.presto.iceberg.PartitionFields.getTransformTerm;
import static com.facebook.presto.iceberg.PartitionFields.toPartitionFields;
import static com.facebook.presto.iceberg.TableStatisticsMaker.getSupportedColumnStatistics;
import static com.facebook.presto.iceberg.TableType.DATA;
import static com.facebook.presto.iceberg.TypeConverter.toIcebergType;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -110,13 +118,15 @@ public abstract class IcebergAbstractMetadata

protected final TypeManager typeManager;
protected final JsonCodec<CommitTaskData> commitTaskCodec;
protected final NodeVersion nodeVersion;

protected Transaction transaction;

public IcebergAbstractMetadata(TypeManager typeManager, JsonCodec<CommitTaskData> commitTaskCodec)
public IcebergAbstractMetadata(TypeManager typeManager, JsonCodec<CommitTaskData> commitTaskCodec, NodeVersion nodeVersion)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
}

protected abstract Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName);
@@ -342,6 +352,38 @@ protected static Schema toIcebergSchema(List<ColumnMetadata> columns)
return new Schema(icebergSchema.asStructType().fields());
}

@Override
public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSession session, SchemaTableName tableName, Map<String, Object> analyzeProperties)
{
return getTableHandle(session, tableName);
}

@Override
public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
Set<ColumnStatisticMetadata> columnStatistics = tableMetadata.getColumns().stream()
.filter(column -> !column.isHidden())
.flatMap(meta -> getSupportedColumnStatistics(meta.getName(), meta.getType()).stream())
.collect(toImmutableSet());

Set<TableStatisticType> tableStatistics = ImmutableSet.of(ROW_COUNT);
return new TableStatisticsMetadata(columnStatistics, tableStatistics, Collections.emptyList());
}

@Override
public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return tableHandle;
}

@Override
public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<ComputedStatistics> computedStatistics)
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName());
TableStatisticsMaker.writeTableStatistics(nodeVersion, typeManager, icebergTableHandle, icebergTable, session, computedStatistics);
}

public void rollback()
{
// TODO: cleanup open transaction
@@ -430,7 +472,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
return TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
}

@Override
@@ -44,6 +44,7 @@ public class IcebergConfig
private double minimumAssignedSplitWeight = 0.05;
private boolean parquetDereferencePushdownEnabled = true;
private boolean mergeOnReadModeEnabled;
private double statisticSnapshotRecordDifferenceWeight;

private HiveStatisticsMergeStrategy hiveStatisticsMergeStrategy = HiveStatisticsMergeStrategy.NONE;

@@ -196,4 +197,19 @@ public HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy()
{
return hiveStatisticsMergeStrategy;
}

@Config("iceberg.statistic-snapshot-record-difference-weight")
@ConfigDescription("the amount that the difference in total record count matters when " +
"calculating the closest snapshot when picking statistics. A value of 1 means a single " +
"record is equivalent to 1 millisecond of time difference.")
public IcebergConfig setStatisticSnapshotRecordDifferenceWeight(double weight)
{
this.statisticSnapshotRecordDifferenceWeight = weight;
return this;
}

public double getStatisticSnapshotRecordDifferenceWeight()
{
return statisticSnapshotRecordDifferenceWeight;
}
}
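
A hypothetical illustration of how such a weight can trade record-count
difference against time difference when scoring candidate snapshots; the
method below is not the connector's actual implementation, just the semantics
the config description implies:

```java
public final class SnapshotDistanceExample
{
    // Hypothetical score: lower means closer. With weight = 1.0, a difference of
    // one record counts the same as one millisecond of timestamp difference.
    static double distance(long timestampDiffMillis, long recordCountDiff, double weight)
    {
        return Math.abs(timestampDiffMillis) + weight * Math.abs(recordCountDiff);
    }
}
```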
@@ -23,6 +23,7 @@
import com.facebook.presto.hive.HiveColumnConverterProvider;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HiveTypeTranslator;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.TableAlreadyExistsException;
import com.facebook.presto.hive.ViewAlreadyExistsException;
import com.facebook.presto.hive.metastore.Column;
@@ -135,7 +136,6 @@ public class IcebergHiveMetadata
private static final Logger log = Logger.get(IcebergAbstractMetadata.class);
private final ExtendedHiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
private final String prestoVersion;
private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));

private final FilterStatsCalculatorService filterStatsCalculatorService;
@@ -146,14 +146,13 @@ public IcebergHiveMetadata(
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
String prestoVersion,
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
RowExpressionService rowExpressionService)
{
super(typeManager, commitTaskCodec);
super(typeManager, commitTaskCodec, nodeVersion);
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.prestoVersion = requireNonNull(prestoVersion, "prestoVersion is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
}
@@ -364,7 +363,7 @@ public void createView(ConnectorSession session, ConnectorTableMetadata viewMeta
Table table = createTableObjectForViewCreation(
session,
viewMetadata,
createIcebergViewProperties(session, prestoVersion),
createIcebergViewProperties(session, nodeVersion.toString()),
new HiveTypeTranslator(),
metastoreContext,
encodeViewData(viewData));
@@ -457,7 +456,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table icebergTable = getHiveIcebergTable(metastore, hdfsEnvironment, session, handle.getSchemaTableName());
TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(session, typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
HiveStatisticsMergeStrategy mergeStrategy = getHiveStatisticsMergeStrategy(session);
return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> {
TupleDomain<VariableReferenceExpression> predicate = layoutHandle.getTupleDomain().transform(icebergLayout -> {
@@ -513,7 +512,7 @@ public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession
.filter(column -> !column.isHidden())
.flatMap(meta -> metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType())
.stream()
.map(statType -> new ColumnStatisticMetadata(meta.getName(), statType)))
.map(statType -> statType.getColumnStatisticMetadata(meta.getName())))
.collect(toImmutableSet());

Set<TableStatisticType> tableStatistics = ImmutableSet.of(ROW_COUNT);
@@ -33,7 +33,7 @@ public class IcebergHiveMetadataFactory
final HdfsEnvironment hdfsEnvironment;
final TypeManager typeManager;
final JsonCodec<CommitTaskData> commitTaskCodec;
final String prestoVersion;
final NodeVersion nodeVersion;
final FilterStatsCalculatorService filterStatsCalculatorService;
final RowExpressionService rowExpressionService;

@@ -52,15 +52,14 @@ public IcebergHiveMetadataFactory(
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
requireNonNull(nodeVersion, "nodeVersion is null");
this.prestoVersion = nodeVersion.toString();
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
requireNonNull(config, "config is null");
}

public ConnectorMetadata create()
{
return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec, prestoVersion, filterStatsCalculatorService, rowExpressionService);
return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec, nodeVersion, filterStatsCalculatorService, rowExpressionService);
}
}
@@ -15,6 +15,7 @@

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.TableAlreadyExistsException;
import com.facebook.presto.iceberg.util.IcebergPrestoModelConverters;
import com.facebook.presto.spi.ColumnMetadata;
@@ -74,9 +75,10 @@ public IcebergNativeMetadata(
IcebergResourceFactory resourceFactory,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
CatalogType catalogType)
CatalogType catalogType,
NodeVersion nodeVersion)
{
super(typeManager, commitTaskCodec);
super(typeManager, commitTaskCodec, nodeVersion);
this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null");
this.catalogType = requireNonNull(catalogType, "catalogType is null");
}
@@ -15,6 +15,7 @@

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.spi.connector.ConnectorMetadata;

import javax.inject.Inject;
@@ -28,23 +29,26 @@ public class IcebergNativeMetadataFactory
final JsonCodec<CommitTaskData> commitTaskCodec;
final IcebergResourceFactory resourceFactory;
final CatalogType catalogType;
final NodeVersion nodeVersion;

@Inject
public IcebergNativeMetadataFactory(
IcebergConfig config,
IcebergResourceFactory resourceFactory,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec)
JsonCodec<CommitTaskData> commitTaskCodec,
NodeVersion nodeVersion)
{
this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
}

public ConnectorMetadata create()
{
return new IcebergNativeMetadata(resourceFactory, typeManager, commitTaskCodec, catalogType);
return new IcebergNativeMetadata(resourceFactory, typeManager, commitTaskCodec, catalogType, nodeVersion);
}
}
