From 45bcbcf5e852e62339a859472ce80fc3a1e841d8 Mon Sep 17 00:00:00 2001 From: Zac Blanco Date: Wed, 27 Sep 2023 11:50:25 -0700 Subject: [PATCH] [Iceberg] Write Presto statistics to HMS This change adds the ability for Hive-configured Iceberg connectors to write calculated table statistics from ANALYZE into the Hive metastore. --- .../presto/hive/HiveStatisticsUtil.java | 109 ++++++++++++++++++ .../SemiTransactionalHiveMetastore.java | 25 +--- .../facebook/presto/hive/HiveMetadata.java | 60 ++-------- presto-iceberg/pom.xml | 5 + .../presto/iceberg/IcebergHiveMetadata.java | 97 +++++++++++++--- 5 files changed, 209 insertions(+), 87 deletions(-) create mode 100644 presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java new file mode 100644 index 0000000000000..f4cd3b5b6ba6e --- /dev/null +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java @@ -0,0 +1,109 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.hive.metastore.HiveColumnStatistics; +import com.facebook.presto.hive.metastore.PartitionStatistics; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; +import com.facebook.presto.spi.statistics.ComputedStatistics; +import com.google.common.base.VerifyException; +import com.google.common.collect.ImmutableMap; +import org.joda.time.DateTimeZone; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; + +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.hive.metastore.Statistics.fromComputedStatistics; +import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT; +import static com.google.common.base.Verify.verify; + +public class HiveStatisticsUtil +{ + private HiveStatisticsUtil() + { + } + + public static PartitionStatistics createPartitionStatistics( + ConnectorSession session, + HiveBasicStatistics basicStatistics, + Map columnTypes, + Map computedColumnStatistics, + DateTimeZone timeZone) + { + long rowCount = basicStatistics.getRowCount().orElseThrow(() -> new IllegalArgumentException("rowCount not present")); + Map columnStatistics = fromComputedStatistics( + session, + timeZone, + computedColumnStatistics, + columnTypes, + rowCount); + return new PartitionStatistics(basicStatistics, columnStatistics); + } + + public static PartitionStatistics createPartitionStatistics( + ConnectorSession session, + Map columnTypes, + ComputedStatistics computedStatistics, + DateTimeZone timeZone) + { + Map computedColumnStatistics = computedStatistics.getColumnStatistics(); + + Block rowCountBlock = Optional.ofNullable(computedStatistics.getTableStatistics().get(ROW_COUNT)) + .orElseThrow(() -> new VerifyException("rowCount not present")); + verify(!rowCountBlock.isNull(0), "rowCount must never be null"); + long rowCount = BIGINT.getLong(rowCountBlock, 0); + HiveBasicStatistics rowCountOnlyBasicStatistics = new HiveBasicStatistics(OptionalLong.empty(), OptionalLong.of(rowCount), OptionalLong.empty(), OptionalLong.empty()); + return createPartitionStatistics(session, rowCountOnlyBasicStatistics, columnTypes, computedColumnStatistics, timeZone); + } + + public static Map getColumnStatistics(Map, ComputedStatistics> statistics, List partitionValues) + { + return Optional.ofNullable(statistics.get(partitionValues)) + .map(ComputedStatistics::getColumnStatistics) + .orElse(ImmutableMap.of()); + } + + // TODO: Collect file count, on-disk size and in-memory size during ANALYZE + /** + * This method updates old {@link PartitionStatistics} with new statistics, only if the new + * partition stats are not empty. This method always overwrites each of the + * {@link HiveColumnStatistics} contained in the new partition statistics. + * + * @param oldPartitionStats old version of partition statistics + * @param newPartitionStats new version of partition statistics + * @return updated partition statistics + */ + public static PartitionStatistics updatePartitionStatistics(PartitionStatistics oldPartitionStats, PartitionStatistics newPartitionStats) + { + HiveBasicStatistics oldBasicStatistics = oldPartitionStats.getBasicStatistics(); + HiveBasicStatistics newBasicStatistics = newPartitionStats.getBasicStatistics(); + HiveBasicStatistics updatedBasicStatistics = new HiveBasicStatistics( + firstPresent(newBasicStatistics.getFileCount(), oldBasicStatistics.getFileCount()), + firstPresent(newBasicStatistics.getRowCount(), oldBasicStatistics.getRowCount()), + firstPresent(newBasicStatistics.getInMemoryDataSizeInBytes(), oldBasicStatistics.getInMemoryDataSizeInBytes()), + firstPresent(newBasicStatistics.getOnDiskDataSizeInBytes(), oldBasicStatistics.getOnDiskDataSizeInBytes())); + return new PartitionStatistics(updatedBasicStatistics, newPartitionStats.getColumnStatistics()); + } + + private static OptionalLong firstPresent(OptionalLong first, OptionalLong second) + { + return first.isPresent() ? first : second; + } +} diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java index 5d1985462bf44..d54a8ad36cbd4 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/SemiTransactionalHiveMetastore.java @@ -19,7 +19,6 @@ import com.facebook.presto.hive.ColumnConverterProvider; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; -import com.facebook.presto.hive.HiveBasicStatistics; import com.facebook.presto.hive.HiveTableHandle; import com.facebook.presto.hive.HiveType; import com.facebook.presto.hive.LocationHandle.WriteMode; @@ -63,7 +62,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -75,6 +73,7 @@ import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_USER_ERROR; import static com.facebook.presto.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS; import static com.facebook.presto.hive.HiveErrorCode.HIVE_TABLE_DROPPED_DURING_QUERY; +import static com.facebook.presto.hive.HiveStatisticsUtil.updatePartitionStatistics; import static com.facebook.presto.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY; import static com.facebook.presto.hive.metastore.HiveCommitHandle.EMPTY_HIVE_COMMIT_HANDLE; import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.OWNERSHIP; @@ -398,26 +397,6 @@ public synchronized void setPartitionStatistics(MetastoreContext metastoreContex }); } - // For HiveBasicStatistics, we only overwrite the original statistics if the new one is not empty. - // For HiveColumnStatistics, we always overwrite every statistics. - // TODO: Collect file count, on-disk size and in-memory size during ANALYZE - private PartitionStatistics updatePartitionStatistics(PartitionStatistics oldPartitionStats, PartitionStatistics newPartitionStats) - { - HiveBasicStatistics oldBasicStatistics = oldPartitionStats.getBasicStatistics(); - HiveBasicStatistics newBasicStatistics = newPartitionStats.getBasicStatistics(); - HiveBasicStatistics updatedBasicStatistics = new HiveBasicStatistics( - firstPresent(newBasicStatistics.getFileCount(), oldBasicStatistics.getFileCount()), - firstPresent(newBasicStatistics.getRowCount(), oldBasicStatistics.getRowCount()), - firstPresent(newBasicStatistics.getInMemoryDataSizeInBytes(), oldBasicStatistics.getInMemoryDataSizeInBytes()), - firstPresent(newBasicStatistics.getOnDiskDataSizeInBytes(), oldBasicStatistics.getOnDiskDataSizeInBytes())); - return new PartitionStatistics(updatedBasicStatistics, newPartitionStats.getColumnStatistics()); - } - - private static OptionalLong firstPresent(OptionalLong first, OptionalLong second) - { - return first.isPresent() ? first : second; - } - /** * {@code currentLocation} needs to be supplied if a writePath exists for the table. */ @@ -837,7 +816,7 @@ public synchronized void addPartition( * @param partition The new partition object to be added * @param currentLocation The path for which the partition is added in the table * @param statistics The basic statistics and column statistics for the added partition - * @param noNeedToValidatePath check metastore file path. True for no check which is enabled by the sync partition code path only + * @param isPathValidationNeeded check metastore file path. True for no check which is enabled by the sync partition code path only */ public synchronized void addPartition( ConnectorSession session, diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index 40f48d891a3cd..536f934e83587 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -16,7 +16,6 @@ import com.facebook.airlift.json.JsonCodec; import com.facebook.airlift.json.smile.SmileCodec; import com.facebook.presto.common.Subfield; -import com.facebook.presto.common.block.Block; import com.facebook.presto.common.predicate.Domain; import com.facebook.presto.common.predicate.NullableValue; import com.facebook.presto.common.predicate.TupleDomain; @@ -29,7 +28,6 @@ import com.facebook.presto.hive.PartitionUpdate.FileWriteInfo; import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.hive.metastore.Database; -import com.facebook.presto.hive.metastore.HiveColumnStatistics; import com.facebook.presto.hive.metastore.HivePrivilegeInfo; import com.facebook.presto.hive.metastore.MetastoreContext; import com.facebook.presto.hive.metastore.MetastoreUtil; @@ -102,7 +100,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.base.Suppliers; -import com.google.common.base.VerifyException; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -152,7 +149,6 @@ import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture; import static com.facebook.airlift.json.JsonCodec.jsonCodec; import static com.facebook.presto.common.predicate.TupleDomain.withColumnDomains; -import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.BooleanType.BOOLEAN; import static com.facebook.presto.common.type.Chars.isCharType; import static com.facebook.presto.common.type.DateType.DATE; @@ -232,6 +228,8 @@ import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled; import static com.facebook.presto.hive.HiveSessionProperties.isUsePageFileForHiveUnsupportedType; import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable; +import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics; +import static com.facebook.presto.hive.HiveStatisticsUtil.getColumnStatistics; import static com.facebook.presto.hive.HiveStorageFormat.AVRO; import static com.facebook.presto.hive.HiveStorageFormat.DWRF; import static com.facebook.presto.hive.HiveStorageFormat.ORC; @@ -320,7 +318,6 @@ import static com.facebook.presto.hive.metastore.Statistics.ReduceOperator.ADD; import static com.facebook.presto.hive.metastore.Statistics.createComputedStatisticsToPartitionMap; import static com.facebook.presto.hive.metastore.Statistics.createEmptyPartitionStatistics; -import static com.facebook.presto.hive.metastore.Statistics.fromComputedStatistics; import static com.facebook.presto.hive.metastore.Statistics.reduce; import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat; import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.listEnabledPrincipals; @@ -1497,7 +1494,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH if (partitionColumns.isEmpty()) { // commit analyze to unpartitioned table - metastore.setTableStatistics(metastoreContext, table, createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.of()))); + metastore.setTableStatistics(metastoreContext, table, createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.of()), timeZone)); } else { List> partitionValuesList; @@ -1527,7 +1524,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH } else { usedComputedStatistics++; - partitionStatistics.put(partitionValues, createPartitionStatistics(session, columnTypes, collectedStatistics)); + partitionStatistics.put(partitionValues, createPartitionStatistics(session, columnTypes, collectedStatistics, timeZone)); } } verify(usedComputedStatistics == computedStatistics.size(), "All computed statistics must be used"); @@ -1702,7 +1699,7 @@ public Optional finishCreateTable(ConnectorSession sess .map(PartitionUpdate::getStatistics) .reduce((first, second) -> reduce(first, second, ADD)) .orElse(createZeroStatistics()); - tableStatistics = createPartitionStatistics(session, basicStatistics, columnTypes, getColumnStatistics(partitionComputedStatistics, ImmutableList.of())); + tableStatistics = createPartitionStatistics(session, basicStatistics, columnTypes, getColumnStatistics(partitionComputedStatistics, ImmutableList.of()), timeZone); } else { tableStatistics = new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of()); @@ -1728,7 +1725,7 @@ public Optional finishCreateTable(ConnectorSession sess session, update.getStatistics(), columnTypes, - getColumnStatistics(partitionComputedStatistics, partition.getValues())); + getColumnStatistics(partitionComputedStatistics, partition.getValues()), timeZone); metastore.addPartition( session, handle.getSchemaName(), @@ -2035,7 +2032,7 @@ private Optional finishInsertInternal(ConnectorSession session, partitionUpdate.getStatistics(), columnTypes, - getColumnStatistics(partitionComputedStatistics, ImmutableList.of())); + getColumnStatistics(partitionComputedStatistics, ImmutableList.of()), timeZone); metastore.finishInsertIntoExistingTable( session, handle.getSchemaName(), @@ -2054,7 +2051,7 @@ else if (partitionUpdate.getUpdateMode() == APPEND) { session, partitionUpdate.getStatistics(), columnTypes, - getColumnStatistics(partitionComputedStatistics, partitionValues)); + getColumnStatistics(partitionComputedStatistics, partitionValues), timeZone); metastore.finishInsertIntoExistingPartition( session, handle.getSchemaName(), @@ -2113,7 +2110,8 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode session, partitionUpdate.getStatistics(), columnTypes, - getColumnStatistics(partitionComputedStatistics, partition.getValues())); + getColumnStatistics(partitionComputedStatistics, partition.getValues()), + timeZone); // New partition or overwriting existing partition by staging and moving the new partition if (!isExistingPartition || handle.getLocationHandle().getWriteMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY) { @@ -2182,44 +2180,6 @@ private List getTargetFileNames(List fileWriteInfos) .collect(toImmutableList()); } - private PartitionStatistics createPartitionStatistics( - ConnectorSession session, - Map columnTypes, - ComputedStatistics computedStatistics) - { - Map computedColumnStatistics = computedStatistics.getColumnStatistics(); - - Block rowCountBlock = Optional.ofNullable(computedStatistics.getTableStatistics().get(ROW_COUNT)) - .orElseThrow(() -> new VerifyException("rowCount not present")); - verify(!rowCountBlock.isNull(0), "rowCount must never be null"); - long rowCount = BIGINT.getLong(rowCountBlock, 0); - HiveBasicStatistics rowCountOnlyBasicStatistics = new HiveBasicStatistics(OptionalLong.empty(), OptionalLong.of(rowCount), OptionalLong.empty(), OptionalLong.empty()); - return createPartitionStatistics(session, rowCountOnlyBasicStatistics, columnTypes, computedColumnStatistics); - } - - private PartitionStatistics createPartitionStatistics( - ConnectorSession session, - HiveBasicStatistics basicStatistics, - Map columnTypes, - Map computedColumnStatistics) - { - long rowCount = basicStatistics.getRowCount().orElseThrow(() -> new IllegalArgumentException("rowCount not present")); - Map columnStatistics = fromComputedStatistics( - session, - timeZone, - computedColumnStatistics, - columnTypes, - rowCount); - return new PartitionStatistics(basicStatistics, columnStatistics); - } - - private static Map getColumnStatistics(Map, ComputedStatistics> statistics, List partitionValues) - { - return Optional.ofNullable(statistics.get(partitionValues)) - .map(ComputedStatistics::getColumnStatistics) - .orElse(ImmutableMap.of()); - } - private Map> getExistingPartitionsByNames(MetastoreContext metastoreContext, String databaseName, String tableName, List partitionUpdates) { ImmutableMap.Builder> existingPartitions = ImmutableMap.builder(); diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index 08a96dd98beae..75cb68b542462 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -93,6 +93,11 @@ presto-hive-metastore + + joda-time + joda-time + + com.facebook.presto presto-memory-context diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 23d64c590a2ec..af07f3f489b6e 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -14,16 +14,20 @@ package com.facebook.presto.iceberg; import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; import com.facebook.presto.hive.HiveColumnConverterProvider; +import com.facebook.presto.hive.HiveColumnHandle; import com.facebook.presto.hive.HiveTypeTranslator; import com.facebook.presto.hive.TableAlreadyExistsException; import com.facebook.presto.hive.ViewAlreadyExistsException; +import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.hive.metastore.Database; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; import com.facebook.presto.hive.metastore.MetastoreContext; +import com.facebook.presto.hive.metastore.PartitionStatistics; import com.facebook.presto.hive.metastore.PrincipalPrivileges; import com.facebook.presto.hive.metastore.Table; import com.facebook.presto.spi.ColumnMetadata; @@ -39,8 +43,13 @@ import com.facebook.presto.spi.SchemaTablePrefix; import com.facebook.presto.spi.TableNotFoundException; import com.facebook.presto.spi.ViewNotFoundException; +import com.facebook.presto.spi.statistics.ColumnStatisticMetadata; +import com.facebook.presto.spi.statistics.ComputedStatistics; +import com.facebook.presto.spi.statistics.TableStatisticType; +import com.facebook.presto.spi.statistics.TableStatisticsMetadata; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.fs.Path; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; @@ -49,15 +58,23 @@ import org.apache.iceberg.SchemaParser; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; +import org.joda.time.DateTimeZone; import java.io.IOException; +import java.time.ZoneId; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; +import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics; +import static com.facebook.presto.hive.HiveStatisticsUtil.updatePartitionStatistics; import static com.facebook.presto.hive.HiveUtil.decodeViewData; import static com.facebook.presto.hive.HiveUtil.encodeViewData; +import static com.facebook.presto.hive.HiveUtil.hiveColumnHandles; import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT; import static com.facebook.presto.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; import static com.facebook.presto.hive.metastore.MetastoreUtil.checkIfNullView; @@ -66,6 +83,7 @@ import static com.facebook.presto.hive.metastore.MetastoreUtil.isPrestoView; import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled; import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyAndPopulateViews; +import static com.facebook.presto.hive.metastore.Statistics.createComputedStatisticsToPartitionMap; import static com.facebook.presto.iceberg.IcebergSchemaProperties.getSchemaLocation; import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat; import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion; @@ -81,7 +99,10 @@ import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY; import static com.facebook.presto.spi.security.PrincipalType.USER; +import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static java.util.Objects.requireNonNull; import static org.apache.iceberg.TableMetadata.newTableMetadata; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; @@ -97,6 +118,7 @@ public class IcebergHiveMetadata private final ExtendedHiveMetastore metastore; private final HdfsEnvironment hdfsEnvironment; private final String prestoVersion; + private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID()))); public IcebergHiveMetadata( ExtendedHiveMetastore metastore, @@ -135,14 +157,13 @@ protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTa @Override public List listSchemaNames(ConnectorSession session) { - MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER); - return metastore.getAllDatabases(metastoreContext); + return metastore.getAllDatabases(getMetastoreContext(session)); } @Override public List listTables(ConnectorSession session, Optional schemaName) { - MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER); + MetastoreContext metastoreContext = getMetastoreContext(session); // If schema name is not present, list tables from all schemas List schemaNames = schemaName .map(ImmutableList::of) @@ -176,7 +197,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map new SchemaNotFoundException(schemaName)); @@ -229,7 +250,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con TableOperations operations = new HiveTableOperations( metastore, - new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER), + getMetastoreContext(session), hdfsEnvironment, hdfsContext, schemaName, @@ -279,23 +300,20 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle table.properties().containsKey(WRITE_DATA_LOCATION)) { throw new PrestoException(NOT_SUPPORTED, "Table " + handle.getSchemaTableName() + " contains Iceberg path override properties and cannot be dropped from Presto"); } - MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER); - metastore.dropTable(metastoreContext, handle.getSchemaName(), handle.getTableName(), true); + metastore.dropTable(getMetastoreContext(session), handle.getSchemaName(), handle.getTableName(), true); } @Override public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTable) { IcebergTableHandle handle = (IcebergTableHandle) tableHandle; - MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER); - metastore.renameTable(metastoreContext, handle.getSchemaName(), handle.getTableName(), newTable.getSchemaName(), newTable.getTableName()); + metastore.renameTable(getMetastoreContext(session), handle.getSchemaName(), handle.getTableName(), newTable.getSchemaName(), newTable.getTableName()); } @Override protected ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table) { - MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getSource(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER); - if (!metastore.getTable(metastoreContext, table.getSchemaName(), table.getTableName()).isPresent()) { + if (!metastore.getTable(getMetastoreContext(session), table.getSchemaName(), table.getTableName()).isPresent()) { throw new TableNotFoundException(table); } @@ -406,4 +424,55 @@ private List listSchemas(ConnectorSession session, String schemaNameOrNu } return ImmutableList.of(schemaNameOrNull); } + public IcebergTableHandle getTableHandleForStatisticsCollection(ConnectorSession session, SchemaTableName tableName, Map analyzeProperties) + { + return getTableHandle(session, tableName); + } + + @Override + public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + Set columnStatistics = tableMetadata.getColumns().stream() + .filter(column -> !column.isHidden()) + .flatMap(meta -> metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType()) + .stream() + .map(statType -> new ColumnStatisticMetadata(meta.getName(), statType))) + .collect(toImmutableSet()); + + Set tableStatistics = ImmutableSet.of(ROW_COUNT); + return new TableStatisticsMetadata(columnStatistics, tableStatistics, Collections.emptyList()); + } + + @Override + public ConnectorTableHandle beginStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return tableHandle; + } + + @Override + public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle tableHandle, Collection computedStatistics) + { + IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle; + MetastoreContext metastoreContext = getMetastoreContext(session); + Table table = metastore.getTable(metastoreContext, icebergTableHandle.getSchemaTableName().getSchemaName(), icebergTableHandle.getSchemaTableName().getTableName()) + .orElseThrow(() -> new TableNotFoundException(icebergTableHandle.getSchemaTableName())); + + List partitionColumns = table.getPartitionColumns(); + List partitionColumnNames = partitionColumns.stream() + .map(Column::getName) + .collect(toImmutableList()); + List hiveColumnHandles = hiveColumnHandles(table); + Map columnTypes = hiveColumnHandles.stream() + .filter(columnHandle -> !columnHandle.isHidden()) + .collect(toImmutableMap(HiveColumnHandle::getName, column -> column.getHiveType().getType(typeManager))); + + Map, ComputedStatistics> computedStatisticsMap = createComputedStatisticsToPartitionMap(computedStatistics, partitionColumnNames, columnTypes); + + // commit analyze to unpartitioned table + PartitionStatistics tableStatistics = createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.of()), timeZone); + metastore.updateTableStatistics(metastoreContext, + table.getDatabaseName(), + table.getTableName(), + oldStats -> updatePartitionStatistics(oldStats, tableStatistics)); + } }