Skip to content

Commit

Permalink
Add ability to do streaming aggregation for hive table scans
Browse files Browse the repository at this point in the history
As of now, we only support streaming aggregation for the cases where group-by keys
are the same as order-by keys; cases where group-by keys are a subset of order-by keys
are not supported for now.

Co-Authored-By: Zhan Yuan <[email protected]>
  • Loading branch information
2 people authored and highker committed Feb 23, 2022
1 parent 39d20a8 commit f66c85b
Show file tree
Hide file tree
Showing 6 changed files with 439 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public class HiveClientConfig

private boolean s3SelectPushdownEnabled;
private int s3SelectPushdownMaxConnections = 500;
private boolean streamingAggregationEnabled;

private boolean isTemporaryStagingDirectoryEnabled = true;
private String temporaryStagingDirectoryPath = "/tmp/presto-${USER}";
Expand Down Expand Up @@ -1356,6 +1357,19 @@ public HiveClientConfig setS3SelectPushdownMaxConnections(int s3SelectPushdownMa
return this;
}

/**
 * Reports the current value of the streaming aggregation flag held by this config.
 *
 * @return {@code true} if streaming aggregation has been turned on via
 *         {@code hive.streaming-aggregation-enabled}; {@code false} otherwise
 *         (the field has no initializer, so it is {@code false} until set)
 */
public boolean isStreamingAggregationEnabled()
{
    return this.streamingAggregationEnabled;
}

/**
 * Stores the streaming aggregation flag, bound to the
 * {@code hive.streaming-aggregation-enabled} configuration key.
 *
 * @param streamingAggregationEnabled the new value of the flag
 * @return this config instance, so that setter calls can be chained
 */
@Config("hive.streaming-aggregation-enabled")
@ConfigDescription("Enable streaming aggregation execution")
public HiveClientConfig setStreamingAggregationEnabled(boolean streamingAggregationEnabled)
{
    this.streamingAggregationEnabled = streamingAggregationEnabled;
    return this;
}

public boolean isTemporaryStagingDirectoryEnabled()
{
return isTemporaryStagingDirectoryEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.DiscretePredicates;
import com.facebook.presto.spi.InMemoryRecordSet;
import com.facebook.presto.spi.LocalProperty;
import com.facebook.presto.spi.MaterializedViewNotFoundException;
import com.facebook.presto.spi.MaterializedViewStatus;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.SortingProperty;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.TableLayoutFilterCoverage;
import com.facebook.presto.spi.TableNotFoundException;
Expand Down Expand Up @@ -222,6 +224,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWriteToTempPathEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSortedWritingEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isStatisticsEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isStreamingAggregationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUsePageFileForHiveUnsupportedType;
import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable;
import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
Expand Down Expand Up @@ -2758,14 +2761,46 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
predicate = createPredicate(partitionColumns, partitions);
}

// Expose ordering property of the table.
ImmutableList.Builder<LocalProperty<ColumnHandle>> localProperties = ImmutableList.builder();
Optional<Set<ColumnHandle>> streamPartitionColumns = Optional.empty();
if (table.getStorage().getBucketProperty().isPresent() && !table.getStorage().getBucketProperty().get().getSortedBy().isEmpty()) {
ImmutableSet.Builder<ColumnHandle> streamPartitionColumnsBuilder = ImmutableSet.builder();

// streamPartitioningColumns is how we partition the data across splits.
// localProperty is how we partition the data within a split.
// 1. add partition columns to streamPartitionColumns
partitionColumns.forEach(streamPartitionColumnsBuilder::add);

// 2. add sorted columns to streamPartitionColumns and localProperties
HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty().get();
Map<String, ColumnHandle> columnHandles = hiveColumnHandles(table).stream()
.collect(toImmutableMap(HiveColumnHandle::getName, identity()));
bucketProperty.getSortedBy().forEach(sortingColumn -> {
ColumnHandle columnHandle = columnHandles.get(sortingColumn.getColumnName());
localProperties.add(new SortingProperty<>(columnHandle, sortingColumn.getOrder().getSortOrder()));
streamPartitionColumnsBuilder.add(columnHandle);
});

// We currently only set streamPartitionColumns when streaming aggregation is enabled and the table is eligible for it:
// 1. The bucket columns are the same as a prefix of the sort columns.
// 2. All rows of the same value group are guaranteed to be in the same split. File splitting is disabled when isStreamingAggregationEnabled is true to guarantee this property.
List<String> sortColumns = bucketProperty.getSortedBy().stream().map(SortingColumn::getColumnName).collect(toImmutableList());
if (bucketProperty.getBucketedBy().size() <= sortColumns.size()
&& bucketProperty.getBucketedBy().containsAll(sortColumns.subList(0, bucketProperty.getBucketedBy().size()))
&& isStreamingAggregationEnabled(session)) {
streamPartitionColumns = Optional.of(streamPartitionColumnsBuilder.build());
}
}

return new ConnectorTableLayout(
hiveLayoutHandle,
Optional.empty(),
predicate,
tablePartitioning,
Optional.empty(),
streamPartitionColumns,
discretePredicates,
ImmutableList.of(),
localProperties.build(),
Optional.of(hiveLayoutHandle.getRemainingPredicate()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public final class HiveSessionProperties
public static final String PARTITION_STATISTICS_BASED_OPTIMIZATION_ENABLED = "partition_stats_based_optimization_enabled";
private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count";
private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
public static final String STREAMING_AGGREGATION_ENABLED = "streaming_aggregation_enabled";
public static final String SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE = "shuffle_partitioned_columns_for_table_write";
public static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path";
Expand Down Expand Up @@ -414,6 +415,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"S3 Select pushdown enabled",
hiveClientConfig.isS3SelectPushdownEnabled(),
false),
booleanProperty(
STREAMING_AGGREGATION_ENABLED,
"Enable streaming aggregation execution",
hiveClientConfig.isStreamingAggregationEnabled(),
false),
booleanProperty(
TEMPORARY_STAGING_DIRECTORY_ENABLED,
"Should use temporary staging directory for write operations",
Expand Down Expand Up @@ -889,6 +895,11 @@ public static boolean isS3SelectPushdownEnabled(ConnectorSession session)
return session.getProperty(S3_SELECT_PUSHDOWN_ENABLED, Boolean.class);
}

/**
 * Looks up the {@code streaming_aggregation_enabled} session property.
 *
 * @param session the connector session to read the property from
 * @return the boolean value of the property for the given session
 */
public static boolean isStreamingAggregationEnabled(ConnectorSession session)
{
    Boolean enabled = session.getProperty(STREAMING_AGGREGATION_ENABLED, Boolean.class);
    return enabled;
}

public static boolean isStatisticsEnabled(ConnectorSession session)
{
return session.getProperty(STATISTICS_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.hive.HiveSessionProperties.isFileSplittable;
import static com.facebook.presto.hive.HiveSessionProperties.isStreamingAggregationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
import static com.facebook.presto.hive.HiveUtil.getFooterCount;
import static com.facebook.presto.hive.HiveUtil.getHeaderCount;
Expand Down Expand Up @@ -261,11 +262,13 @@ public ListenableFuture<?> loadPartition(HivePartitionMetadata partition, HiveSp
return addSplitsToSource(splits, splitFactory, hiveSplitSource, stopped);
}
PathFilter pathFilter = isHudiParquetInputFormat(inputFormat) ? hoodiePathFilterLoadingCache.getUnchecked(configuration) : path1 -> true;
// Streaming aggregation works at the granularity of individual files,
// S3 Select pushdown works at the granularity of individual S3 objects, and
// partial aggregation pushdown works at the granularity of individual files;
// therefore we must not split files when any of them is enabled.
// Skip header / footer lines are not splittable except for a special case when skip.header.line.count=1
boolean splittable = isFileSplittable(session) &&
!isStreamingAggregationEnabled(session) &&
!s3SelectPushdownEnabled &&
!partialAggregationsPushedDown &&
getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public void testDefaults()
.setPartitionStatisticsBasedOptimizationEnabled(false)
.setS3SelectPushdownEnabled(false)
.setS3SelectPushdownMaxConnections(500)
.setStreamingAggregationEnabled(false)
.setTemporaryStagingDirectoryEnabled(true)
.setTemporaryStagingDirectoryPath("/tmp/presto-${USER}")
.setTemporaryTableSchema("default")
Expand Down Expand Up @@ -247,6 +248,7 @@ public void testExplicitPropertyMappings()
.put("hive.partition-statistics-based-optimization-enabled", "true")
.put("hive.s3select-pushdown.enabled", "true")
.put("hive.s3select-pushdown.max-connections", "1234")
.put("hive.streaming-aggregation-enabled", "true")
.put("hive.temporary-staging-directory-enabled", "false")
.put("hive.temporary-staging-directory-path", "updated")
.put("hive.temporary-table-schema", "other")
Expand Down Expand Up @@ -367,6 +369,7 @@ public void testExplicitPropertyMappings()
.setPartitionStatisticsBasedOptimizationEnabled(true)
.setS3SelectPushdownEnabled(true)
.setS3SelectPushdownMaxConnections(1234)
.setStreamingAggregationEnabled(true)
.setTemporaryStagingDirectoryEnabled(false)
.setTemporaryStagingDirectoryPath("updated")
.setTemporaryTableSchema("other")
Expand Down
Loading

0 comments on commit f66c85b

Please sign in to comment.