Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: add unit tests for range distribution on bucket partition column #11033

Merged
merged 1 commit into from
Aug 29, 2024

Conversation

stevenzwu
Copy link
Contributor

Also started to use the new DataGeneratorSource which is only available in 1.19 and after. hence, didn't add the unit test to 1.18.

@stevenzwu stevenzwu requested a review from pvary August 28, 2024 04:29
@github-actions github-actions bot added the flink label Aug 28, 2024
Comment on lines +210 to +212
// It takes 2 checkpoint cycle for statistics collection and application
// of the globally aggregated statistics in the range partitioner.
// The last two checkpoints should have range shuffle applied
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How stable is this test?
Do I understand correctly, that relaxed the conditions so the test will never fail if the feature is correct?
Would this test fail on a slow machine (like the CI) with the feature turned off?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the relaxed condition is from maxAddedDataFilesPerCheckpoint as NUM_BUCKETS + parallelism, which would be guaranteed by the range partition. In some cases, it can be smaller than that as NUM_BUCKETS or parallelism for divisible scenarios.

this test is guaranteed to fail without range partition, as each writer subtask can write NUM_BUCKETS of files. the total number of data files per commit can get up to NUM_BUCKETS * parallelism.

@stevenzwu stevenzwu merged commit 4b71d40 into apache:main Aug 29, 2024
20 checks passed
@stevenzwu stevenzwu deleted the range-distribution-bucketing branch August 29, 2024 16:27
@manuzhang
Copy link
Collaborator

It looks the new UT is flaky https://github.com/apache/iceberg/actions/runs/10825717894/job/30035219384

TestFlinkIcebergSinkRangeDistributionBucketing > testBucketNumberLessThanWriterParallelismNotDivisible() FAILED
    java.lang.AssertionError: 
    Expecting size of:
      [GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=0/00003-0-39af39f3-5278-4295-99a1-9720958db8c9-00009.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=0}, record_count=12, file_size_in_bytes=1243, column_sizes=org.apache.iceberg.util.SerializableMap@1a2, value_counts=org.apache.iceberg.util.SerializableMap@2a, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@f366a4f, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@bf4ec4e7, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=1/00003-0-39af39f3-5278-4295-99a1-9720958db8c9-00010.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=1}, record_count=8, file_size_in_bytes=1161, column_sizes=org.apache.iceberg.util.SerializableMap@14c, value_counts=org.apache.iceberg.util.SerializableMap@1e, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@3c0e271f, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@6dbe6441, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=3/00003-0-39af39f3-5278-4295-99a1-9720958db8c9-00008.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=3}, record_count=20, file_size_in_bytes=1399, column_sizes=org.apache.iceberg.util.SerializableMap@23e, value_counts=org.apache.iceberg.util.SerializableMap@42, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@586925d, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@2b469148, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=0/00002-0-73600ead-4c85-4fae-b82b-8ab7eac90cb5-00006.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=0}, record_count=13, file_size_in_bytes=1265, column_sizes=org.apache.iceberg.util.SerializableMap@1b4, value_counts=org.apache.iceberg.util.SerializableMap@29, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@d31dc682, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@8745843, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=1/00002-0-73600ead-4c85-4fae-b82b-8ab7eac90cb5-00007.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=1}, record_count=7, file_size_in_bytes=1140, column_sizes=org.apache.iceberg.util.SerializableMap@139, value_counts=org.apache.iceberg.util.SerializableMap@f, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@b8e44b2e, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@f441240a, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=2/00002-0-73600ead-4c85-4fae-b82b-8ab7eac90cb5-00008.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=2}, record_count=14, file_size_in_bytes=1282, column_sizes=org.apache.iceberg.util.SerializableMap@1c3, value_counts=org.apache.iceberg.util.SerializableMap@28, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@4c23e153, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@38ed4c3b, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=0/00001-0-851323b3-f50a-4bdb-b87e-f3b7563fea20-00007.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=0}, record_count=11, file_size_in_bytes=1221, column_sizes=org.apache.iceberg.util.SerializableMap@188, value_counts=org.apache.iceberg.util.SerializableMap@1b, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@b7fe886e, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@787336c7, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=1/00001-0-851323b3-f50a-4bdb-b87e-f3b7563fea20-00009.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=1}, record_count=10, file_size_in_bytes=1204, column_sizes=org.apache.iceberg.util.SerializableMap@179, value_counts=org.apache.iceberg.util.SerializableMap@1c, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@36958f8c, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@e2d3ea2, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=2/00001-0-851323b3-f50a-4bdb-b87e-f3b7563fea20-00008.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=2}, record_count=14, file_size_in_bytes=1278, column_sizes=org.apache.iceberg.util.SerializableMap@1c1, value_counts=org.apache.iceberg.util.SerializableMap@28, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@f13eaf5d, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@fa07314b, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=0/00005-0-284db009-4695-4493-bd63-ead0e48e6c64-00009.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=0}, record_count=9, file_size_in_bytes=1181, column_sizes=org.apache.iceberg.util.SerializableMap@160, value_counts=org.apache.iceberg.util.SerializableMap@1d, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@34a756d8, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@f881d6ff, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=1/00005-0-284db009-4695-4493-bd63-ead0e48e6c64-00007.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=1}, record_count=9, file_size_in_bytes=1183, column_sizes=org.apache.iceberg.util.SerializableMap@166, value_counts=org.apache.iceberg.util.SerializableMap@1d, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@5a27d75d, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@6548a5f5, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=3/00005-0-284db009-4695-4493-bd63-ead0e48e6c64-00008.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=3}, record_count=19, file_size_in_bytes=1379, column_sizes=org.apache.iceberg.util.SerializableMap@22a, value_counts=org.apache.iceberg.util.SerializableMap@33, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@f3b952e4, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@d60de4c2, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=0/00000-0-04a400d9-3c02-4305-99c4-8a9513f08fb2-00010.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=0}, record_count=9, file_size_in_bytes=1183, column_sizes=org.apache.iceberg.util.SerializableMap@166, value_counts=org.apache.iceberg.util.SerializableMap@1d, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@45e54b10, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@e15527a1, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=1/00000-0-04a400d9-3c02-4305-99c4-8a9513f08fb2-00009.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=1}, record_count=8, file_size_in_bytes=1161, column_sizes=org.apache.iceberg.util.SerializableMap@14c, value_counts=org.apache.iceberg.util.SerializableMap@1e, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@d6177a80, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@b6865eb6, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=2/00000-0-04a400d9-3c02-4305-99c4-8a9513f08fb2-00008.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=2}, record_count=25, file_size_in_bytes=1487, column_sizes=org.apache.iceberg.util.SerializableMap@296, value_counts=org.apache.iceberg.util.SerializableMap@4d, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@f1792380, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@eb1d324a, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=0/00004-0-5299d71e-bdf6-421e-b49e-7a16d08c501b-00008.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=0}, record_count=9, file_size_in_bytes=1184, column_sizes=org.apache.iceberg.util.SerializableMap@165, value_counts=org.apache.iceberg.util.SerializableMap@1d, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@b07f7ca8, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@e4eafce, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=1/00004-0-5299d71e-bdf6-421e-b49e-7a16d08c501b-00010.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=1}, record_count=11, file_size_in_bytes=1223, column_sizes=org.apache.iceberg.util.SerializableMap@188, value_counts=org.apache.iceberg.util.SerializableMap@1b, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@f50f7792, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@76c0af9, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4},
        GenericDataFile{content=data, file_path=file:/tmp/junit5_hadoop_catalog-6564358266702856210/ecb89cf8-f045-4560-bf48-17bac055481d/default/t/data/ts_hour=2024-09-12-07/uuid_bucket=3/00004-0-5299d71e-bdf6-421e-b49e-7a16d08c501b-00009.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{ts_hour=479479, uuid_bucket=3}, record_count=22, file_size_in_bytes=1438, column_sizes=org.apache.iceberg.util.SerializableMap@265, value_counts=org.apache.iceberg.util.SerializableMap@40, null_value_counts=org.apache.iceberg.util.SerializableMap@6, nan_value_counts=org.apache.iceberg.util.SerializableMap@0, lower_bounds=org.apache.iceberg.SerializableByteBufferMap@e253791e, upper_bounds=org.apache.iceberg.SerializableByteBufferMap@94c3e62d, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=0, data_sequence_number=4, file_sequence_number=4}]
    to be less than or equal to 10 but was 18
        at org.apache.iceberg.flink.sink.TestFlinkIcebergSinkRangeDistributionBucketing.testParallelism(TestFlinkIcebergSinkRangeDistributionBucketing.java:220)
        at org.apache.iceberg.flink.sink.TestFlinkIcebergSinkRangeDistributionBucketing.testBucketNumberLessThanWriterParallelismNotDivisible(TestFlinkIcebergSinkRangeDistributionBucketing.java:152)

jenbaldwin pushed a commit to Teradata/iceberg that referenced this pull request Sep 17, 2024
* main: (208 commits)
  Docs: Fix Flink 1.20 support versions (apache#11065)
  Flink: Fix compile warning (apache#11072)
  Docs: Initial committer guidelines and requirements for merging (apache#10780)
  Core: Refactor ZOrderByteUtils (apache#10624)
  API: implement types timestamp_ns and timestamptz_ns (apache#9008)
  Build: Bump com.google.errorprone:error_prone_annotations (apache#11055)
  Build: Bump mkdocs-material from 9.5.33 to 9.5.34 (apache#11062)
  Flink: Backport PR apache#10526 to v1.18 and v1.20 (apache#11018)
  Kafka Connect: Disable publish tasks in runtime project (apache#11032)
  Flink: add unit tests for range distribution on bucket partition column (apache#11033)
  Spark 3.5: Use FileGenerationUtil in PlanningBenchmark (apache#11027)
  Core: Add benchmark for appending files (apache#11029)
  Build: Ignore benchmark output folders across all modules (apache#11030)
  Spec: Add RemovePartitionSpecsUpdate REST update type (apache#10846)
  Docs: bump latest version to 1.6.1 (apache#11036)
  OpenAPI, Build: Apply spotless to testFixtures source code (apache#11024)
  Core: Generate realistic bounds in benchmarks (apache#11022)
  Add REST Compatibility Kit (apache#10908)
  Flink: backport PR apache#10832 of inferring parallelism in FLIP-27 source (apache#11009)
  Docs: Add Druid docs url to sidebar (apache#10997)
  ...
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants