diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java
index f4cd3b5b6ba6e..f05c524f0012d 100644
--- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java
+++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java
@@ -34,7 +34,7 @@
 import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
 import static com.google.common.base.Verify.verify;
 
-public class HiveStatisticsUtil
+public final class HiveStatisticsUtil
 {
     private HiveStatisticsUtil()
     {
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
index 212b9911d7181..f68dddf4f817e 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
@@ -399,7 +399,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
     {
         IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
         Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
-        return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable);
+        return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
     }
 
     @Override
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
index fd2ff08f65b5c..293da507a3aab 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
@@ -16,6 +16,7 @@
 import com.facebook.airlift.configuration.Config;
 import com.facebook.airlift.configuration.ConfigDescription;
 import com.facebook.presto.hive.HiveCompressionCodec;
+import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import org.apache.iceberg.FileFormat;
@@ -44,6 +45,8 @@ public class IcebergConfig
     private boolean parquetDereferencePushdownEnabled = true;
     private boolean mergeOnReadModeEnabled;
 
+    private HiveStatisticsMergeStrategy hiveStatisticsMergeStrategy = HiveStatisticsMergeStrategy.NONE;
+
     @NotNull
     public FileFormat getFileFormat()
     {
@@ -180,4 +183,17 @@ public boolean isMergeOnReadModeEnabled()
     {
         return mergeOnReadModeEnabled;
     }
+
+    @Config("iceberg.hive-statistics-merge-strategy")
+    @ConfigDescription("Determines how to merge statistics that are stored in the Hive Metastore")
+    public IcebergConfig setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy mergeStrategy)
+    {
+        this.hiveStatisticsMergeStrategy = mergeStrategy;
+        return this;
+    }
+
+    public HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy()
+    {
+        return hiveStatisticsMergeStrategy;
+    }
 }
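For orientation (not part of the patch): the new property defaults to `NONE`, so existing deployments keep Iceberg-only statistics unless the catalog opts in. A minimal sketch of the accessor behavior, with the property name taken from the `@Config` annotation above:

```java
import com.facebook.presto.iceberg.IcebergConfig;
import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;

public final class MergeStrategyConfigSketch
{
    public static void main(String[] args)
    {
        // Catalog file equivalent (etc/catalog/iceberg.properties):
        //   iceberg.hive-statistics-merge-strategy=USE_NDV
        IcebergConfig config = new IcebergConfig();
        // default: no merging from the Hive Metastore
        assert config.getHiveStatisticsMergeStrategy() == HiveStatisticsMergeStrategy.NONE;
        config.setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy.USE_NDV);
        assert config.getHiveStatisticsMergeStrategy() == HiveStatisticsMergeStrategy.USE_NDV;
    }
}
```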
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java
index af07f3f489b6e..a6051c445cb29 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java
@@ -14,6 +14,7 @@
 package com.facebook.presto.iceberg;
 
 import com.facebook.airlift.json.JsonCodec;
+import com.facebook.presto.common.predicate.TupleDomain;
 import com.facebook.presto.common.type.Type;
 import com.facebook.presto.common.type.TypeManager;
 import com.facebook.presto.hive.HdfsContext;
@@ -30,22 +31,33 @@
 import com.facebook.presto.hive.metastore.PartitionStatistics;
 import com.facebook.presto.hive.metastore.PrincipalPrivileges;
 import com.facebook.presto.hive.metastore.Table;
+import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
+import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ColumnMetadata;
 import com.facebook.presto.spi.ConnectorNewTableLayout;
 import com.facebook.presto.spi.ConnectorOutputTableHandle;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
 import com.facebook.presto.spi.ConnectorTableMetadata;
 import com.facebook.presto.spi.ConnectorViewDefinition;
+import com.facebook.presto.spi.Constraint;
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.SchemaNotFoundException;
 import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.SchemaTablePrefix;
 import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.ViewNotFoundException;
+import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
+import com.facebook.presto.spi.relation.RowExpression;
+import com.facebook.presto.spi.relation.RowExpressionService;
+import com.facebook.presto.spi.relation.VariableReferenceExpression;
 import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
+import com.facebook.presto.spi.statistics.ColumnStatistics;
 import com.facebook.presto.spi.statistics.ComputedStatistics;
+import com.facebook.presto.spi.statistics.Estimate;
 import com.facebook.presto.spi.statistics.TableStatisticType;
+import com.facebook.presto.spi.statistics.TableStatistics;
 import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -69,6 +81,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.stream.Collectors;
 
 import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics;
 import static com.facebook.presto.hive.HiveStatisticsUtil.updatePartitionStatistics;
@@ -85,6 +98,7 @@
 import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyAndPopulateViews;
 import static com.facebook.presto.hive.metastore.Statistics.createComputedStatisticsToPartitionMap;
 import static com.facebook.presto.iceberg.IcebergSchemaProperties.getSchemaLocation;
+import static com.facebook.presto.iceberg.IcebergSessionProperties.getHiveStatisticsMergeStrategy;
 import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
 import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
 import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning;
@@ -95,6 +109,7 @@
 import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
 import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable;
 import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
+import static com.facebook.presto.iceberg.util.StatisticsUtil.mergeHiveStatistics;
 import static com.facebook.presto.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
@@ -120,17 +135,24 @@ public class IcebergHiveMetadata
     private final String prestoVersion;
     private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));
 
+    private final FilterStatsCalculatorService filterStatsCalculatorService;
+    private final RowExpressionService rowExpressionService;
+
     public IcebergHiveMetadata(
             ExtendedHiveMetastore metastore,
             HdfsEnvironment hdfsEnvironment,
             TypeManager typeManager,
             JsonCodec<CommitTaskData> commitTaskCodec,
-            String prestoVersion)
+            String prestoVersion,
+            FilterStatsCalculatorService filterStatsCalculatorService,
+            RowExpressionService rowExpressionService)
     {
         super(typeManager, commitTaskCodec);
         this.metastore = requireNonNull(metastore, "metastore is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.prestoVersion = requireNonNull(prestoVersion, "prestoVersion is null");
+        this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
+        this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
     }
 
     @Override
@@ -424,6 +446,55 @@ private List<String> listSchemas(ConnectorSession session, String schemaNameOrNull)
         }
         return ImmutableList.of(schemaNameOrNull);
     }
+
+    public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<ConnectorTableLayoutHandle> tableLayoutHandle, List<ColumnHandle> columnHandles, Constraint<ColumnHandle> constraint)
+    {
+        IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
+        org.apache.iceberg.Table icebergTable = getHiveIcebergTable(metastore, hdfsEnvironment, session, handle.getSchemaTableName());
+        TableStatistics icebergStatistics = TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable, columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(Collectors.toList()));
+        HiveStatisticsMergeStrategy mergeStrategy = getHiveStatisticsMergeStrategy(session);
+        return tableLayoutHandle.map(IcebergTableLayoutHandle.class::cast).map(layoutHandle -> {
+            TupleDomain<VariableReferenceExpression> predicate = layoutHandle.getTupleDomain().transform(icebergLayout -> {
+                IcebergColumnHandle columnHandle = (IcebergColumnHandle) icebergLayout;
+                return new VariableReferenceExpression(Optional.empty(), columnHandle.getName(), columnHandle.getType());
+            });
+            RowExpression translatedPredicate = rowExpressionService.getDomainTranslator().toPredicate(predicate);
+            PartitionStatistics hiveStatistics = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getTableName());
+            TableStatistics mergedStatistics = mergeHiveStatistics(icebergStatistics, hiveStatistics, mergeStrategy, icebergTable.spec());
+            TableStatistics.Builder filteredStatsBuilder = TableStatistics.builder()
+                    .setRowCount(mergedStatistics.getRowCount());
+            double totalSize = 0;
+            for (ColumnHandle colHandle : columnHandles) {
+                IcebergColumnHandle icebergHandle = (IcebergColumnHandle) colHandle;
+                if (mergedStatistics.getColumnStatistics().containsKey(icebergHandle)) {
+                    ColumnStatistics stats = mergedStatistics.getColumnStatistics().get(icebergHandle);
+                    filteredStatsBuilder.setColumnStatistics(icebergHandle, stats);
+                    if (!stats.getDataSize().isUnknown()) {
+                        totalSize += stats.getDataSize().getValue();
+                    }
+                }
+            }
+            filteredStatsBuilder.setTotalSize(Estimate.of(totalSize));
+            return filterStatsCalculatorService.filterStats(
+                    filteredStatsBuilder.build(),
+                    translatedPredicate,
+                    session,
+                    columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableMap(
+                            col -> col,
+                            IcebergColumnHandle::getName)),
+                    columnHandles.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableMap(
+                            IcebergColumnHandle::getName,
+                            IcebergColumnHandle::getType)));
+        }).orElseGet(() -> {
+            if (!mergeStrategy.equals(HiveStatisticsMergeStrategy.NONE)) {
+                PartitionStatistics hiveStats = metastore.getTableStatistics(getMetastoreContext(session), handle.getSchemaName(), handle.getTableName());
+                return mergeHiveStatistics(icebergStatistics, hiveStats, mergeStrategy, icebergTable.spec());
+            }
+            return icebergStatistics;
+        });
+    }
+
     @Override
     public IcebergTableHandle getTableHandleForStatisticsCollection(ConnectorSession session, SchemaTableName tableName, Map<String, Object> analyzeProperties)
     {
         return getTableHandle(session, tableName);
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java
index b8b2518441a11..51a0c553a0c07 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java
@@ -19,6 +19,8 @@
 import com.facebook.presto.hive.NodeVersion;
 import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
 import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
+import com.facebook.presto.spi.relation.RowExpressionService;
 
 import javax.inject.Inject;
 
@@ -32,6 +34,8 @@ public class IcebergHiveMetadataFactory
     final TypeManager typeManager;
     final JsonCodec<CommitTaskData> commitTaskCodec;
     final String prestoVersion;
+    final FilterStatsCalculatorService filterStatsCalculatorService;
+    final RowExpressionService rowExpressionService;
 
     @Inject
     public IcebergHiveMetadataFactory(
@@ -40,7 +44,9 @@ public IcebergHiveMetadataFactory(
             HdfsEnvironment hdfsEnvironment,
             TypeManager typeManager,
             JsonCodec<CommitTaskData> commitTaskCodec,
-            NodeVersion nodeVersion)
+            NodeVersion nodeVersion,
+            FilterStatsCalculatorService filterStatsCalculatorService,
+            RowExpressionService rowExpressionService)
     {
         this.metastore = requireNonNull(metastore, "metastore is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -48,11 +54,13 @@ public IcebergHiveMetadataFactory(
         this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
         requireNonNull(nodeVersion, "nodeVersion is null");
         this.prestoVersion = nodeVersion.toString();
+        this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
+        this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
         requireNonNull(config, "config is null");
     }
 
     public ConnectorMetadata create()
     {
-        return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec, prestoVersion);
+        return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec, prestoVersion, filterStatsCalculatorService, rowExpressionService);
     }
 }
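A condensed view of the control flow the new `getTableStatistics` override implements (paraphrased sketch, not part of the patch; names abbreviated):

```java
// Paraphrased flow of IcebergHiveMetadata#getTableStatistics:
//
//   TableStatistics iceberg = TableStatisticsMaker.getTableStatistics(...); // manifests + Puffin NDVs
//   if (tableLayoutHandle.isPresent()) {
//       // a filter was pushed down: merge per the strategy, then let the
//       // FilterStatsCalculatorService scale row count / NDV estimates for the predicate
//       TableStatistics merged = mergeHiveStatistics(iceberg, hiveStats, strategy, spec);
//       return filterStatsCalculatorService.filterStats(merged, translatedPredicate, ...);
//   }
//   // no filter: merge only if a strategy other than NONE is active
//   return strategy == NONE ? iceberg : mergeHiveStatistics(iceberg, hiveStats, strategy, spec);
```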
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java
index e932c908a9917..f70f24ffdb478 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSessionProperties.java
@@ -19,11 +19,13 @@
 import com.facebook.presto.hive.OrcFileWriterConfig;
 import com.facebook.presto.hive.ParquetFileWriterConfig;
 import com.facebook.presto.iceberg.nessie.NessieConfig;
+import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
 import com.facebook.presto.orc.OrcWriteValidation;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
 import com.facebook.presto.spi.session.PropertyMetadata;
+import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import io.airlift.units.DataSize;
 import org.apache.parquet.column.ParquetProperties;
@@ -81,6 +83,7 @@ public final class IcebergSessionProperties
     public static final String READ_MASKED_VALUE_ENABLED = "read_null_masked_parquet_encrypted_value_enabled";
     public static final String PARQUET_DEREFERENCE_PUSHDOWN_ENABLED = "parquet_dereference_pushdown_enabled";
     public static final String MERGE_ON_READ_MODE_ENABLED = "merge_on_read_enabled";
+    public static final String HIVE_METASTORE_STATISTICS_MERGE_STRATEGY = "hive_statistics_merge_strategy";
 
     private final List<PropertyMetadata<?>> sessionProperties;
 
     @Inject
@@ -290,7 +293,17 @@ public IcebergSessionProperties(
                         MERGE_ON_READ_MODE_ENABLED,
                         "Reads enabled for merge-on-read Iceberg tables",
                         icebergConfig.isMergeOnReadModeEnabled(),
-                        false));
+                        false),
+                new PropertyMetadata<>(
+                        HIVE_METASTORE_STATISTICS_MERGE_STRATEGY,
+                        "Choose how to include statistics from the Hive Metastore when calculating table stats. Valid values are: " +
+                                Joiner.on(", ").join(HiveStatisticsMergeStrategy.values()),
+                        VARCHAR,
+                        HiveStatisticsMergeStrategy.class,
+                        icebergConfig.getHiveStatisticsMergeStrategy(),
+                        false,
+                        val -> HiveStatisticsMergeStrategy.valueOf((String) val),
+                        HiveStatisticsMergeStrategy::name));
     }
 
     public List<PropertyMetadata<?>> getSessionProperties()
@@ -472,4 +485,9 @@ public static boolean isMergeOnReadModeEnabled(ConnectorSession session)
     {
         return session.getProperty(MERGE_ON_READ_MODE_ENABLED, Boolean.class);
     }
+
+    public static HiveStatisticsMergeStrategy getHiveStatisticsMergeStrategy(ConnectorSession session)
+    {
+        return session.getProperty(HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, HiveStatisticsMergeStrategy.class);
+    }
 }
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java
index 9055f91a4b4a4..b40566701c249 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java
@@ -42,6 +42,7 @@
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
 import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
 import com.facebook.presto.spi.procedure.Procedure;
 import com.facebook.presto.spi.relation.RowExpressionService;
 import com.google.common.collect.ImmutableSet;
@@ -89,6 +90,7 @@ public static Connector createConnector(String catalogName, Map
                 binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
                 binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution());
                 binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService());
+                binder.bind(FilterStatsCalculatorService.class).toInstance(context.getFilterStatsCalculatorService());
             });
 
         Injector injector = app
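The strategy can also be overridden per session. A hypothetical snippet, assuming the session-builder style used in prestodb tests (the property name comes from `HIVE_METASTORE_STATISTICS_MERGE_STRATEGY` above, qualified by the catalog name):

```java
// Hypothetical: override the merge strategy for one session.
Session session = testSessionBuilder()
        .setCatalog("iceberg")
        .setSchema("tpch")
        .setCatalogSessionProperty("iceberg", "hive_statistics_merge_strategy", "USE_NULLS_FRACTION_AND_NDV")
        .build();
```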
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java
index d11b8af24b3f1..bcf75aa10509f 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java
@@ -13,19 +13,24 @@
  */
 package com.facebook.presto.iceberg;
 
+import com.facebook.airlift.log.Logger;
 import com.facebook.presto.common.predicate.NullableValue;
 import com.facebook.presto.common.predicate.TupleDomain;
 import com.facebook.presto.common.type.TypeManager;
 import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.statistics.ColumnStatistics;
 import com.facebook.presto.spi.statistics.DoubleRange;
 import com.facebook.presto.spi.statistics.Estimate;
 import com.facebook.presto.spi.statistics.TableStatistics;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.BlobMetadata;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.io.CloseableIterable;
@@ -36,26 +41,38 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
+import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
 import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
 import static com.facebook.presto.iceberg.IcebergUtil.getIdentityPartitions;
 import static com.facebook.presto.iceberg.Partition.toMap;
 import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static com.google.common.collect.Streams.stream;
+import static java.lang.Long.parseLong;
+import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toSet;
+import static org.apache.iceberg.puffin.StandardBlobTypes.APACHE_DATASKETCHES_THETA_V1;
 
 public class TableStatisticsMaker
 {
+    private static final Logger log = Logger.get(TableStatisticsMaker.class);
+
+    private static final String ICEBERG_APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY = "ndv";
+
     private final TypeManager typeManager;
     private final Table icebergTable;
 
@@ -65,12 +82,12 @@ private TableStatisticsMaker(TypeManager typeManager, Table icebergTable)
         this.icebergTable = icebergTable;
     }
 
-    public static TableStatistics getTableStatistics(TypeManager typeManager, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable)
+    public static TableStatistics getTableStatistics(TypeManager typeManager, Constraint constraint, IcebergTableHandle tableHandle, Table icebergTable, List<IcebergColumnHandle> columns)
     {
-        return new TableStatisticsMaker(typeManager, icebergTable).makeTableStatistics(tableHandle, constraint);
+        return new TableStatisticsMaker(typeManager, icebergTable).makeTableStatistics(tableHandle, constraint, columns);
     }
 
-    private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Constraint constraint)
+    private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Constraint constraint, List<IcebergColumnHandle> selectedColumns)
     {
         if (!tableHandle.getSnapshotId().isPresent() || constraint.getSummary().isNone()) {
             return TableStatistics.builder()
@@ -123,6 +140,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons
 
         TableScan tableScan = icebergTable.newScan()
                 .filter(toIcebergExpression(intersection))
                .select(selectedColumns.stream().map(IcebergColumnHandle::getName).collect(Collectors.toList()))
                 .useSnapshot(tableHandle.getSnapshotId().get())
                 .includeColumnStats();
 
@@ -172,11 +190,46 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons
                     .build();
         }
 
+        // get NDVs from statistics file(s)
+        ImmutableMap.Builder<Integer, Long> ndvByColumnId = ImmutableMap.builder();
+        Set<Integer> remainingColumnIds = new HashSet<>(idToColumnHandle.keySet());
+
+        getLatestStatisticsFile(icebergTable, tableHandle.getSnapshotId()).ifPresent(statisticsFile -> {
+            Map<Integer, BlobMetadata> thetaBlobsByFieldId = statisticsFile.blobMetadata().stream()
+                    .filter(blobMetadata -> blobMetadata.type().equals(APACHE_DATASKETCHES_THETA_V1))
+                    .filter(blobMetadata -> {
+                        try {
+                            return remainingColumnIds.contains(getOnlyElement(blobMetadata.fields()));
+                        }
+                        catch (IllegalArgumentException e) {
+                            throw new PrestoException(ICEBERG_INVALID_METADATA,
+                                    format("blob metadata for blob type %s in statistics file %s must contain only one field. Found %d fields",
+                                            APACHE_DATASKETCHES_THETA_V1, statisticsFile.path(), blobMetadata.fields().size()));
+                        }
+                    })
+                    .collect(toImmutableMap(blobMetadata -> getOnlyElement(blobMetadata.fields()), identity()));
+
+            for (Map.Entry<Integer, BlobMetadata> entry : thetaBlobsByFieldId.entrySet()) {
+                int fieldId = entry.getKey();
+                BlobMetadata blobMetadata = entry.getValue();
+                String ndv = blobMetadata.properties().get(ICEBERG_APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY);
+                if (ndv == null) {
+                    log.debug("Blob %s is missing %s property", blobMetadata.type(), ICEBERG_APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY);
+                    remainingColumnIds.remove(fieldId);
+                }
+                else {
+                    remainingColumnIds.remove(fieldId);
+                    ndvByColumnId.put(fieldId, parseLong(ndv));
+                }
+            }
+        });
+        Map<Integer, Long> ndvById = ndvByColumnId.build();
+
         double recordCount = summary.getRecordCount();
         TableStatistics.Builder result = TableStatistics.builder();
         result.setRowCount(Estimate.of(recordCount));
         result.setTotalSize(Estimate.of(summary.getSize()));
-        for (IcebergColumnHandle columnHandle : idToColumnHandle.values()) {
+        for (IcebergColumnHandle columnHandle : selectedColumns) {
             int fieldId = columnHandle.getId();
             ColumnStatistics.Builder columnBuilder = new ColumnStatistics.Builder();
             Long nullCount = summary.getNullCounts().get(fieldId);
@@ -194,6 +247,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons
             if (min instanceof Number && max instanceof Number) {
                 columnBuilder.setRange(Optional.of(new DoubleRange(((Number) min).doubleValue(), ((Number) max).doubleValue())));
             }
+            Optional.ofNullable(ndvById.get(fieldId)).ifPresent(ndv -> columnBuilder.setDistinctValuesCount(Estimate.of(ndv)));
             result.setColumnStatistics(columnHandle, columnBuilder.build());
         }
         return result.build();
@@ -209,6 +263,51 @@ private boolean dataFileMatches(
         return true;
     }
 
+    private static Optional<StatisticsFile> getLatestStatisticsFile(Table table, Optional<Long> snapshotId)
+    {
+        if (table.statisticsFiles().isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Long, StatisticsFile> statsFileBySnapshot = table.statisticsFiles().stream()
+                .collect(toImmutableMap(
+                        StatisticsFile::snapshotId,
+                        identity(),
+                        (file1, file2) -> {
+                            throw new PrestoException(
+                                    ICEBERG_INVALID_METADATA,
+                                    format("Table '%s' has duplicate statistics files '%s' and '%s' for snapshot ID %s",
+                                            table, file1.path(), file2.path(), file1.snapshotId()));
+                        }));
+
+        return stream(snapshotWalk(table, snapshotId.orElse(table.currentSnapshot().snapshotId())))
+                .map(statsFileBySnapshot::get)
+                .filter(Objects::nonNull)
+                .findFirst();
+    }
+
+    public static Iterator<Long> snapshotWalk(Table table, Long startSnapshot)
+    {
+        return new Iterator<Long>()
+        {
+            private Long currentSnapshot = startSnapshot;
+
+            @Override
+            public boolean hasNext()
+            {
+                return currentSnapshot != null && table.snapshot(currentSnapshot) != null;
+            }
+
+            @Override
+            public Long next()
+            {
+                Snapshot snapshot = table.snapshot(currentSnapshot);
+                currentSnapshot = snapshot.parentId();
+                return snapshot.snapshotId();
+            }
+        };
+    }
+
     private NullableValue makeNullableValue(com.facebook.presto.common.type.Type type, Object value)
     {
         return value == null ? NullableValue.asNull(type) : NullableValue.of(type, value);
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/HiveStatisticsMergeStrategy.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/HiveStatisticsMergeStrategy.java
new file mode 100644
index 0000000000000..419a04cad263f
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/HiveStatisticsMergeStrategy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.util;
+
+/**
+ * Strategies that define how to merge Hive column statistics into Iceberg column statistics.
+ */
+public enum HiveStatisticsMergeStrategy
+{
+    /**
+     * Do not merge statistics from Hive.
+     */
+    NONE,
+    /**
+     * Only merge NDV statistics from Hive.
+     */
+    USE_NDV,
+    /**
+     * Only merge null fractions from Hive.
+     */
+    USE_NULLS_FRACTIONS,
+    /**
+     * Merge both null fractions and NDVs from Hive.
+     */
+    USE_NULLS_FRACTION_AND_NDV,
+}
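Read as a truth table, the four values collapse to two independent flags. Hypothetical helpers (not part of the patch) make that explicit; the real consumption logic lives in `StatisticsUtil.mergeHiveStatistics` in the next file:

```java
// Hypothetical helpers: which Hive-side values each strategy admits.
static boolean mergesNdvs(HiveStatisticsMergeStrategy strategy)
{
    return strategy == HiveStatisticsMergeStrategy.USE_NDV
            || strategy == HiveStatisticsMergeStrategy.USE_NULLS_FRACTION_AND_NDV;
}

static boolean mergesNullsFractions(HiveStatisticsMergeStrategy strategy)
{
    return strategy == HiveStatisticsMergeStrategy.USE_NULLS_FRACTIONS
            || strategy == HiveStatisticsMergeStrategy.USE_NULLS_FRACTION_AND_NDV;
}
```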
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java
new file mode 100644
index 0000000000000..c64b1218fdfd7
--- /dev/null
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/util/StatisticsUtil.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.util;
+
+import com.facebook.presto.hive.metastore.HiveColumnStatistics;
+import com.facebook.presto.hive.metastore.PartitionStatistics;
+import com.facebook.presto.iceberg.IcebergColumnHandle;
+import com.facebook.presto.spi.statistics.ColumnStatistics;
+import com.facebook.presto.spi.statistics.Estimate;
+import com.facebook.presto.spi.statistics.TableStatistics;
+import org.apache.iceberg.PartitionSpec;
+
+import java.util.Map;
+
+import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.NONE;
+import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NDV;
+import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTIONS;
+import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTION_AND_NDV;
+
+public final class StatisticsUtil
+{
+    private StatisticsUtil()
+    {
+    }
+
+    public static TableStatistics mergeHiveStatistics(TableStatistics icebergStatistics, PartitionStatistics hiveStatistics, HiveStatisticsMergeStrategy mergeStrategy, PartitionSpec spec)
+    {
+        if (mergeStrategy.equals(NONE) || spec.isPartitioned()) {
+            return icebergStatistics;
+        }
+        // We really only need to merge in NDVs and null fractions from the column statistics in Hive's stats.
+        //
+        // Take Iceberg's data size, column size, and row count statistics over Hive's, as they are more likely
+        // to be up to date since they are computed on the fly.
+        Map<String, HiveColumnStatistics> columnStats = hiveStatistics.getColumnStatistics();
+        TableStatistics.Builder statsBuilder = TableStatistics.builder();
+        statsBuilder.setTotalSize(icebergStatistics.getTotalSize());
+        statsBuilder.setRowCount(icebergStatistics.getRowCount());
+        icebergStatistics.getColumnStatistics().forEach((columnHandle, icebergColumnStats) -> {
+            HiveColumnStatistics hiveColumnStats = columnStats.get(((IcebergColumnHandle) columnHandle).getName());
+            ColumnStatistics.Builder mergedStats = ColumnStatistics.builder()
+                    .setDataSize(icebergColumnStats.getDataSize())
+                    .setDistinctValuesCount(icebergColumnStats.getDistinctValuesCount())
+                    .setNullsFraction(icebergColumnStats.getNullsFraction())
+                    .setRange(icebergColumnStats.getRange());
+            if (hiveColumnStats != null) {
+                if (mergeStrategy.equals(USE_NDV) || mergeStrategy.equals(USE_NULLS_FRACTION_AND_NDV)) {
+                    hiveColumnStats.getDistinctValuesCount().ifPresent(ndvs -> mergedStats.setDistinctValuesCount(Estimate.of(ndvs)));
+                }
+                if (mergeStrategy.equals(USE_NULLS_FRACTIONS) || mergeStrategy.equals(USE_NULLS_FRACTION_AND_NDV)) {
+                    hiveColumnStats.getNullsCount().ifPresent(nullCount -> {
+                        Estimate nullsFraction;
+                        if (!hiveStatistics.getBasicStatistics().getRowCount().isPresent()) {
+                            if (icebergStatistics.getRowCount().isUnknown()) {
+                                nullsFraction = Estimate.unknown();
+                            }
+                            else {
+                                nullsFraction = Estimate.of((double) nullCount / icebergStatistics.getRowCount().getValue());
+                            }
+                        }
+                        else {
+                            nullsFraction = Estimate.of((double) nullCount / hiveStatistics.getBasicStatistics().getRowCount().getAsLong());
+                        }
+                        mergedStats.setNullsFraction(nullsFraction);
+                    });
+                }
+            }
+            statsBuilder.setColumnStatistics(columnHandle, mergedStats.build());
+        });
+        return statsBuilder.build();
+    }
+}
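The null-fraction branch is the only non-trivial arithmetic: Hive stores an absolute null count, so it must be divided by a row count, preferring Hive's own and falling back to Iceberg's. With the numbers the unit tests below use:

```java
// Worked example matching TestStatisticsUtil: Hive reports nullsCount = 2 and a
// basic-stats row count of 2, so the merged nulls fraction is 2.0 / 2 = 1.0
// (overriding Iceberg's 0.1). If Hive lacked a row count, Iceberg's row count (1)
// would be the divisor; if both were unknown, the fraction stays Estimate.unknown().
double mergedNullsFraction = (double) 2 / 2; // = 1.0
```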
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java
index 1315477aa972f..cf6dc6d3a7ffe 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java
@@ -13,6 +13,7 @@
  */
 package com.facebook.presto.iceberg;
 
+import com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy;
 import com.google.common.collect.ImmutableMap;
 import org.testng.annotations.Test;
 
@@ -27,6 +28,7 @@
 import static com.facebook.presto.iceberg.CatalogType.HIVE;
 import static com.facebook.presto.iceberg.IcebergFileFormat.ORC;
 import static com.facebook.presto.iceberg.IcebergFileFormat.PARQUET;
+import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NDV;
 
 public class TestIcebergConfig
 {
@@ -40,6 +42,7 @@ public void testDefaults()
                 .setCatalogWarehouse(null)
                 .setCatalogCacheSize(10)
                 .setHadoopConfigResources(null)
+                .setHiveStatisticsMergeStrategy(HiveStatisticsMergeStrategy.NONE)
                 .setMaxPartitionsPerWriter(100)
                 .setMinimumAssignedSplitWeight(0.05)
                 .setParquetDereferencePushdownEnabled(true)
@@ -60,6 +63,7 @@ public void testExplicitPropertyMappings()
                 .put("iceberg.minimum-assigned-split-weight", "0.01")
                 .put("iceberg.enable-parquet-dereference-pushdown", "false")
                 .put("iceberg.enable-merge-on-read-mode", "true")
+                .put("iceberg.hive-statistics-merge-strategy", "USE_NDV")
                 .build();
 
         IcebergConfig expected = new IcebergConfig()
@@ -72,7 +76,8 @@ public void testExplicitPropertyMappings()
                 .setMaxPartitionsPerWriter(222)
                 .setMinimumAssignedSplitWeight(0.01)
                 .setParquetDereferencePushdownEnabled(false)
-                .setMergeOnReadModeEnabled(true);
+                .setMergeOnReadModeEnabled(true)
+                .setHiveStatisticsMergeStrategy(USE_NDV);
 
         assertFullMapping(properties, expected);
     }
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java
new file mode 100644
index 0000000000000..3b25ef185de20
--- /dev/null
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestStatisticsUtil.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg;
+
+import com.facebook.presto.hive.HiveBasicStatistics;
+import com.facebook.presto.hive.metastore.HiveColumnStatistics;
+import com.facebook.presto.hive.metastore.PartitionStatistics;
+import com.facebook.presto.iceberg.ColumnIdentity.TypeCategory;
+import com.facebook.presto.spi.statistics.ColumnStatistics;
+import com.facebook.presto.spi.statistics.DoubleRange;
+import com.facebook.presto.spi.statistics.Estimate;
+import com.facebook.presto.spi.statistics.TableStatistics;
+import com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static com.facebook.presto.common.type.IntegerType.INTEGER;
+import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.NONE;
+import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NDV;
+import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTIONS;
+import static com.facebook.presto.iceberg.util.HiveStatisticsMergeStrategy.USE_NULLS_FRACTION_AND_NDV;
+import static com.facebook.presto.iceberg.util.StatisticsUtil.mergeHiveStatistics;
+import static org.testng.Assert.assertEquals;
+
+public class TestStatisticsUtil
+{
+    @Test
+    public void testMergeStrategyNone()
+    {
+        TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), NONE, PartitionSpec.unpartitioned());
+        assertEquals(Estimate.of(1), merged.getRowCount());
+        assertEquals(Estimate.unknown(), merged.getTotalSize());
+        assertEquals(1, merged.getColumnStatistics().size());
+        ColumnStatistics stats = merged.getColumnStatistics().values().stream().findFirst().get();
+        assertEquals(Estimate.of(8), stats.getDataSize());
+        assertEquals(Estimate.of(1), stats.getDistinctValuesCount());
+        assertEquals(Estimate.of(0.1), stats.getNullsFraction());
+        assertEquals(new DoubleRange(0.0, 1.0), stats.getRange().get());
+    }
+
+    @Test
+    public void testMergeStrategyWithPartitioned()
+    {
+        TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NULLS_FRACTION_AND_NDV,
+                PartitionSpec.builderFor(new Schema(Types.NestedField.required(0, "test", Types.IntegerType.get()))).bucket("test", 100).build());
+        assertEquals(Estimate.of(1), merged.getRowCount());
+        assertEquals(Estimate.unknown(), merged.getTotalSize());
+        assertEquals(1, merged.getColumnStatistics().size());
+        ColumnStatistics stats = merged.getColumnStatistics().values().stream().findFirst().get();
+        assertEquals(Estimate.of(8), stats.getDataSize());
+        assertEquals(Estimate.of(1), stats.getDistinctValuesCount());
+        assertEquals(Estimate.of(0.1), stats.getNullsFraction());
+        assertEquals(new DoubleRange(0.0, 1.0), stats.getRange().get());
+    }
+
+    @Test
+    public void testMergeStrategyNDVs()
+    {
+        TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NDV, PartitionSpec.unpartitioned());
+        assertEquals(Estimate.of(1), merged.getRowCount());
+        assertEquals(Estimate.unknown(), merged.getTotalSize());
+        assertEquals(1, merged.getColumnStatistics().size());
+        ColumnStatistics stats = merged.getColumnStatistics().values().stream().findFirst().get();
+        assertEquals(Estimate.of(8), stats.getDataSize());
+        assertEquals(Estimate.of(2), stats.getDistinctValuesCount());
+        assertEquals(Estimate.of(0.1), stats.getNullsFraction());
+        assertEquals(new DoubleRange(0.0, 1.0), stats.getRange().get());
+    }
+
+    @Test
+    public void testMergeStrategyNulls()
+    {
+        TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NULLS_FRACTIONS, PartitionSpec.unpartitioned());
+        assertEquals(Estimate.of(1), merged.getRowCount());
+        assertEquals(Estimate.unknown(), merged.getTotalSize());
+        assertEquals(1, merged.getColumnStatistics().size());
+        ColumnStatistics stats = merged.getColumnStatistics().values().stream().findFirst().get();
+        assertEquals(Estimate.of(8), stats.getDataSize());
+        assertEquals(Estimate.of(1), stats.getDistinctValuesCount());
+        assertEquals(Estimate.of(1.0), stats.getNullsFraction());
+        assertEquals(new DoubleRange(0.0, 1.0), stats.getRange().get());
+    }
+
+    @Test
+    public void testMergeStrategyNDVsAndNulls()
+    {
+        TableStatistics merged = mergeHiveStatistics(generateSingleColumnIcebergStats(), generateSingleColumnHiveStatistics(), USE_NULLS_FRACTION_AND_NDV, PartitionSpec.unpartitioned());
+        assertEquals(Estimate.of(1), merged.getRowCount());
+        assertEquals(Estimate.unknown(), merged.getTotalSize());
+        assertEquals(1, merged.getColumnStatistics().size());
+        ColumnStatistics stats = merged.getColumnStatistics().values().stream().findFirst().get();
+        assertEquals(Estimate.of(8), stats.getDataSize());
+        assertEquals(Estimate.of(2), stats.getDistinctValuesCount());
+        assertEquals(Estimate.of(1.0), stats.getNullsFraction());
+        assertEquals(new DoubleRange(0.0, 1.0), stats.getRange().get());
+    }
+
+    private static TableStatistics generateSingleColumnIcebergStats()
+    {
+        return TableStatistics.builder()
+                .setRowCount(Estimate.of(1))
+                .setTotalSize(Estimate.unknown())
+                .setColumnStatistics(new IcebergColumnHandle(
+                                new ColumnIdentity(1, "test", TypeCategory.PRIMITIVE, Collections.emptyList()),
+                                INTEGER,
+                                Optional.empty()),
+                        ColumnStatistics.builder()
+                                .setNullsFraction(Estimate.of(0.1))
+                                .setRange(new DoubleRange(0.0, 1.0))
+                                .setDataSize(Estimate.of(8))
+                                .setDistinctValuesCount(Estimate.of(1))
+                                .build())
+                .build();
+    }
+
+    private static PartitionStatistics generateSingleColumnHiveStatistics()
+    {
+        return new PartitionStatistics(
+                new HiveBasicStatistics(1, 2, 16, 16),
+                ImmutableMap.of("test",
+                        HiveColumnStatistics.createIntegerColumnStatistics(
+                                OptionalLong.of(1),
+                                OptionalLong.of(2),
+                                OptionalLong.of(2),
+                                OptionalLong.of(2))));
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java
new file mode 100644
index 0000000000000..6dd53b5753776
--- /dev/null
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.hive;
+
+import com.facebook.presto.Session;
+import com.facebook.presto.common.QualifiedObjectName;
+import com.facebook.presto.common.predicate.Domain;
+import com.facebook.presto.common.predicate.Range;
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.predicate.ValueSet;
+import com.facebook.presto.common.transaction.TransactionId;
+import com.facebook.presto.metadata.Metadata;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.TableHandle;
+import com.facebook.presto.spi.analyzer.MetadataResolver;
+import com.facebook.presto.spi.security.AllowAllAccessControl;
+import com.facebook.presto.spi.statistics.ColumnStatistics;
+import com.facebook.presto.spi.statistics.TableStatistics;
+import com.facebook.presto.testing.MaterializedResult;
+import com.facebook.presto.testing.QueryRunner;
+import com.facebook.presto.tests.AbstractTestQueryFramework;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.common.type.DoubleType.DOUBLE;
+import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
+import static com.facebook.presto.testing.assertions.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests the ability of the Hive implementation of IcebergMetadata to store and read
+ * table statistics.
+ */
+public class TestIcebergHiveStatistics
+        extends AbstractTestQueryFramework
+{
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of("iceberg.hive-statistics-merge-strategy", "USE_NDV"));
+    }
+
+    private static final Set<String> NUMERIC_ORDERS_COLUMNS = ImmutableSet.<String>builder()
+            .add("orderkey")
+            .add("custkey")
+            .add("totalprice")
+            .add("orderdate")
+            .add("shippriority")
+            .build();
+
+    private static final Set<String> NON_NUMERIC_ORDERS_COLUMNS = ImmutableSet.<String>builder()
+            .add("orderstatus")
+            .add("orderpriority")
+            .add("clerk")
+            .add("comment")
+            .build();
+    private static final Set<String> ALL_ORDERS_COLUMNS = Arrays.stream(StatsSchema.values()).map(StatsSchema::getColumnName).collect(Collectors.toSet());
+
+    enum StatsSchema
+    {
+        COLUMN_NAME("column_name"),
+        DATA_SIZE("data_size"),
+        DISTINCT_VALUES_COUNT("distinct_values_count"),
+        NULLS_FRACTION("nulls_fraction"),
+        ROW_COUNT("row_count"),
+        LOW_VALUE("low_value"),
+        HIGH_VALUE("high_value");
+
+        private final String name;
+
+        StatsSchema(String name)
+        {
+            this.name = name;
+        }
+
+        public String getColumnName()
+        {
+            return name;
+        }
+    }
+
+    /**
+     * Ensures that the stats not provided by {@link com.facebook.presto.iceberg.TableStatisticsMaker} are
+     * populated and served from the metadata after running an ANALYZE query.
+     */
+    @Test
+    public void testSimpleAnalyze()
+    {
+        assertQuerySucceeds("CREATE TABLE simpleAnalyze as SELECT * FROM orders");
+        MaterializedResult stats = getQueryRunner().execute("SHOW STATS FOR simpleAnalyze");
+        assertStatValueAbsent(StatsSchema.DISTINCT_VALUES_COUNT, stats, NUMERIC_ORDERS_COLUMNS);
+        assertQuerySucceeds("ANALYZE simpleAnalyze");
+        stats = getQueryRunner().execute("SHOW STATS FOR simpleAnalyze");
+        assertStatValuePresent(StatsSchema.DISTINCT_VALUES_COUNT, stats, NUMERIC_ORDERS_COLUMNS);
+        assertStatValuePresent(StatsSchema.NULLS_FRACTION, stats, NUMERIC_ORDERS_COLUMNS);
+        assertStatValuePresent(StatsSchema.NULLS_FRACTION, stats, NON_NUMERIC_ORDERS_COLUMNS);
+    }
+
+    /**
+     * Tests that the TableStatisticsMaker is used even when ANALYZE has not been run yet.
+     */
+    @Test
+    public void testStatsBeforeAnalyze()
+    {
+        assertQuerySucceeds("CREATE TABLE statsBeforeAnalyze as SELECT * FROM orders");
+        MaterializedResult stats = getQueryRunner().execute("SHOW STATS FOR statsBeforeAnalyze");
+        assertStatValueAbsent(StatsSchema.DISTINCT_VALUES_COUNT, stats, NUMERIC_ORDERS_COLUMNS);
+        assertStatValuePresent(StatsSchema.DATA_SIZE, stats, ALL_ORDERS_COLUMNS);
+        assertStatValuePresent(StatsSchema.LOW_VALUE, stats, NUMERIC_ORDERS_COLUMNS);
+        assertStatValuePresent(StatsSchema.HIGH_VALUE, stats, ALL_ORDERS_COLUMNS);
+    }
+
+    @Test
+    public void testStatsWithPartitionedTableAnalyzed()
+    {
+        assertQuerySucceeds("CREATE TABLE statsNoPartitionAnalyze as SELECT * FROM orders LIMIT 100");
+        assertQuerySucceeds("CREATE TABLE statsWithPartitionAnalyze WITH (partitioning = ARRAY['orderdate']) as SELECT * FROM statsNoPartitionAnalyze");
+        assertQuerySucceeds("ANALYZE statsNoPartitionAnalyze");
+        assertQuerySucceeds("ANALYZE statsWithPartitionAnalyze");
+        Metadata meta = getQueryRunner().getMetadata();
+        TransactionId txid = getQueryRunner().getTransactionManager().beginTransaction(false);
+        Session session = getSession().beginTransactionId(txid, getQueryRunner().getTransactionManager(), new AllowAllAccessControl());
+        Map<String, ColumnHandle> noPartColumns = getColumnHandles("statsnopartitionanalyze", session);
+        Map<String, ColumnHandle> partColumns = getColumnHandles("statswithpartitionanalyze", session);
+        List<ColumnHandle> noPartColumnHandles = new ArrayList<>(noPartColumns.values());
+        List<ColumnHandle> partColumnHandles = new ArrayList<>(partColumns.values());
+        // Test that with all columns and no constraints, stats are equivalent
+        TableStatistics statsNoPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsNoPartitionAnalyze", session), noPartColumnHandles, Constraint.alwaysTrue());
+        TableStatistics statsWithPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsWithPartitionAnalyze", session), partColumnHandles, Constraint.alwaysTrue());
+        assertNDVsPresent(statsNoPartition);
+        assertNDVsNotPresent(statsWithPartition);
+        assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
+
+        // Test that with one column and no constraints, stats are equivalent
+        statsNoPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsNoPartitionAnalyze", session), Collections.singletonList(noPartColumns.get("totalprice")), Constraint.alwaysTrue());
+        statsWithPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsWithPartitionAnalyze", session), Collections.singletonList(partColumns.get("totalprice")), Constraint.alwaysTrue());
+        assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
+        assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
+        assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
+        assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("totalprice")));
+        assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("totalprice")));
+        assertNDVsPresent(statsNoPartition);
+        assertNDVsNotPresent(statsWithPartition);
+
+        // Test that with all columns and a Tuple constraint, stats are equivalent
+        statsNoPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsNoPartitionAnalyze", session), noPartColumnHandles, Constraint.alwaysTrue());
+        statsWithPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsWithPartitionAnalyze", session), partColumnHandles, Constraint.alwaysTrue());
+        assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
+        assertEquals(statsNoPartition.getColumnStatistics().size(), noPartColumnHandles.size());
+        assertEquals(statsWithPartition.getColumnStatistics().size(), partColumnHandles.size());
+        assertNDVsPresent(statsNoPartition);
+        assertNDVsNotPresent(statsWithPartition);
+
+        // Test that with one column and a constraint on that column, the partitioned stats return fewer values
+        Constraint<ColumnHandle> noPartConstraint = constraintWithMinValue(noPartColumns.get("totalprice"), 100000.0);
+        Constraint<ColumnHandle> partConstraint = constraintWithMinValue(partColumns.get("totalprice"), 100000.0);
+        statsNoPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsNoPartitionAnalyze", session), Collections.singletonList(noPartColumns.get("totalprice")), noPartConstraint);
+        statsWithPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsWithPartitionAnalyze", session), Collections.singletonList(partColumns.get("totalprice")), partConstraint);
+        // ensure partitioned table actually returns fewer rows
+        assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
+        assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
+        assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("totalprice")));
+        assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("totalprice")));
+        assertNDVsPresent(statsNoPartition);
+        assertNDVsNotPresent(statsWithPartition);
+        // partitioned table should have stats partially filtered since data should span > 1 file
+        assertTrue(statsWithPartition.getRowCount().getValue() < statsNoPartition.getRowCount().getValue());
+
+        // Test that with one column and a constraint on a different column, stats are equivalent
+        noPartConstraint = constraintWithMinValue(noPartColumns.get("totalprice"), 100000.0);
+        partConstraint = constraintWithMinValue(partColumns.get("totalprice"), 100000.0);
+        statsNoPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsNoPartitionAnalyze", session), Collections.singletonList(noPartColumns.get("orderkey")), noPartConstraint);
+        statsWithPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsWithPartitionAnalyze", session), Collections.singletonList(partColumns.get("orderkey")), partConstraint);
+        assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
+        assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
+        assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("orderkey")));
+        assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("orderkey")));
+        assertNDVsPresent(statsNoPartition);
+        assertNDVsNotPresent(statsWithPartition);
+        // partitioned table should have stats partially filtered since data should span > 1 file
+        assertTrue(statsWithPartition.getRowCount().getValue() < statsNoPartition.getRowCount().getValue());
+
+        assertQuerySucceeds("DROP TABLE statsNoPartitionAnalyze");
+        assertQuerySucceeds("DROP TABLE statsWithPartitionAnalyze");
+    }
+
+    @Test
+    public void testStatsWithPartitionedTablesNoAnalyze()
+    {
+        assertQuerySucceeds("CREATE TABLE statsNoPartition as SELECT * FROM orders LIMIT 100");
+        assertQuerySucceeds("CREATE TABLE statsWithPartition WITH (partitioning = ARRAY['orderdate']) as SELECT * FROM statsNoPartition");
+        Metadata meta = getQueryRunner().getMetadata();
+        TransactionId txid = getQueryRunner().getTransactionManager().beginTransaction(false);
+        Session s = getSession().beginTransactionId(txid, getQueryRunner().getTransactionManager(), new AllowAllAccessControl());
+        Map<String, ColumnHandle> noPartColumns = getColumnHandles("statsnopartition", s);
+        Map<String, ColumnHandle> partColumns = getColumnHandles("statswithpartition", s);
+        List<ColumnHandle> noPartColumnHandles = new ArrayList<>(noPartColumns.values());
+        List<ColumnHandle> partColumnHandles = new ArrayList<>(partColumns.values());
+        // Test that with all columns and no constraints, stats are equivalent
+        TableStatistics statsNoPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsNoPartition", s), noPartColumnHandles, Constraint.alwaysTrue());
+        TableStatistics statsWithPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsWithPartition", s), partColumnHandles, Constraint.alwaysTrue());
+        assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
+        columnStatsEqual(statsNoPartition.getColumnStatistics(), statsWithPartition.getColumnStatistics());
+
+        // Test that with one column and no constraints, stats are equivalent
+        statsNoPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsNoPartition", s), Collections.singletonList(noPartColumns.get("totalprice")), Constraint.alwaysTrue());
+        statsWithPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsWithPartition", s), Collections.singletonList(partColumns.get("totalprice")), Constraint.alwaysTrue());
+        assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
+        assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
+        assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
+        assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("totalprice")));
+        assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("totalprice")));
+        columnStatsEqual(statsNoPartition.getColumnStatistics(), statsWithPartition.getColumnStatistics());
+
+        // Test that with all columns and a Tuple constraint, stats are equivalent
+        statsNoPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsNoPartition", s), noPartColumnHandles, Constraint.alwaysTrue());
+        statsWithPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsWithPartition", s), partColumnHandles, Constraint.alwaysTrue());
+        assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
+        assertEquals(statsNoPartition.getColumnStatistics().size(), noPartColumnHandles.size());
+        assertEquals(statsWithPartition.getColumnStatistics().size(), partColumnHandles.size());
+        columnStatsEqual(statsNoPartition.getColumnStatistics(), statsWithPartition.getColumnStatistics());
+
+        // Test that with one column and a constraint on that column, the partitioned stats return fewer values
+        Constraint<ColumnHandle> noPartConstraint = constraintWithMinValue(noPartColumns.get("totalprice"), 100000.0);
+        Constraint<ColumnHandle> partConstraint = constraintWithMinValue(partColumns.get("totalprice"), 100000.0);
+        statsNoPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsNoPartition", s), Collections.singletonList(noPartColumns.get("totalprice")), noPartConstraint);
+        statsWithPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsWithPartition", s), Collections.singletonList(partColumns.get("totalprice")), partConstraint);
+        // ensure partitioned table actually returns fewer rows
+        assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
+        assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
+        assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("totalprice")));
+        assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("totalprice")));
+        // partitioned table should have stats partially filtered since data should span > 1 file
+        assertTrue(statsWithPartition.getRowCount().getValue() < statsNoPartition.getRowCount().getValue());
+
+        // Test that with one column and a constraint on a different column, stats are equivalent
+        noPartConstraint = constraintWithMinValue(noPartColumns.get("totalprice"), 100000.0);
+        partConstraint = constraintWithMinValue(partColumns.get("totalprice"), 100000.0);
+        statsNoPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsNoPartition", s), Collections.singletonList(noPartColumns.get("orderkey")), noPartConstraint);
+        statsWithPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsWithPartition", s), Collections.singletonList(partColumns.get("orderkey")), partConstraint);
+        assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
+        assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
+        assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("orderkey")));
+        assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("orderkey")));
+        // partitioned table should have stats partially filtered since data should span > 1 file
+        assertTrue(statsWithPartition.getRowCount().getValue() < statsNoPartition.getRowCount().getValue());
+        assertQuerySucceeds("DROP TABLE statsNoPartition");
+        assertQuerySucceeds("DROP TABLE statsWithPartition");
+    }
+
+    private void columnStatsEqual(Map<ColumnHandle, ColumnStatistics> actualStats, Map<ColumnHandle, ColumnStatistics> expectedStats)
+    {
+        for (ColumnHandle handle : actualStats.keySet()) {
+            ColumnStatistics actual = actualStats.get(handle);
+            ColumnStatistics expected = expectedStats.get(handle);
+            assertEquals(actual.getRange(), expected.getRange(), "range for col: " + handle);
+            assertEquals(actual.getNullsFraction(), expected.getNullsFraction(), "nulls fraction for col: " + handle);
+            assertEquals(actual.getDistinctValuesCount(), expected.getDistinctValuesCount(), "NDVs for col: " + handle);
+        }
+    }
+
+    private static Constraint<ColumnHandle> constraintWithMinValue(ColumnHandle col, Double min)
+    {
+        return new Constraint<>(
+                TupleDomain.withColumnDomains(
+                        ImmutableMap.of(col, Domain.create(ValueSet.ofRanges(Range.greaterThan(DOUBLE, min)), true))));
+    }
+
+    private TableHandle getAnalyzeTableHandle(String tableName, Session session)
+    {
+        Metadata meta = getQueryRunner().getMetadata();
+        return meta.getTableHandleForStatisticsCollection(
+                session,
+                new QualifiedObjectName("iceberg", "tpch", tableName.toLowerCase(Locale.US)),
+                Collections.emptyMap()).get();
+    }
+
+    private TableHandle getTableHandle(String tableName, Session session)
+    {
+        MetadataResolver resolver = getQueryRunner().getMetadata().getMetadataResolver(session);
+        return resolver.getTableHandle(new QualifiedObjectName("iceberg", "tpch", tableName.toLowerCase(Locale.US))).get();
+    }
+
+    private Map<String, ColumnHandle> getColumnHandles(String tableName, Session session)
+    {
+        return getQueryRunner().getMetadata().getColumnHandles(session, getTableHandle(tableName, session));
+    }
+
+    static void assertStatValuePresent(StatsSchema column, MaterializedResult result, Set<String> columnNames)
+    {
+        assertStatValue(column, result, columnNames, null, true);
+    }
+
+    static void assertStatValueAbsent(StatsSchema column, MaterializedResult result, Set<String> columnNames)
+    {
+        assertStatValue(column, result, columnNames, null, false);
+    }
+
+    static void assertStatValue(StatsSchema column, MaterializedResult result, Set<String> columnNames, Object value, boolean assertNot)
+    {
+        result.getMaterializedRows().forEach(row -> {
+            if (columnNames.contains((String) row.getField(StatsSchema.COLUMN_NAME.ordinal()))) {
+                Object resultValue = row.getField(column.ordinal());
+                if (assertNot) {
+                    assertNotEquals(resultValue, value);
+                }
+                else {
+                    assertEquals(resultValue, value);
+                }
+            }
+        });
+    }
+
+    static void assertNDVsPresent(TableStatistics stats)
+    {
+        for (Map.Entry<ColumnHandle, ColumnStatistics> entry : stats.getColumnStatistics().entrySet()) {
+            assertFalse(entry.getValue().getDistinctValuesCount().isUnknown(), entry.getKey() + " NDVs are unknown");
+        }
+    }
+
+    static void assertNDVsNotPresent(TableStatistics stats)
+    {
+        for (Map.Entry<ColumnHandle, ColumnStatistics> entry : stats.getColumnStatistics().entrySet()) {
+            assertTrue(entry.getValue().getDistinctValuesCount().isUnknown(), entry.getKey() + " NDVs are not unknown");
+        }
+    }
+}
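Taken together, an end-to-end usage sketch (hypothetical table name; assumes a runner wired as in `createQueryRunner` above, i.e. `iceberg.hive-statistics-merge-strategy=USE_NDV`, and the `AbstractTestQueryFramework` helpers):

```java
// 1. collect Hive-side stats, 2. read them back merged with Iceberg's file-level stats
assertQuerySucceeds("CREATE TABLE stats_demo AS SELECT * FROM orders");
assertQuerySucceeds("ANALYZE stats_demo");            // persists NDVs in the Hive Metastore
// distinct_values_count should now be served back, merged into the Iceberg stats
MaterializedResult merged = getQueryRunner().execute("SHOW STATS FOR stats_demo");
```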