[Iceberg] Write Presto statistics to HMS
This change adds the ability for Hive-configured Iceberg
connectors to write calculated table statistics from
ANALYZE into the Hive metastore.
ZacBlanco authored and tdcmeehan committed Oct 13, 2023
1 parent 6a582e8 commit 45bcbcf
Showing 5 changed files with 209 additions and 87 deletions.
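For orientation before the diffs, here is a minimal sketch of the flow this change enables, assuming a Hive-catalog Iceberg table. The class, method, and single-statistics-entry assumption are illustrative, not code from the commit:

// Hedged sketch: ANALYZE produces ComputedStatistics, which the connector
// converts with the new HiveStatisticsUtil and then writes to the Hive
// metastore (the metastore write itself is elided here).
package com.facebook.presto.iceberg; // hypothetical location

import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import org.joda.time.DateTimeZone;

import java.util.Collection;
import java.util.Map;

import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics;
import static com.google.common.collect.Iterables.getOnlyElement;

final class AnalyzeToHmsSketch
{
    private AnalyzeToHmsSketch() {}

    static PartitionStatistics toHmsForm(
            ConnectorSession session,
            Map<String, Type> columnTypes,
            Collection<ComputedStatistics> computedStatistics,
            DateTimeZone timeZone)
    {
        // Iceberg tables look unpartitioned to HMS, so a single
        // ComputedStatistics entry covering the whole table is assumed here.
        ComputedStatistics statistics = getOnlyElement(computedStatistics);
        return createPartitionStatistics(session, columnTypes, statistics, timeZone);
    }
}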
@@ -0,0 +1,109 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.metastore.HiveColumnStatistics;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableMap;
import org.joda.time.DateTimeZone;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.hive.metastore.Statistics.fromComputedStatistics;
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.google.common.base.Verify.verify;

public class HiveStatisticsUtil
{
    private HiveStatisticsUtil()
    {
    }

    public static PartitionStatistics createPartitionStatistics(
            ConnectorSession session,
            HiveBasicStatistics basicStatistics,
            Map<String, Type> columnTypes,
            Map<ColumnStatisticMetadata, Block> computedColumnStatistics,
            DateTimeZone timeZone)
    {
        long rowCount = basicStatistics.getRowCount().orElseThrow(() -> new IllegalArgumentException("rowCount not present"));
        Map<String, HiveColumnStatistics> columnStatistics = fromComputedStatistics(
                session,
                timeZone,
                computedColumnStatistics,
                columnTypes,
                rowCount);
        return new PartitionStatistics(basicStatistics, columnStatistics);
    }

    public static PartitionStatistics createPartitionStatistics(
            ConnectorSession session,
            Map<String, Type> columnTypes,
            ComputedStatistics computedStatistics,
            DateTimeZone timeZone)
    {
        Map<ColumnStatisticMetadata, Block> computedColumnStatistics = computedStatistics.getColumnStatistics();

        Block rowCountBlock = Optional.ofNullable(computedStatistics.getTableStatistics().get(ROW_COUNT))
                .orElseThrow(() -> new VerifyException("rowCount not present"));
        verify(!rowCountBlock.isNull(0), "rowCount must never be null");
        long rowCount = BIGINT.getLong(rowCountBlock, 0);
        HiveBasicStatistics rowCountOnlyBasicStatistics = new HiveBasicStatistics(OptionalLong.empty(), OptionalLong.of(rowCount), OptionalLong.empty(), OptionalLong.empty());
        return createPartitionStatistics(session, rowCountOnlyBasicStatistics, columnTypes, computedColumnStatistics, timeZone);
    }

    public static Map<ColumnStatisticMetadata, Block> getColumnStatistics(Map<List<String>, ComputedStatistics> statistics, List<String> partitionValues)
    {
        return Optional.ofNullable(statistics.get(partitionValues))
                .map(ComputedStatistics::getColumnStatistics)
                .orElse(ImmutableMap.of());
    }

    // TODO: Collect file count, on-disk size and in-memory size during ANALYZE
    /**
     * Merges new {@link PartitionStatistics} into old ones: each basic statistic
     * (file count, row count, sizes) from the new statistics replaces the old value
     * only when it is present, while the {@link HiveColumnStatistics} from the new
     * statistics always replace the old ones wholesale.
     *
     * @param oldPartitionStats old version of partition statistics
     * @param newPartitionStats new version of partition statistics
     * @return updated partition statistics
     */
    public static PartitionStatistics updatePartitionStatistics(PartitionStatistics oldPartitionStats, PartitionStatistics newPartitionStats)
    {
        HiveBasicStatistics oldBasicStatistics = oldPartitionStats.getBasicStatistics();
        HiveBasicStatistics newBasicStatistics = newPartitionStats.getBasicStatistics();
        HiveBasicStatistics updatedBasicStatistics = new HiveBasicStatistics(
                firstPresent(newBasicStatistics.getFileCount(), oldBasicStatistics.getFileCount()),
                firstPresent(newBasicStatistics.getRowCount(), oldBasicStatistics.getRowCount()),
                firstPresent(newBasicStatistics.getInMemoryDataSizeInBytes(), oldBasicStatistics.getInMemoryDataSizeInBytes()),
                firstPresent(newBasicStatistics.getOnDiskDataSizeInBytes(), oldBasicStatistics.getOnDiskDataSizeInBytes()));
        return new PartitionStatistics(updatedBasicStatistics, newPartitionStats.getColumnStatistics());
    }

    private static OptionalLong firstPresent(OptionalLong first, OptionalLong second)
    {
        return first.isPresent() ? first : second;
    }
}
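A hedged usage sketch of the merge semantics documented above; the concrete counts are invented for illustration:

// Demonstrates updatePartitionStatistics: per-field fallback for basic
// statistics, wholesale replacement for column statistics.
import com.facebook.presto.hive.HiveBasicStatistics;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.google.common.collect.ImmutableMap;

import java.util.OptionalLong;

import static com.facebook.presto.hive.HiveStatisticsUtil.updatePartitionStatistics;

final class MergeSemanticsExample
{
    private MergeSemanticsExample() {}

    public static void main(String[] args)
    {
        // Existing metastore stats: 3 files, 100 rows, sizes unknown.
        PartitionStatistics old = new PartitionStatistics(
                new HiveBasicStatistics(OptionalLong.of(3), OptionalLong.of(100), OptionalLong.empty(), OptionalLong.empty()),
                ImmutableMap.of());
        // ANALYZE currently supplies only a row count (see the TODO above).
        PartitionStatistics fresh = new PartitionStatistics(
                new HiveBasicStatistics(OptionalLong.empty(), OptionalLong.of(250), OptionalLong.empty(), OptionalLong.empty()),
                ImmutableMap.of());
        // Result keeps the old file count (3), takes the new row count (250),
        // and replaces the column statistics with the new (empty) map.
        PartitionStatistics merged = updatePartitionStatistics(old, fresh);
        System.out.println(merged.getBasicStatistics());
    }
}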
@@ -19,7 +19,6 @@
import com.facebook.presto.hive.ColumnConverterProvider;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
-import com.facebook.presto.hive.HiveBasicStatistics;
import com.facebook.presto.hive.HiveTableHandle;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.LocationHandle.WriteMode;
@@ -63,7 +62,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -75,6 +73,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_USER_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_TABLE_DROPPED_DURING_QUERY;
+import static com.facebook.presto.hive.HiveStatisticsUtil.updatePartitionStatistics;
import static com.facebook.presto.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY;
import static com.facebook.presto.hive.metastore.HiveCommitHandle.EMPTY_HIVE_COMMIT_HANDLE;
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.OWNERSHIP;
@@ -398,26 +397,6 @@ public synchronized void setPartitionStatistics(MetastoreContext metastoreContext,
});
}

-    // For HiveBasicStatistics, we only overwrite the original statistics if the new one is not empty.
-    // For HiveColumnStatistics, we always overwrite every statistics.
-    // TODO: Collect file count, on-disk size and in-memory size during ANALYZE
-    private PartitionStatistics updatePartitionStatistics(PartitionStatistics oldPartitionStats, PartitionStatistics newPartitionStats)
-    {
-        HiveBasicStatistics oldBasicStatistics = oldPartitionStats.getBasicStatistics();
-        HiveBasicStatistics newBasicStatistics = newPartitionStats.getBasicStatistics();
-        HiveBasicStatistics updatedBasicStatistics = new HiveBasicStatistics(
-                firstPresent(newBasicStatistics.getFileCount(), oldBasicStatistics.getFileCount()),
-                firstPresent(newBasicStatistics.getRowCount(), oldBasicStatistics.getRowCount()),
-                firstPresent(newBasicStatistics.getInMemoryDataSizeInBytes(), oldBasicStatistics.getInMemoryDataSizeInBytes()),
-                firstPresent(newBasicStatistics.getOnDiskDataSizeInBytes(), oldBasicStatistics.getOnDiskDataSizeInBytes()));
-        return new PartitionStatistics(updatedBasicStatistics, newPartitionStats.getColumnStatistics());
-    }
-
-    private static OptionalLong firstPresent(OptionalLong first, OptionalLong second)
-    {
-        return first.isPresent() ? first : second;
-    }
-
/**
* {@code currentLocation} needs to be supplied if a writePath exists for the table.
*/
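The two methods removed above are not deleted outright; they move verbatim into the new HiveStatisticsUtil shown at the top of this commit, and this class now uses them through the static import added above.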
@@ -837,7 +816,7 @@ public synchronized void addPartition(
* @param partition The new partition object to be added
* @param currentLocation The path for which the partition is added in the table
* @param statistics The basic statistics and column statistics for the added partition
- * @param noNeedToValidatePath check metastore file path. True for no check which is enabled by the sync partition code path only
+ * @param isPathValidationNeeded whether to skip metastore file path validation; true (no check) is used only by the sync partition code path
*/
public synchronized void addPartition(
ConnectorSession session,
@@ -16,7 +16,6 @@
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.smile.SmileCodec;
import com.facebook.presto.common.Subfield;
-import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.TupleDomain;
@@ -29,7 +28,6 @@
import com.facebook.presto.hive.PartitionUpdate.FileWriteInfo;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Database;
-import com.facebook.presto.hive.metastore.HiveColumnStatistics;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.MetastoreUtil;
@@ -102,7 +100,6 @@
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Suppliers;
-import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -152,7 +149,6 @@
import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture;
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.presto.common.predicate.TupleDomain.withColumnDomains;
-import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.Chars.isCharType;
import static com.facebook.presto.common.type.DateType.DATE;
@@ -232,6 +228,8 @@
import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUsePageFileForHiveUnsupportedType;
import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable;
+import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics;
+import static com.facebook.presto.hive.HiveStatisticsUtil.getColumnStatistics;
import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
@@ -320,7 +318,6 @@
import static com.facebook.presto.hive.metastore.Statistics.ReduceOperator.ADD;
import static com.facebook.presto.hive.metastore.Statistics.createComputedStatisticsToPartitionMap;
import static com.facebook.presto.hive.metastore.Statistics.createEmptyPartitionStatistics;
-import static com.facebook.presto.hive.metastore.Statistics.fromComputedStatistics;
import static com.facebook.presto.hive.metastore.Statistics.reduce;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.listEnabledPrincipals;
@@ -1497,7 +1494,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableHandle

if (partitionColumns.isEmpty()) {
// commit analyze to unpartitioned table
-metastore.setTableStatistics(metastoreContext, table, createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.<String>of())));
+metastore.setTableStatistics(metastoreContext, table, createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.<String>of()), timeZone));
}
else {
List<List<String>> partitionValuesList;
@@ -1527,7 +1524,7 @@
}
else {
usedComputedStatistics++;
-partitionStatistics.put(partitionValues, createPartitionStatistics(session, columnTypes, collectedStatistics));
+partitionStatistics.put(partitionValues, createPartitionStatistics(session, columnTypes, collectedStatistics, timeZone));
}
}
verify(usedComputedStatistics == computedStatistics.size(), "All computed statistics must be used");
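The surrounding loop keys computed statistics by each partition's values, and the verify above checks that every computed entry was consumed. A small hedged sketch of that lookup, with invented partition values:

// getColumnStatistics returns an empty map for partitions that ANALYZE
// did not touch; the sketch below is illustrative, not the commit's code.
import com.facebook.presto.common.block.Block;
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Map;

import static com.facebook.presto.hive.HiveStatisticsUtil.getColumnStatistics;

final class PartitionLookupSketch
{
    private PartitionLookupSketch() {}

    static Map<ColumnStatisticMetadata, Block> statsFor(Map<List<String>, ComputedStatistics> statisticsByPartition)
    {
        // Partition values are positional: ["2023-10-13"] for a table
        // partitioned on ds; an unpartitioned table uses ImmutableList.of().
        return getColumnStatistics(statisticsByPartition, ImmutableList.of("2023-10-13"));
    }
}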
@@ -1702,7 +1699,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session
.map(PartitionUpdate::getStatistics)
.reduce((first, second) -> reduce(first, second, ADD))
.orElse(createZeroStatistics());
-tableStatistics = createPartitionStatistics(session, basicStatistics, columnTypes, getColumnStatistics(partitionComputedStatistics, ImmutableList.of()));
+tableStatistics = createPartitionStatistics(session, basicStatistics, columnTypes, getColumnStatistics(partitionComputedStatistics, ImmutableList.of()), timeZone);
}
else {
tableStatistics = new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of());
@@ -1728,7 +1725,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session
session,
update.getStatistics(),
columnTypes,
-getColumnStatistics(partitionComputedStatistics, partition.getValues()));
+getColumnStatistics(partitionComputedStatistics, partition.getValues()), timeZone);
metastore.addPartition(
session,
handle.getSchemaName(),
@@ -2035,7 +2032,7 @@ private Optional<ConnectorOutputMetadata> finishInsertInternal(ConnectorSession
session,
partitionUpdate.getStatistics(),
columnTypes,
-getColumnStatistics(partitionComputedStatistics, ImmutableList.of()));
+getColumnStatistics(partitionComputedStatistics, ImmutableList.of()), timeZone);
metastore.finishInsertIntoExistingTable(
session,
handle.getSchemaName(),
@@ -2054,7 +2051,7 @@ else if (partitionUpdate.getUpdateMode() == APPEND) {
session,
partitionUpdate.getStatistics(),
columnTypes,
-getColumnStatistics(partitionComputedStatistics, partitionValues));
+getColumnStatistics(partitionComputedStatistics, partitionValues), timeZone);
metastore.finishInsertIntoExistingPartition(
session,
handle.getSchemaName(),
@@ -2113,7 +2110,8 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode() == OVERWRITE) {
session,
partitionUpdate.getStatistics(),
columnTypes,
getColumnStatistics(partitionComputedStatistics, partition.getValues()));
getColumnStatistics(partitionComputedStatistics, partition.getValues()),
timeZone);

// New partition or overwriting existing partition by staging and moving the new partition
if (!isExistingPartition || handle.getLocationHandle().getWriteMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
@@ -2182,44 +2180,6 @@ private List<String> getTargetFileNames(List<FileWriteInfo> fileWriteInfos)
.collect(toImmutableList());
}

-    private PartitionStatistics createPartitionStatistics(
-            ConnectorSession session,
-            Map<String, Type> columnTypes,
-            ComputedStatistics computedStatistics)
-    {
-        Map<ColumnStatisticMetadata, Block> computedColumnStatistics = computedStatistics.getColumnStatistics();
-
-        Block rowCountBlock = Optional.ofNullable(computedStatistics.getTableStatistics().get(ROW_COUNT))
-                .orElseThrow(() -> new VerifyException("rowCount not present"));
-        verify(!rowCountBlock.isNull(0), "rowCount must never be null");
-        long rowCount = BIGINT.getLong(rowCountBlock, 0);
-        HiveBasicStatistics rowCountOnlyBasicStatistics = new HiveBasicStatistics(OptionalLong.empty(), OptionalLong.of(rowCount), OptionalLong.empty(), OptionalLong.empty());
-        return createPartitionStatistics(session, rowCountOnlyBasicStatistics, columnTypes, computedColumnStatistics);
-    }
-
-    private PartitionStatistics createPartitionStatistics(
-            ConnectorSession session,
-            HiveBasicStatistics basicStatistics,
-            Map<String, Type> columnTypes,
-            Map<ColumnStatisticMetadata, Block> computedColumnStatistics)
-    {
-        long rowCount = basicStatistics.getRowCount().orElseThrow(() -> new IllegalArgumentException("rowCount not present"));
-        Map<String, HiveColumnStatistics> columnStatistics = fromComputedStatistics(
-                session,
-                timeZone,
-                computedColumnStatistics,
-                columnTypes,
-                rowCount);
-        return new PartitionStatistics(basicStatistics, columnStatistics);
-    }
-
-    private static Map<ColumnStatisticMetadata, Block> getColumnStatistics(Map<List<String>, ComputedStatistics> statistics, List<String> partitionValues)
-    {
-        return Optional.ofNullable(statistics.get(partitionValues))
-                .map(ComputedStatistics::getColumnStatistics)
-                .orElse(ImmutableMap.of());
-    }
-
private Map<String, Optional<Partition>> getExistingPartitionsByNames(MetastoreContext metastoreContext, String databaseName, String tableName, List<PartitionUpdate> partitionUpdates)
{
ImmutableMap.Builder<String, Optional<Partition>> existingPartitions = ImmutableMap.builder();
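As in the metastore class, these private helpers are not lost: they move to HiveStatisticsUtil as shared static methods, and HiveMetadata now calls them through the static imports added above, passing its timeZone field explicitly at each call site.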
5 changes: 5 additions & 0 deletions presto-iceberg/pom.xml
@@ -93,6 +93,11 @@
<artifactId>presto-hive-metastore</artifactId>
</dependency>

+<dependency>
+    <groupId>joda-time</groupId>
+    <artifactId>joda-time</artifactId>
+</dependency>
+
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-memory-context</artifactId>
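The joda-time dependency is added because the shared HiveStatisticsUtil entry points take an org.joda.time.DateTimeZone parameter, which presto-iceberg must now reference directly.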