Dynamic bucket pruning on workers
Co-Authored-By: James Sun <[email protected]>
kewang1024 and highker committed Aug 14, 2020
1 parent 398a38e commit 0cd9fb1
Showing 4 changed files with 267 additions and 8 deletions.
HiveBucketing.java
@@ -305,7 +305,11 @@ public static Optional<HiveBucketHandle> getHiveBucketHandle(Table table)

public static Optional<HiveBucketFilter> getHiveBucketFilter(Table table, TupleDomain<ColumnHandle> effectivePredicate)
{
- Optional<HiveBucketProperty> hiveBucketProperty = table.getStorage().getBucketProperty();
+ return getHiveBucketFilter(table.getStorage().getBucketProperty(), table.getDataColumns(), effectivePredicate);
+ }
+
+ public static Optional<HiveBucketFilter> getHiveBucketFilter(Optional<HiveBucketProperty> hiveBucketProperty, List<Column> dataColumns, TupleDomain<ColumnHandle> effectivePredicate)
+ {
if (!hiveBucketProperty.isPresent()) {
return Optional.empty();
}
@@ -320,7 +324,7 @@ public static Optional<HiveBucketFilter> getHiveBucketFilter(Table table, TupleD
return Optional.empty();
}

- Optional<Set<Integer>> buckets = getHiveBuckets(table, bindings.get());
+ Optional<Set<Integer>> buckets = getHiveBuckets(hiveBucketProperty, dataColumns, bindings.get());
if (buckets.isPresent()) {
return Optional.of(new HiveBucketFilter(buckets.get()));
}
@@ -337,7 +341,7 @@ public static Optional<HiveBucketFilter> getHiveBucketFilter(Table table, TupleD
}
ValueSet values = domain.get().getValues();
ImmutableSet.Builder<Integer> builder = ImmutableSet.builder();
- int bucketCount = table.getStorage().getBucketProperty().get().getBucketCount();
+ int bucketCount = hiveBucketProperty.get().getBucketCount();
for (int i = 0; i < bucketCount; i++) {
if (values.containsValue((long) i)) {
builder.add(i);
@@ -346,12 +350,13 @@ public static Optional<HiveBucketFilter> getHiveBucketFilter(Table table, TupleD
return Optional.of(new HiveBucketFilter(builder.build()));
}

- private static Optional<Set<Integer>> getHiveBuckets(Table table, Map<ColumnHandle, Set<NullableValue>> bindings)
+ private static Optional<Set<Integer>> getHiveBuckets(Optional<HiveBucketProperty> hiveBucketPropertyOptional, List<Column> dataColumns, Map<ColumnHandle, Set<NullableValue>> bindings)
{
- if (bindings.isEmpty()) {
+ if (bindings.isEmpty() || !hiveBucketPropertyOptional.isPresent()) {
return Optional.empty();
}
- HiveBucketProperty hiveBucketProperty = table.getStorage().getBucketProperty().get();
+
+ HiveBucketProperty hiveBucketProperty = hiveBucketPropertyOptional.get();
checkArgument(hiveBucketProperty.getBucketFunctionType().equals(HIVE_COMPATIBLE),
"bucketFunctionType is expected to be HIVE_COMPATIBLE, got: %s",
hiveBucketProperty.getBucketFunctionType());
@@ -360,7 +365,7 @@ private static Optional<Set<Integer>> getHiveBuckets(Table table, Map<ColumnHand
return Optional.empty();
}

- Map<String, HiveType> hiveTypes = table.getDataColumns().stream()
+ Map<String, HiveType> hiveTypes = dataColumns.stream()
.collect(toImmutableMap(Column::getName, Column::getType));

// Verify the bucket column types are supported
@@ -382,7 +387,7 @@ private static Optional<Set<Integer>> getHiveBuckets(Table table, Map<ColumnHand
}

List<Set<NullableValue>> orderedBindings = orderedBindingsBuilder.build();
- int bucketCount = table.getStorage().getBucketProperty().get().getBucketCount();
+ int bucketCount = hiveBucketProperty.getBucketCount();
List<TypeInfo> types = bucketColumns.stream()
.map(hiveTypes::get)
.map(HiveType::getTypeInfo)
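The hunks above split getHiveBucketFilter into the existing Table-based entry point and a new overload that needs only the bucket property and data columns, so code that holds just a split's Storage rather than a full metastore Table can compute the bucket filter. A minimal sketch of such a call, mirroring shouldSkipBucket further down in this commit (variable names assumed for illustration):

// Sketch: bucket filtering from split-level metadata only; no metastore Table needed.
Optional<HiveBucketing.HiveBucketFilter> filter = HiveBucketing.getHiveBucketFilter(
        hiveSplit.getStorage().getBucketProperty(), // Optional<HiveBucketProperty> carried by the split
        hiveLayout.getDataColumns(),                // List<Column> from the table layout handle
        dynamicFilterPredicate);                    // TupleDomain<ColumnHandle> pushed down at runtime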
HiveEmptySplitPageSource.java
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.common.Page;
import com.facebook.presto.spi.ConnectorPageSource;

public class HiveEmptySplitPageSource
        implements ConnectorPageSource
{
    @Override
    public long getCompletedBytes()
    {
        return 0;
    }

    @Override
    public long getCompletedPositions()
    {
        return 0;
    }

    @Override
    public long getReadTimeNanos()
    {
        return 0;
    }

    @Override
    public boolean isFinished()
    {
        return true;
    }

    @Override
    public Page getNextPage()
    {
        return null;
    }

    @Override
    public long getSystemMemoryUsage()
    {
        return 0;
    }

    @Override
    public void close() {}
}
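HiveEmptySplitPageSource is the do-nothing source handed back when a split's bucket is pruned: isFinished() is true from the start and getNextPage() returns null, so a consumer sees zero rows. An illustrative (not engine) loop showing that contract:

// Illustrative only: a pruned split contributes no rows and finishes immediately.
HiveEmptySplitPageSource source = new HiveEmptySplitPageSource();
while (!source.isFinished()) {        // true on the first check, so the body never executes
    Page page = source.getNextPage(); // would return null in any case
}
source.close();                       // no-op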
HivePageSourceProvider.java
@@ -56,6 +56,7 @@
import java.util.Set;
import java.util.function.Function;

+ import static com.facebook.presto.hive.HiveBucketing.getHiveBucketFilter;
import static com.facebook.presto.hive.HiveCoercer.createCoercer;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
@@ -156,6 +157,10 @@ public ConnectorPageSource createPageSource(
.transform(Subfield::getRootName)
.transform(hiveLayout.getPredicateColumns()::get);

+ if (shouldSkipBucket(hiveLayout, hiveSplit, splitContext)) {
+     return new HiveEmptySplitPageSource();
+ }
+
CacheQuota cacheQuota = generateCacheQuota(hiveSplit);
Optional<ConnectorPageSource> pageSource = createHivePageSource(
cursorProviders,
@@ -264,6 +269,10 @@ private static Optional<ConnectorPageSource> createSelectivePageSource(

RowExpression optimizedRemainingPredicate = rowExpressionCache.getUnchecked(new RowExpressionCacheKey(layout.getRemainingPredicate(), session));

+ if (shouldSkipBucket(layout, split, splitContext)) {
+     return Optional.of(new HiveEmptySplitPageSource());
+ }
+
CacheQuota cacheQuota = generateCacheQuota(split);
for (HiveSelectivePageSourceFactory pageSourceFactory : selectivePageSourceFactories) {
Optional<? extends ConnectorPageSource> pageSource = pageSourceFactory.createPageSource(
@@ -477,6 +486,20 @@ public static Optional<ConnectorPageSource> createHivePageSource(
return Optional.empty();
}

+ private static boolean shouldSkipBucket(HiveTableLayoutHandle hiveLayout, HiveSplit hiveSplit, SplitContext splitContext)
+ {
+     if (!splitContext.getDynamicFilterPredicate().isPresent()
+             || !hiveSplit.getReadBucketNumber().isPresent()
+             || !hiveSplit.getStorage().getBucketProperty().isPresent()) {
+         return false;
+     }
+
+     TupleDomain<ColumnHandle> dynamicFilter = splitContext.getDynamicFilterPredicate().get();
+     Optional<HiveBucketing.HiveBucketFilter> hiveBucketFilter = getHiveBucketFilter(hiveSplit.getStorage().getBucketProperty(), hiveLayout.getDataColumns(), dynamicFilter);
+
+     return hiveBucketFilter.map(filter -> !filter.getBucketsToKeep().contains(hiveSplit.getReadBucketNumber().getAsInt())).orElse(false);
+ }
+
private static BucketAdaptation toBucketAdaptation(BucketConversion conversion, List<ColumnMapping> columnMappings, OptionalInt tableBucketNumber, Function<ColumnMapping, Integer> bucketColumnIndexProducer)
{
Map<Integer, ColumnMapping> hiveIndexToBlockIndex = uniqueIndex(columnMappings, columnMapping -> columnMapping.getHiveColumnHandle().getHiveColumnIndex());
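shouldSkipBucket above ties the pieces together: the dynamic filter arrives through SplitContext, the surviving buckets are computed from split-local metadata via the new getHiveBucketFilter overload, and the split is answered with HiveEmptySplitPageSource when its bucket is filtered out. A hedged walk-through of the final comparison, with invented values:

// Hypothetical values for illustration; the logic mirrors the return statement above.
OptionalInt readBucketNumber = OptionalInt.of(1);      // the bucket this split reads
Set<Integer> bucketsToKeep = ImmutableSet.of(0, 4, 7); // buckets that can satisfy the dynamic filter
boolean skip = !bucketsToKeep.contains(readBucketNumber.getAsInt()); // true -> empty page source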
TestDynamicBucketPruning.java
@@ -0,0 +1,171 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.airlift.testing.TempFile;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SplitContext;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.util.Optional;
import java.util.OptionalInt;

import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
import static com.facebook.presto.hive.CacheQuotaRequirement.NO_CACHE_REQUIREMENT;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE;
import static com.facebook.presto.hive.HiveTestUtils.TYPE_MANAGER;
import static com.facebook.presto.hive.HiveTestUtils.createTestHdfsEnvironment;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveBatchPageSourceFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveSelectivePageSourceFactories;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
import static com.facebook.presto.hive.TestHivePageSink.getColumnHandles;
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

public class TestDynamicBucketPruning
{
    private static final String SCHEMA_NAME = "test";
    private static final String TABLE_NAME = "test";
    private static final Column BUCKET_COLUMN = new Column("l_orderkey", HIVE_INT, Optional.empty());

    @Test
    public void testDynamicBucketPruning()
    {
        HiveClientConfig config = new HiveClientConfig();
        MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
        HiveTransactionHandle transaction = new HiveTransactionHandle();
        try (TempFile tempFile = new TempFile()) {
            ConnectorPageSource emptyPageSource = createTestingPageSource(transaction, config, new SplitContext(false, getToSkipTupleDomain()), metastoreClientConfig, tempFile.file());
            assertEquals(emptyPageSource.getClass(), HiveEmptySplitPageSource.class);

            ConnectorPageSource nonEmptyPageSource = createTestingPageSource(transaction, config, new SplitContext(false, getToKeepTupleDomain()), metastoreClientConfig, tempFile.file());
            assertEquals(nonEmptyPageSource.getClass(), HivePageSource.class);
        }
        catch (IOException e) {
            e.printStackTrace();
            fail();
        }
    }

    private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle transaction, HiveClientConfig config, SplitContext splitContext, MetastoreClientConfig metastoreClientConfig, File outputFile)
    {
        HiveSplit split = new HiveSplit(
                SCHEMA_NAME,
                TABLE_NAME,
                "",
                "file:///" + outputFile.getAbsolutePath(),
                0,
                outputFile.length(),
                outputFile.length(),
                new Storage(
                        StorageFormat.create(config.getHiveStorageFormat().getSerDe(), config.getHiveStorageFormat().getInputFormat(), config.getHiveStorageFormat().getOutputFormat()),
                        "location",
                        Optional.of(new HiveBucketProperty(ImmutableList.of("l_orderkey"), 10, ImmutableList.of(), HIVE_COMPATIBLE, Optional.empty())),
                        false,
                        ImmutableMap.of(),
                        ImmutableMap.of()),
                ImmutableList.of(),
                ImmutableList.of(),
                OptionalInt.of(1),
                OptionalInt.of(1),
                NO_PREFERENCE,
                getColumnHandles().size(),
                ImmutableMap.of(),
                Optional.empty(),
                false,
                Optional.empty(),
                NO_CACHE_REQUIREMENT,
                Optional.empty(),
                ImmutableMap.of());

        TableHandle tableHandle = new TableHandle(
                new ConnectorId(HIVE_CATALOG),
                new HiveTableHandle(SCHEMA_NAME, TABLE_NAME),
                transaction,
                Optional.of(new HiveTableLayoutHandle(
                        new SchemaTableName(SCHEMA_NAME, TABLE_NAME),
                        ImmutableList.of(),
                        getColumnHandles().stream()
                                .map(column -> new Column(column.getName(), column.getHiveType(), Optional.empty()))
                                .collect(toImmutableList()),
                        ImmutableMap.of(),
                        TupleDomain.all(),
                        TRUE_CONSTANT,
                        ImmutableMap.of(),
                        TupleDomain.all(),
                        Optional.empty(),
                        Optional.empty(),
                        false,
                        "layout",
                        Optional.empty())));
        HivePageSourceProvider provider = new HivePageSourceProvider(config, createTestHdfsEnvironment(config, metastoreClientConfig), getDefaultHiveRecordCursorProvider(config, metastoreClientConfig), getDefaultHiveBatchPageSourceFactories(config, metastoreClientConfig), getDefaultHiveSelectivePageSourceFactories(config, metastoreClientConfig), TYPE_MANAGER, ROW_EXPRESSION_SERVICE);
        return provider.createPageSource(transaction, getSession(config), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), splitContext);
    }

    private static TupleDomain<ColumnHandle> getToSkipTupleDomain()
    {
        return TupleDomain.withColumnDomains(
                ImmutableMap.of(
                        new HiveColumnHandle(
                                BUCKET_COLUMN.getName(),
                                BUCKET_COLUMN.getType(),
                                parseTypeSignature(StandardTypes.VARCHAR),
                                0,
                                REGULAR,
                                Optional.empty()),
                        Domain.singleValue(INTEGER, 10L)));
    }

    private static TupleDomain<ColumnHandle> getToKeepTupleDomain()
    {
        return TupleDomain.withColumnDomains(
                ImmutableMap.of(
                        new HiveColumnHandle(
                                BUCKET_COLUMN.getName(),
                                BUCKET_COLUMN.getType(),
                                parseTypeSignature(StandardTypes.VARCHAR),
                                0,
                                REGULAR,
                                Optional.empty()),
                        Domain.singleValue(INTEGER, 1L)));
    }

    private static TestingConnectorSession getSession(HiveClientConfig config)
    {
        return new TestingConnectorSession(new HiveSessionProperties(config, new OrcFileWriterConfig(), new ParquetFileWriterConfig()).getSessionProperties());
    }
}
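On why the 10L domain prunes while 1L keeps: the split is built with readBucketNumber 1 against a 10-bucket HIVE_COMPATIBLE table bucketed on the integer column l_orderkey. Assuming Hive's convention that an int hashes to its own value and bucket = (hash & Integer.MAX_VALUE) % bucketCount, the arithmetic works out as:

// Hedged sketch of the bucket arithmetic this test relies on (assumed Hive convention).
int bucketCount = 10;                                    // from the HiveBucketProperty above
int skipBucket = (10 & Integer.MAX_VALUE) % bucketCount; // = 0; split reads bucket 1 -> pruned
int keepBucket = (1 & Integer.MAX_VALUE) % bucketCount;  // = 1; matches readBucketNumber -> kept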
