Local dynamic filter support for Iceberg
alexjo2144 authored and findepi committed Oct 14, 2021
1 parent a0626d1 commit 0ff73f3
Showing 2 changed files with 43 additions and 3 deletions.

@@ -114,6 +114,7 @@
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcBloomFiltersEnabled;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcNestedLazy;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata;
+import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
 import static io.trino.plugin.iceberg.TypeConverter.ICEBERG_BINARY_TYPE;
 import static io.trino.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -169,6 +170,9 @@ public ConnectorPageSource createPageSource(
                 .map(IcebergColumnHandle.class::cast)
                 .filter(column -> !partitionKeys.containsKey(column.getId()))
                 .collect(toImmutableList());
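+        // Intersect the predicate left unenforced by split pruning with the current
+        // dynamic filter, compacting overly large discrete value sets into ranges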
+        TupleDomain<IcebergColumnHandle> effectivePredicate = table.getUnenforcedPredicate()
+                .intersect(dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast))
+                .simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD);

         HdfsContext hdfsContext = new HdfsContext(session);
         ConnectorPageSource dataPageSource = createDataPageSource(
@@ -180,7 +184,7 @@
                 split.getFileSize(),
                 split.getFileFormat(),
                 regularColumns,
-                table.getUnenforcedPredicate());
+                effectivePredicate);

         return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
     }
@@ -29,6 +29,7 @@
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.TableNotFoundException;
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.NullableValue;
@@ -39,6 +40,7 @@
 import io.trino.spi.type.MapType;
 import io.trino.spi.type.RowType;
 import io.trino.spi.type.TypeOperators;
+import io.trino.sql.analyzer.FeaturesConfig;
 import io.trino.testing.BaseConnectorTest;
 import io.trino.testing.MaterializedResult;
 import io.trino.testing.MaterializedRow;
@@ -47,6 +49,7 @@
 import io.trino.testing.TestingConnectorBehavior;
 import io.trino.testing.sql.TestTable;
 import io.trino.testng.services.Flaky;
+import io.trino.tpch.TpchTable;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
@@ -86,6 +89,7 @@
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static com.google.common.collect.MoreCollectors.onlyElement;
+import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
 import static io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
 import static io.trino.plugin.hive.HiveTestUtils.SESSION;
@@ -103,6 +107,7 @@
 import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
 import static io.trino.testing.assertions.Assert.assertEquals;
 import static io.trino.testing.assertions.Assert.assertEventually;
+import static io.trino.tpch.TpchTable.LINE_ITEM;
 import static io.trino.transaction.TransactionBuilder.transaction;
 import static java.lang.String.format;
 import static java.util.Collections.nCopies;
@@ -137,7 +142,10 @@ protected QueryRunner createQueryRunner()
         return createIcebergQueryRunner(
                 Map.of(),
                 Map.of("iceberg.file-format", format.name()),
-                REQUIRED_TPCH_TABLES);
+                ImmutableList.<TpchTable<?>>builder()
+                        .addAll(REQUIRED_TPCH_TABLES)
+                        .add(LINE_ITEM)
+                        .build());
     }

     @Override
@@ -2175,14 +2183,42 @@ public void testAllAvailableTypes()
                         new MapType(INTEGER, VARCHAR, new TypeOperators()))
                 .row(
                         ZonedDateTime.of(2021, 7, 23, 15, 43, 57, 348000000, ZoneId.of("UTC")),
-                        new byte[]{00, 01, 02, -16, -2, -1},
+                        new byte[] {00, 01, 02, -16, -2, -1},
                         "binary2/3values".getBytes(StandardCharsets.UTF_8),
                         new MaterializedRow(Arrays.asList(null, "this is a random value")),
                         Arrays.asList("uno", "dos", "tres"),
                         ImmutableMap.of(1, "ek", 2, "one"))
                 .build());
     }

+    @Test
+    public void testLocalDynamicFilteringWithSelectiveBuildSizeJoin()
+    {
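+        // Baselines for the assertions below: total lineitem row count and the
+        // number of data files backing the table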
+        long fullTableScan = (Long) computeActual("SELECT count(*) FROM lineitem").getOnlyValue();
+        long numberOfFiles = (Long) computeActual("SELECT count(DISTINCT file_path) FROM \"lineitem$files\"").getOnlyValue();
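+        // A broadcast join gives every probe task the complete build side, so the
+        // dynamic filter can be collected and applied locally in the scan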
+        Session session = Session.builder(getSession())
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name())
+                .build();
+
+        // TODO (https://github.com/trinodb/trino/issues/5172): remove assertEventually
+        assertEventually(() -> {
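+            // The totalprice predicate is satisfied by only one order (the join returns
+            // a single row), so the dynamic filter on orderkey is highly selective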
+            ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId(
+                    session,
+                    "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = 974.04");
+            assertEquals(result.getResult().getRowCount(), 1);
+
+            OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(
+                    result.getQueryId(),
+                    table -> (table instanceof IcebergTableHandle) &&
+                            ((IcebergTableHandle) table).getSchemaTableName().equals(new SchemaTableName("tpch", "lineitem")));
+
+            // Assert that no split-level pruning occurs. If this starts failing, a new totalprice may need to be selected
+            assertThat(probeStats.getTotalDrivers()).isEqualTo(numberOfFiles);
+            // Assert that some lineitem rows were filtered out at the file level
+            assertThat(probeStats.getInputPositions()).isLessThan(fullTableScan);
+        });
+    }
+
 @Test(dataProvider = "testDataMappingSmokeTestDataProvider")
 @Flaky(issue = "https://github.com/trinodb/trino/issues/5172", match = "Couldn't find operator summary, probably due to query statistic collection error")
 public void testSplitPruningForFilterOnNonPartitionColumn(DataMappingTestSetup testSetup)