diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 3536c098bd76..be74afa1f4d6 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -141,12 +141,18 @@ const CONCURRENCY_LIMIT: usize = 100; /// Partition the list of files into `n` groups pub fn split_files( - partitioned_files: Vec, + mut partitioned_files: Vec, n: usize, ) -> Vec> { if partitioned_files.is_empty() { return vec![]; } + + // ObjectStore::list does not guarantee any consistent order and for some + // implementations such as LocalFileSystem, it may be inconsistent. Thus + // Sort files by path to ensure consistent plans when run more than once. + partitioned_files.sort_by(|a, b| a.path().cmp(b.path())); + // effectively this is div with rounding up instead of truncating let chunk_size = (partitioned_files.len() + n - 1) / n; partitioned_files diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 87c1663ae718..5e5b96f6ba8c 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -109,6 +109,11 @@ impl PartitionedFile { let size = std::fs::metadata(path.clone())?.len(); Ok(Self::new(path, size)) } + + /// Return the path of this partitioned file + pub fn path(&self) -> &Path { + &self.object_meta.location + } } impl From for PartitionedFile { diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 9e9fb9210071..979ed9e975c4 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -131,6 +131,10 @@ impl ListingTableUrl { if is_directory { fs::create_dir_all(path)?; } else { + // ensure parent directory exists + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } fs::File::create(path)?; } } diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 618e3106c629..484677d58e79 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -159,6 +159,7 @@ async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> { relative_path, } = test_file; info!("Running with Postgres runner: {}", path.display()); + setup_scratch_dir(&relative_path)?; let mut runner = sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone())); runner.with_column_validator(strict_column_validator); @@ -188,6 +189,7 @@ async fn run_complete_file(test_file: TestFile) -> Result<()> { info!("Skipping: {}", path.display()); return Ok(()); }; + setup_scratch_dir(&relative_path)?; let mut runner = sqllogictest::Runner::new(|| async { Ok(DataFusion::new( test_ctx.session_ctx().clone(), diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt new file mode 100644 index 000000000000..551d6d9ed48a --- /dev/null +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -0,0 +1,268 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +########## +# Tests for automatically reading files in parallel during scan +########## + +# Set 4 partitions for deterministic output plans +statement ok +set datafusion.execution.target_partitions = 4; + +# automatically partition all files over 1 byte +statement ok +set datafusion.optimizer.repartition_file_min_size = 1; + +################### +### Parquet tests +################### + +# create a single parquet file +# Note filename 2.parquet to test sorting (on local file systems it is often listed before 1.parquet) +statement ok +COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/parquet_table/2.parquet' +(FORMAT PARQUET, SINGLE_FILE_OUTPUT true); + +statement ok +CREATE EXTERNAL TABLE parquet_table(column1 int) +STORED AS PARQUET +LOCATION 'test_files/scratch/repartition_scan/parquet_table/'; + +query I +select * from parquet_table; +---- +1 +2 +3 +4 +5 + +## Expect to see the scan read the file as "4" groups with even sizes (offsets) +query TT +EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42; +---- +logical_plan +Filter: parquet_table.column1 != Int32(42) +--TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)] +physical_plan +CoalesceBatchesExec: target_batch_size=8192 +--FilterExec: column1@0 != 42 +----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 + +# create a second parquet file +statement ok +COPY (VALUES (100), (200)) TO 'test_files/scratch/repartition_scan/parquet_table/1.parquet' +(FORMAT PARQUET, SINGLE_FILE_OUTPUT true); + +## Still expect to see the scan read the file as "4" groups with even sizes. One group should read +## parts of both files. +query TT +EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42 ORDER BY column1; +---- +logical_plan +Sort: parquet_table.column1 ASC NULLS LAST +--Filter: parquet_table.column1 != Int32(42) +----TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)] +physical_plan +SortPreservingMergeExec: [column1@0 ASC NULLS LAST] +--SortExec: expr=[column1@0 ASC NULLS LAST] +----CoalesceBatchesExec: target_batch_size=8192 +------FilterExec: column1@0 != 42 +--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 + + +## Read the files as though they are ordered + +statement ok +CREATE EXTERNAL TABLE parquet_table_with_order(column1 int) +STORED AS PARQUET +LOCATION 'test_files/scratch/repartition_scan/parquet_table' +WITH ORDER (column1 ASC); + +# output should be ordered +query I +SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER BY column1; +---- +1 +2 +3 +4 +5 +100 +200 + +# explain should not have any groups with more than one file +# https://github.com/apache/arrow-datafusion/issues/8451 +query TT +EXPLAIN SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER BY column1; +---- +logical_plan +Sort: parquet_table_with_order.column1 ASC NULLS LAST +--Filter: parquet_table_with_order.column1 != Int32(42) +----TableScan: parquet_table_with_order projection=[column1], partial_filters=[parquet_table_with_order.column1 != Int32(42)] +physical_plan +SortPreservingMergeExec: [column1@0 ASC NULLS LAST] +--CoalesceBatchesExec: target_batch_size=8192 +----FilterExec: column1@0 != 42 +------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..200], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:200..394, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..206], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:206..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1 + +# Cleanup +statement ok +DROP TABLE parquet_table; + +statement ok +DROP TABLE parquet_table_with_order; + + +################### +### CSV tests +################### + +# Since parquet and CSV share most of the same implementation, this test checks +# that the basics are connected properly + +# create a single csv file +statement ok +COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/csv_table/1.csv' +(FORMAT csv, SINGLE_FILE_OUTPUT true, HEADER true); + +statement ok +CREATE EXTERNAL TABLE csv_table(column1 int) +STORED AS csv +WITH HEADER ROW +LOCATION 'test_files/scratch/repartition_scan/csv_table/'; + +query I +select * from csv_table; +---- +1 +2 +3 +4 +5 + +## Expect to see the scan read the file as "4" groups with even sizes (offsets) +query TT +EXPLAIN SELECT column1 FROM csv_table WHERE column1 <> 42; +---- +logical_plan +Filter: csv_table.column1 != Int32(42) +--TableScan: csv_table projection=[column1], partial_filters=[csv_table.column1 != Int32(42)] +physical_plan +CoalesceBatchesExec: target_batch_size=8192 +--FilterExec: column1@0 != 42 +----CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:0..5], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:5..10], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:10..15], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/csv_table/1.csv:15..18]]}, projection=[column1], has_header=true + +# Cleanup +statement ok +DROP TABLE csv_table; + + +################### +### JSON tests +################### + +# Since parquet and json share most of the same implementation, this test checks +# that the basics are connected properly + +# create a single json file +statement ok +COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/json_table/1.json' +(FORMAT json, SINGLE_FILE_OUTPUT true); + +statement ok +CREATE EXTERNAL TABLE json_table(column1 int) +STORED AS json +LOCATION 'test_files/scratch/repartition_scan/json_table/'; + +query I +select * from json_table; +---- +1 +2 +3 +4 +5 + +## In the future it would be cool to see the file read as "4" groups with even sizes (offsets) +## but for now it is just one group +## https://github.com/apache/arrow-datafusion/issues/8502 +query TT +EXPLAIN SELECT column1 FROM json_table WHERE column1 <> 42; +---- +logical_plan +Filter: json_table.column1 != Int32(42) +--TableScan: json_table projection=[column1], partial_filters=[json_table.column1 != Int32(42)] +physical_plan +CoalesceBatchesExec: target_batch_size=8192 +--FilterExec: column1@0 != 42 +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json]]}, projection=[column1] + + +# Cleanup +statement ok +DROP TABLE json_table; + + +################### +### Arrow File tests +################### + +## Use pre-existing files we don't have a way to create arrow files yet +## (https://github.com/apache/arrow-datafusion/issues/8504) +statement ok +CREATE EXTERNAL TABLE arrow_table +STORED AS ARROW +LOCATION '../core/tests/data/example.arrow'; + + +# It would be great to see the file read as "4" groups with even sizes (offsets) eventually +# https://github.com/apache/arrow-datafusion/issues/8503 +query TT +EXPLAIN SELECT * FROM arrow_table +---- +logical_plan TableScan: arrow_table projection=[f0, f1, f2] +physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow]]}, projection=[f0, f1, f2] + +# Cleanup +statement ok +DROP TABLE arrow_table; + +################### +### Avro File tests +################### + +## Use pre-existing files we don't have a way to create avro files yet + +statement ok +CREATE EXTERNAL TABLE avro_table +STORED AS AVRO +WITH HEADER ROW +LOCATION '../../testing/data/avro/simple_enum.avro' + + +# It would be great to see the file read as "4" groups with even sizes (offsets) eventually +query TT +EXPLAIN SELECT * FROM avro_table +---- +logical_plan TableScan: avro_table projection=[f1, f2, f3] +physical_plan AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/simple_enum.avro]]}, projection=[f1, f2, f3] + +# Cleanup +statement ok +DROP TABLE avro_table;