[Iceberg] read and combine statistics from the HMS
This PR introduces a few things:

- Read NDVs from Iceberg tables when present. Previously, NDVs were never
read, even when they existed.
- Read statistics from the Hive Metastore, if configured and present, and
merge them into Iceberg's existing table statistics wherever values are
missing, using a configurable merge strategy. See HiveStatisticsMergeStrategy.java.
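The merge strategy defaults to NONE and can be chosen globally through the new
iceberg.hive-statistics-merge-strategy configuration property or per-query
through the new hive_statistics_merge_strategy session property, both added below.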
ZacBlanco authored and tdcmeehan committed Oct 13, 2023
1 parent 45bcbcf commit ca11fd6
Showing 13 changed files with 848 additions and 11 deletions.
HiveStatisticsUtil.java
@@ -34,7 +34,7 @@
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.google.common.base.Verify.verify;

-public class HiveStatisticsUtil
+public final class HiveStatisticsUtil
{
private HiveStatisticsUtil()
{
@@ -399,7 +399,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
-return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable);
+return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
}

@Override
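TableStatisticsMaker.getTableStatistics now also receives the selected column
handles, presumably so that statistics (including the newly read NDVs) can be
resolved for exactly the requested columns.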
IcebergConfig.java
@@ -16,6 +16,7 @@
import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.presto.hive.HiveCompressionCodec;
import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.FileFormat;
@@ -44,6 +45,8 @@ public class IcebergConfig
private boolean parquetDereferencePushdownEnabled = true;
private boolean mergeOnReadModeEnabled;

private HiveStatisticsMergeStrategy hiveStatisticsMergeStrategy = HiveStatisticsMergeStrategy.NONE;

@NotNull
public FileFormat getFileFormat()
{
@@ -180,4 +183,17 @@ public boolean isMergeOnReadModeEnabled()
{
return mergeOnReadModeEnabled;
}

@Config("iceberg.hive-statistics-merge-strategy")
@ConfigDescription("determines how to merge statistics that are stored in the Hive Metastore")
public IcebergConfig setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy mergeStrategy)
{
this.hiveStatisticsMergeStrategy = mergeStrategy;
return this;
}

public HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy()
{
return hiveStatisticsMergeStrategy;
}
}
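A minimal sketch of the new config surface, assuming only what this diff shows
(NONE is the sole strategy constant visible here; the other values are defined
in HiveStatisticsMergeStrategy.java):

    IcebergConfig config = new IcebergConfig();
    // the default is NONE: statistics from the Hive Metastore are ignored
    assert config.getHiveStatisticsMergeStrategy() == HiveStatisticsMergeStrategy.NONE;
    // the @Config setter returns this, so it chains like the other airlift setters
    config.setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy.NONE);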
IcebergHiveMetadata.java
@@ -14,6 +14,7 @@
package com.facebook.presto.iceberg;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HdfsContext;
@@ -30,22 +31,33 @@
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.ConnectorViewDefinition;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaNotFoundException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.ViewNotFoundException;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.TableStatisticType;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -69,6 +81,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;

import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics;
import static com.facebook.presto.hive.HiveStatisticsUtil.updatePartitionStatistics;
@@ -85,6 +98,7 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyAndPopulateViews;
import static com.facebook.presto.hive.metastore.Statistics.createComputedStatisticsToPartitionMap;
import static com.facebook.presto.iceberg.IcebergSchemaProperties.getSchemaLocation;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getHiveStatisticsMergeStrategy;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning;
@@ -95,6 +109,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.util.StatisticsUtil.mergeHiveStatistics;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
@@ -120,17 +135,24 @@ public class IcebergHiveMetadata
private final String prestoVersion;
private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));

private final FilterStatsCalculatorService filterStatsCalculatorService;
private final RowExpressionService rowExpressionService;

public IcebergHiveMetadata(
ExtendedHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
-String prestoVersion)
+String prestoVersion,
+FilterStatsCalculatorService filterStatsCalculatorService,
+RowExpressionService rowExpressionService)
{
super(typeManager, commitTaskCodec);
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.prestoVersion = requireNonNull(prestoVersion, "prestoVersion is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
}

@Override
@@ -424,6 +446,55 @@ private List<String> listSchemas(ConnectorSession session, String schemaNameOrNu
}
return ImmutableList.of(schemaNameOrNull);
}

public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<ConnectorTableLayoutHandle> tableLayoutHandle, List<ColumnHandle> columnHandles, Constraint<ColumnHandle> constraint)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table icebergTable = getHiveIcebergTable(metastore, hdfsEnvironment, session, handle.getSchemaTableName());
TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
HiveStatisticsMergeStrategy mergeStrategy = getHiveStatisticsMergeStrategy(session);
return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> {
TupleDomain<VariableReferenceExpression> predicate = layoutHandle.getTupleDomain().transform(icebergLayout -> {
IcebergColumnHandle columnHandle = (IcebergColumnHandle) icebergLayout;
return new VariableReferenceExpression(Optional.empty(), columnHandle.getName(), columnHandle.getType());
});
RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate);
PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getTableName());
TableStatistics mergedStatistics = mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeStrategy, icebergTable.spec());
TableStatistics.Builder filteredStatsBuilder = TableStatistics.builder()
.setRowCount(mergedStatistics.getRowCount());
double totalSize = 0;
for (ColumnHandle colHandle : columnHandles) {
IcebergColumnHandle icebergHandle = (IcebergColumnHandle) colHandle;
if (mergedStatistics.getColumnStatistics().containsKey(icebergHandle)) {
ColumnStatistics stats = mergedStatistics.getColumnStatistics().get(icebergHandle);
filteredStatsBuilder.setColumnStatistics(icebergHandle, stats);
if (!stats.getDataSize().isUnknown()) {
totalSize += stats.getDataSize().getValue();
}
}
}
filteredStatsBuilder.setTotalSize(Estimate.of(totalSize));
return filterStatsCalculatorService.filterStats(
filteredStatsBuilder.build(),
translatedPredicate,
session,
columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableMap(
col -> col,
IcebergColumnHandle::getName)),
columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableMap(
IcebergColumnHandle::getName,
IcebergColumnHandle::getType)));
}).orElseGet(() -> {
if (!mergeStrategy.equals(HiveStatisticsMergeStrategy.NONE)) {
PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getTableName());
return mergeHiveStatistics(icebergStatistics, hiveStats, mergeStrategy, icebergTable.spec());
}
return icebergStatistics;
});
}

@Override
public IcebergTableHandle getTableHandleForStatisticsCollection(ConnectorSession session, SchemaTableName tableName, Map<String, Object> analyzeProperties)
{
return getTableHandle(session, tableName);
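The merge itself happens in StatisticsUtil.mergeHiveStatistics, which is not
part of this excerpt. A minimal sketch of the idea — fill in statistics that
Iceberg lacks from Hive Metastore values, per the configured strategy — is
below; the helper name, the pre-extracted hiveNdvs map, and the restriction to
NDVs are illustrative assumptions, not the committed code (types as imported in
the file above):

    // Sketch only: the real logic is com.facebook.presto.iceberg.util.StatisticsUtil.mergeHiveStatistics,
    // which also honors the chosen merge strategy and the table's partition spec.
    private static TableStatistics mergeNdvSketch(TableStatistics icebergStats, Map<ColumnHandle, Estimate> hiveNdvs)
    {
        TableStatistics.Builder merged = TableStatistics.builder()
                .setRowCount(icebergStats.getRowCount());
        icebergStats.getColumnStatistics().forEach((column, stats) -> {
            ColumnStatistics out = stats;
            if (stats.getDistinctValuesCount().isUnknown() && hiveNdvs.containsKey(column)) {
                // Iceberg has no NDV for this column; take the Hive Metastore's value
                out = ColumnStatistics.builder()
                        .setNullsFraction(stats.getNullsFraction())
                        .setDataSize(stats.getDataSize())
                        .setDistinctValuesCount(hiveNdvs.get(column))
                        .build(); // range and remaining fields elided in this sketch
            }
            merged.setColumnStatistics(column, out);
        });
        return merged.build();
    }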
IcebergHiveMetadataFactory.java
@@ -19,6 +19,8 @@
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.relation.RowExpressionService;

import javax.inject.Inject;

@@ -32,6 +34,8 @@ public class IcebergHiveMetadataFactory
final TypeManager typeManager;
final JsonCodec<CommitTaskData> commitTaskCodec;
final String prestoVersion;
final FilterStatsCalculatorService filterStatsCalculatorService;
final RowExpressionService rowExpressionService;

@Inject
public IcebergHiveMetadataFactory(
@@ -40,19 +44,23 @@ public IcebergHiveMetadataFactory(
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec,
-NodeVersion nodeVersion)
+NodeVersion nodeVersion,
+FilterStatsCalculatorService filterStatsCalculatorService,
+RowExpressionService rowExpressionService)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
requireNonNull(nodeVersion, "nodeVersion is null");
this.prestoVersion = nodeVersion.toString();
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
requireNonNull(config, "config is null");
}

public ConnectorMetadata create()
{
-return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec, prestoVersion);
+return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec, prestoVersion, filterStatsCalculatorService, rowExpressionService);
}
}
IcebergSessionProperties.java
@@ -19,11 +19,13 @@
import com.facebook.presto.hive.OrcFileWriterConfig;
import com.facebook.presto.hive.ParquetFileWriterConfig;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
import com.facebook.presto.orc.OrcWriteValidation;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import org.apache.parquet.column.ParquetProperties;
@@ -81,6 +83,7 @@ public final class IcebergSessionProperties
public static final String READ_MASKED_VALUE_ENABLED = "read_null_masked_parquet_encrypted_value_enabled";
public static final String PARQUET_DEREFERENCE_PUSHDOWN_ENABLED = "parquet_dereference_pushdown_enabled";
public static final String MERGE_ON_READ_MODE_ENABLED = "merge_on_read_enabled";
public static final String HIVE_METASTORE_STATISTICS_MERGE_STRATEGY = "hive_statistics_merge_strategy";
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
@@ -290,7 +293,17 @@ public IcebergSessionProperties(
MERGE_ON_READ_MODE_ENABLED,
"Reads enabled for merge-on-read Iceberg tables",
icebergConfig.isMergeOnReadModeEnabled(),
-false));
+false),
+new PropertyMetadata<>(
+HIVE_METASTORE_STATISTICS_MERGE_STRATEGY,
+"choose how to include statistics from the Hive Metastore when calculating table stats. Valid values are: "
++ Joiner.on(", ").join(HiveStatisticsMergeStrategy.values()),
+VARCHAR,
+HiveStatisticsMergeStrategy.class,
+icebergConfig.getHiveStatisticsMergeStrategy(),
+false,
+val -> HiveStatisticsMergeStrategy.valueOf((String) val),
+HiveStatisticsMergeStrategy::name));
}

public List<PropertyMetadata<?>> getSessionProperties()
@@ -472,4 +485,9 @@ public static boolean isMergeOnReadModeEnabled(ConnectorSession session)
{
return session.getProperty(MERGE_ON_READ_MODE_ENABLED, Boolean.class);
}

public static HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy(ConnectorSession session)
{
return session.getProperty(HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, HiveStatisticsMergeStrategy.class);
}
}
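The last two arguments of the new PropertyMetadata are the decoder and encoder;
a tiny illustration of the round-trip they perform (values and variable names
here are only for demonstration):

    // decode: what the session property machinery does with the raw SQL value
    HiveStatisticsMergeStrategy strategy = HiveStatisticsMergeStrategy.valueOf("NONE");
    // encode: how the current value is rendered back, e.g. for SHOW SESSION
    String rendered = strategy.name(); // "NONE"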
@@ -42,6 +42,7 @@
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.google.common.collect.ImmutableSet;
@@ -89,6 +90,7 @@ public static Connector createConnector(String catalogName, Map<String, String>
binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution());
binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService());
binder.bind(FilterStatsCalculatorService.class).toInstance(context.getFilterStatsCalculatorService());
});

Injector injector = app
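This binding exposes the engine's FilterStatsCalculatorService to the
connector's injector, which is what allows IcebergHiveMetadataFactory above to
receive it (the neighboring RowExpressionService binding already existed).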
(Diffs for the remaining 6 of the 13 changed files are not shown.)