diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index feb488ab9a32..02e1a2851934 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -114,6 +114,7 @@ import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcBloomFiltersEnabled; import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcNestedLazy; import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata; +import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD; import static io.trino.plugin.iceberg.TypeConverter.ICEBERG_BINARY_TYPE; import static io.trino.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -169,6 +170,9 @@ public ConnectorPageSource createPageSource( .map(IcebergColumnHandle.class::cast) .filter(column -> !partitionKeys.containsKey(column.getId())) .collect(toImmutableList()); + TupleDomain<IcebergColumnHandle> effectivePredicate = table.getUnenforcedPredicate() + .intersect(dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast)) + .simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD); HdfsContext hdfsContext = new HdfsContext(session); ConnectorPageSource dataPageSource = createDataPageSource( @@ -180,7 +184,7 @@ public ConnectorPageSource createPageSource( split.getFileSize(), split.getFileFormat(), regularColumns, - table.getUnenforcedPredicate()); + effectivePredicate); return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey()); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index e26aee6153a0..34ad4fa3914c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -29,6 +29,7 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.Constraint; import io.trino.spi.connector.ConstraintApplicationResult; +import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; @@ -39,6 +40,7 @@ import io.trino.spi.type.MapType; import io.trino.spi.type.RowType; import io.trino.spi.type.TypeOperators; +import io.trino.sql.analyzer.FeaturesConfig; import io.trino.testing.BaseConnectorTest; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedRow; @@ -47,6 +49,7 @@ import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import io.trino.testng.services.Flaky; +import io.trino.tpch.TpchTable; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; @@ -86,6 +89,7 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.MoreCollectors.onlyElement; +import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static io.trino.plugin.hive.HdfsEnvironment.HdfsContext; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.SESSION; @@ -103,6 +107,7 @@ import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; import static io.trino.testing.assertions.Assert.assertEquals; import static io.trino.testing.assertions.Assert.assertEventually; +import static 
io.trino.tpch.TpchTable.LINE_ITEM; import static io.trino.transaction.TransactionBuilder.transaction; import static java.lang.String.format; import static java.util.Collections.nCopies; @@ -137,7 +142,10 @@ protected QueryRunner createQueryRunner() return createIcebergQueryRunner( Map.of(), Map.of("iceberg.file-format", format.name()), - REQUIRED_TPCH_TABLES); + ImmutableList.<TpchTable<?>>builder() + .addAll(REQUIRED_TPCH_TABLES) + .add(LINE_ITEM) + .build()); } @Override @@ -2175,7 +2183,7 @@ public void testAllAvailableTypes() new MapType(INTEGER, VARCHAR, new TypeOperators())) .row( ZonedDateTime.of(2021, 7, 23, 15, 43, 57, 348000000, ZoneId.of("UTC")), - new byte[]{00, 01, 02, -16, -2, -1}, + new byte[] {00, 01, 02, -16, -2, -1}, "binary2/3values".getBytes(StandardCharsets.UTF_8), new MaterializedRow(Arrays.asList(null, "this is a random value")), Arrays.asList("uno", "dos", "tres"), @@ -2183,6 +2191,34 @@ .build()); } + @Test + public void testLocalDynamicFilteringWithSelectiveBuildSizeJoin() + { + long fullTableScan = (Long) computeActual("SELECT count(*) FROM lineitem").getOnlyValue(); + long numberOfFiles = (Long) computeActual("SELECT count(DISTINCT file_path) FROM \"lineitem$files\"").getOnlyValue(); + Session session = Session.builder(getSession()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name()) + .build(); + + // TODO (https://github.com/trinodb/trino/issues/5172): remove assertEventually + assertEventually(() -> { + ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( + session, + "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = 974.04"); + assertEquals(result.getResult().getRowCount(), 1); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats( + result.getQueryId(), + table -> (table instanceof IcebergTableHandle) && + ((IcebergTableHandle) table).getSchemaTableName().equals(new 
SchemaTableName("tpch", "lineitem"))); + + // Assert no split level pruning occurs. If this starts failing a new totalprice may need to be selected + assertThat(probeStats.getTotalDrivers()).isEqualTo(numberOfFiles); + // Assert some lineitem rows were filtered out on file level + assertThat(probeStats.getInputPositions()).isLessThan(fullTableScan); + }); + } + @Test(dataProvider = "testDataMappingSmokeTestDataProvider") @Flaky(issue = "https://github.com/trinodb/trino/issues/5172", match = "Couldn't find operator summary, probably due to query statistic collection error") public void testSplitPruningForFilterOnNonPartitionColumn(DataMappingTestSetup testSetup)