From df20098ee7ffbaf8f7b0bb0b227a4af86e7e9302 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Thu, 14 Dec 2023 17:35:25 +0800 Subject: [PATCH 01/12] init --- datafusion/core/src/datasource/listing/url.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 979ed9e975c4..2431b152abb1 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -187,8 +187,7 @@ impl ListingTableUrl { match self.strip_prefix(path) { Some(mut segments) => match &self.glob { Some(glob) => { - let stripped = segments.join("/"); - glob.matches(&stripped) + glob.matches(&segments.next().unwrap()) } None => true, }, From 424d6653c78f884b49116e1092359ff0f6ec3fee Mon Sep 17 00:00:00 2001 From: asura7969 <1402357969@qq.com> Date: Thu, 14 Dec 2023 22:58:02 +0800 Subject: [PATCH 02/12] test --- datafusion/core/src/datasource/listing/url.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 2431b152abb1..bbf65d16b8a2 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -186,9 +186,9 @@ impl ListingTableUrl { pub fn contains(&self, path: &Path) -> bool { match self.strip_prefix(path) { Some(mut segments) => match &self.glob { - Some(glob) => { - glob.matches(&segments.next().unwrap()) - } + Some(glob) => segments + .next() + .map_or(false, |file_name| glob.matches(file_name)), None => true, }, None => false, @@ -422,6 +422,12 @@ mod tests { let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap(); assert_eq!(a, b); assert!(a.prefix.as_ref().ends_with("bar/baz")); + + let url = ListingTableUrl::parse("../foo/*.parquet").unwrap(); + let child = url.prefix.child("aa.parquet"); + assert!(url.contains(&child)); + let child = url.prefix.child("dir").child("aa.parquet"); + assert!(!url.contains(&child)); } #[test] From b2d288a28f01ff0d7f49c9b83f49035ec2a42d1e Mon Sep 17 00:00:00 2001 From: asura7969 <1402357969@qq.com> Date: Sat, 16 Dec 2023 22:05:35 +0800 Subject: [PATCH 03/12] add config --- datafusion/common/src/config.rs | 4 +++ .../core/src/datasource/listing/helpers.rs | 6 ++++- datafusion/core/src/datasource/listing/url.rs | 25 +++++++++++++------ docs/source/user-guide/configs.md | 3 ++- 4 files changed, 29 insertions(+), 9 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 03fb5ea320a0..15400c65bdd7 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -273,6 +273,10 @@ config_namespace! { /// memory consumption pub max_buffered_batches_per_output_file: usize, default = 2 + /// When scanning file paths, whether to ignore subdirectory files, + /// ignored by default (true) + pub ignore_child_dir: bool, default = true + } } diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index be74afa1f4d6..6ae51b435330 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -40,6 +40,7 @@ use crate::execution::context::SessionState; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{internal_err, Column, DFField, DFSchema, DataFusionError}; use datafusion_expr::{Expr, ScalarFunctionDefinition, Volatility}; +use datafusion_optimizer::OptimizerConfig; use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr::execution_props::ExecutionProps; use object_store::path::Path; @@ -376,9 +377,12 @@ pub async fn pruned_partition_list<'a>( } }; + let exec_options = &ctx.options().execution; + let ignore_child_dir = exec_options.ignore_child_dir; + let files = files.into_iter().filter(move |o| { let extension_match = o.location.as_ref().ends_with(file_extension); - let glob_match = table_path.contains(&o.location); + let glob_match = table_path.contains(&o.location, ignore_child_dir); extension_match && glob_match }); diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index bbf65d16b8a2..42b1369aeed5 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -20,6 +20,7 @@ use std::fs; use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::context::SessionState; use datafusion_common::{DataFusionError, Result}; +use datafusion_optimizer::OptimizerConfig; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use glob::Pattern; @@ -183,12 +184,19 @@ impl ListingTableUrl { } /// Returns `true` if `path` matches this [`ListingTableUrl`] - pub fn contains(&self, path: &Path) -> bool { + pub fn contains(&self, path: &Path, ignore_child_dir: bool) -> bool { match self.strip_prefix(path) { Some(mut segments) => match &self.glob { - Some(glob) => segments - .next() - .map_or(false, |file_name| glob.matches(file_name)), + Some(glob) => { + if ignore_child_dir { + segments + .next() + .map_or(false, |file_name| glob.matches(file_name)) + } else { + let stripped = segments.join("/"); + glob.matches(&stripped) + } + } None => true, }, None => false, @@ -221,6 +229,8 @@ impl ListingTableUrl { store: &'a dyn ObjectStore, file_extension: &'a str, ) -> Result>> { + let exec_options = &ctx.options().execution; + let ignore_child_dir = exec_options.ignore_child_dir; // If the prefix is a file, use a head request, otherwise list let list = match self.is_collection() { true => match ctx.runtime_env().cache_manager.get_list_files_cache() { @@ -244,7 +254,7 @@ impl ListingTableUrl { .try_filter(move |meta| { let path = &meta.location; let extension_match = path.as_ref().ends_with(file_extension); - let glob_match = self.contains(path); + let glob_match = self.contains(path, ignore_child_dir); futures::future::ready(extension_match && glob_match) }) .map_err(DataFusionError::ObjectStore) @@ -425,9 +435,10 @@ mod tests { let url = ListingTableUrl::parse("../foo/*.parquet").unwrap(); let child = url.prefix.child("aa.parquet"); - assert!(url.contains(&child)); + assert!(url.contains(&child, true)); let child = url.prefix.child("dir").child("aa.parquet"); - assert!(!url.contains(&child)); + assert!(!url.contains(&child, true)); + assert!(url.contains(&child, false)); } #[test] diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6fb5cc4ca870..8a606119d403 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -36,7 +36,7 @@ If the value in the environment variable cannot be cast to the type of the confi Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. | key | default | description | -| ----------------------------------------------------------------------- | ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|-------------------------------------------------------------------------| ------------------------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. | | datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | | datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | @@ -82,6 +82,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | +| datafusion.execution.ignore_child_dir | true | When scanning file paths, whether to ignore subdirectory files,ignored by default (true) | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | From bbeffd7a62392e9dd0f5326f1baf30dea8e5a8dc Mon Sep 17 00:00:00 2001 From: asura7969 <1402357969@qq.com> Date: Sat, 16 Dec 2023 22:20:40 +0800 Subject: [PATCH 04/12] rename --- datafusion/common/src/config.rs | 2 +- datafusion/core/src/datasource/listing/helpers.rs | 4 ++-- datafusion/core/src/datasource/listing/url.rs | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 15400c65bdd7..31f0de61243e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -275,7 +275,7 @@ config_namespace! { /// When scanning file paths, whether to ignore subdirectory files, /// ignored by default (true) - pub ignore_child_dir: bool, default = true + pub ignore_subdirectory: bool, default = true } } diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 6ae51b435330..d3b5cda9e3f8 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -378,11 +378,11 @@ pub async fn pruned_partition_list<'a>( }; let exec_options = &ctx.options().execution; - let ignore_child_dir = exec_options.ignore_child_dir; + let ignore_subdirectory = exec_options.ignore_subdirectory; let files = files.into_iter().filter(move |o| { let extension_match = o.location.as_ref().ends_with(file_extension); - let glob_match = table_path.contains(&o.location, ignore_child_dir); + let glob_match = table_path.contains(&o.location, ignore_subdirectory); extension_match && glob_match }); diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 6512bfd577df..9e1e9350a192 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -185,11 +185,11 @@ impl ListingTableUrl { } /// Returns `true` if `path` matches this [`ListingTableUrl`] - pub fn contains(&self, path: &Path, ignore_child_dir: bool) -> bool { + pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool { match self.strip_prefix(path) { Some(mut segments) => match &self.glob { Some(glob) => { - if ignore_child_dir { + if ignore_subdirectory { segments .next() .map_or(false, |file_name| glob.matches(file_name)) @@ -231,7 +231,7 @@ impl ListingTableUrl { file_extension: &'a str, ) -> Result>> { let exec_options = &ctx.options().execution; - let ignore_child_dir = exec_options.ignore_child_dir; + let ignore_subdirectory = exec_options.ignore_subdirectory; // If the prefix is a file, use a head request, otherwise list let list = match self.is_collection() { true => match ctx.runtime_env().cache_manager.get_list_files_cache() { @@ -255,7 +255,7 @@ impl ListingTableUrl { .try_filter(move |meta| { let path = &meta.location; let extension_match = path.as_ref().ends_with(file_extension); - let glob_match = self.contains(path, ignore_child_dir); + let glob_match = self.contains(path, ignore_subdirectory); futures::future::ready(extension_match && glob_match) }) .map_err(DataFusionError::ObjectStore) From abb765027ffac16447f392d2cda00b0dfe7d477c Mon Sep 17 00:00:00 2001 From: asura7969 <1402357969@qq.com> Date: Sat, 16 Dec 2023 22:21:22 +0800 Subject: [PATCH 05/12] doc --- docs/source/user-guide/configs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 8a606119d403..faa2fbf29912 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -82,7 +82,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | -| datafusion.execution.ignore_child_dir | true | When scanning file paths, whether to ignore subdirectory files,ignored by default (true) | +| datafusion.execution.ignore_subdirectory | true | When scanning file paths, whether to ignore subdirectory files,ignored by default (true) | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | From e69183b9ce4f978c2d199d54734c87dcff75aa54 Mon Sep 17 00:00:00 2001 From: asura7969 <1402357969@qq.com> Date: Sat, 16 Dec 2023 23:56:12 +0800 Subject: [PATCH 06/12] fix doc --- datafusion/core/src/execution/context/parquet.rs | 2 +- datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ docs/source/user-guide/configs.md | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 5d649d3e6df8..d637d7a8b42d 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -109,7 +109,7 @@ mod tests { .read_parquet( // it was reported that when a path contains // (two consecutive separator) no files were found // in this test, regardless of parquet_test_data() value, our path now contains a // - format!("{}/..//*/alltypes_plain*.parquet", parquet_test_data()), + format!("{}//alltypes_plain*.parquet", parquet_test_data()), ParquetReadOptions::default(), ) .await?; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 5c6bf6e2dac1..b99d6c0ee77b 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -150,6 +150,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false +datafusion.execution.ignore_subdirectory true datafusion.execution.max_buffered_batches_per_output_file 2 datafusion.execution.meta_fetch_concurrency 32 datafusion.execution.minimum_parallel_output_files 4 @@ -224,6 +225,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files +datafusion.execution.ignore_subdirectory true When scanning file paths, whether to ignore subdirectory files, ignored by default (true) datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index faa2fbf29912..00797fa2d294 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -36,7 +36,7 @@ If the value in the environment variable cannot be cast to the type of the confi Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. | key | default | description | -|-------------------------------------------------------------------------| ------------------------- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| ----------------------------------------------------------------------- | ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. | | datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | | datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | @@ -82,7 +82,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | -| datafusion.execution.ignore_subdirectory | true | When scanning file paths, whether to ignore subdirectory files,ignored by default (true) | +| datafusion.execution.ignore_subdirectory | true | When scanning file paths, whether to ignore subdirectory files, ignored by default (true) | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | From aff207fe29a19712e2cf507f159c08213751051a Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Wed, 20 Dec 2023 11:59:39 +0800 Subject: [PATCH 07/12] add sqllogictests & rename --- datafusion/common/src/config.rs | 2 +- .../core/src/datasource/listing/helpers.rs | 2 +- datafusion/core/src/datasource/listing/url.rs | 18 +-- .../core/src/execution/context/parquet.rs | 11 +- .../test_files/information_schema.slt | 4 +- .../sqllogictest/test_files/parquet.slt | 112 ++++++++++++++++++ docs/source/user-guide/configs.md | 2 +- 7 files changed, 134 insertions(+), 17 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 31f0de61243e..1e2d58ff80e4 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -275,7 +275,7 @@ config_namespace! { /// When scanning file paths, whether to ignore subdirectory files, /// ignored by default (true) - pub ignore_subdirectory: bool, default = true + pub listing_table_ignore_subdirectory: bool, default = true } } diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index d3b5cda9e3f8..a7533b45c55a 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -378,7 +378,7 @@ pub async fn pruned_partition_list<'a>( }; let exec_options = &ctx.options().execution; - let ignore_subdirectory = exec_options.ignore_subdirectory; + let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory; let files = files.into_iter().filter(move |o| { let extension_match = o.location.as_ref().ends_with(file_extension); diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs index 9e1e9350a192..766dee7de901 100644 --- a/datafusion/core/src/datasource/listing/url.rs +++ b/datafusion/core/src/datasource/listing/url.rs @@ -198,7 +198,14 @@ impl ListingTableUrl { glob.matches(&stripped) } } - None => true, + None => { + if ignore_subdirectory { + let has_subdirectory = segments.collect::>().len() > 1; + !has_subdirectory + } else { + true + } + } }, None => false, } @@ -231,7 +238,7 @@ impl ListingTableUrl { file_extension: &'a str, ) -> Result>> { let exec_options = &ctx.options().execution; - let ignore_subdirectory = exec_options.ignore_subdirectory; + let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory; // If the prefix is a file, use a head request, otherwise list let list = match self.is_collection() { true => match ctx.runtime_env().cache_manager.get_list_files_cache() { @@ -433,13 +440,6 @@ mod tests { let b = ListingTableUrl::parse("../bar/./foo/../baz").unwrap(); assert_eq!(a, b); assert!(a.prefix.as_ref().ends_with("bar/baz")); - - let url = ListingTableUrl::parse("../foo/*.parquet").unwrap(); - let child = url.prefix.child("aa.parquet"); - assert!(url.contains(&child, true)); - let child = url.prefix.child("dir").child("aa.parquet"); - assert!(!url.contains(&child, true)); - assert!(url.contains(&child, false)); } #[test] diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index d637d7a8b42d..e0512b7df350 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -81,6 +81,7 @@ mod tests { use crate::parquet::basic::Compression; use crate::test_util::parquet_test_data; use tempfile::tempdir; + use datafusion_execution::config::SessionConfig; use super::*; @@ -103,13 +104,17 @@ mod tests { #[tokio::test] async fn read_with_glob_path_issue_2465() -> Result<()> { - let ctx = SessionContext::new(); - + let config = + SessionConfig::from_string_hash_map(std::collections::HashMap::from([( + "datafusion.execution.listing_table_ignore_subdirectory".to_owned(), + "false".to_owned(), + )]))?; + let ctx = SessionContext::new_with_config(config); let df = ctx .read_parquet( // it was reported that when a path contains // (two consecutive separator) no files were found // in this test, regardless of parquet_test_data() value, our path now contains a // - format!("{}//alltypes_plain*.parquet", parquet_test_data()), + format!("{}/..//*/alltypes_plain*.parquet", parquet_test_data()), ParquetReadOptions::default(), ) .await?; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b99d6c0ee77b..5949be50ea20 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -150,7 +150,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false -datafusion.execution.ignore_subdirectory true +datafusion.execution.listing_table_ignore_subdirectory true datafusion.execution.max_buffered_batches_per_output_file 2 datafusion.execution.meta_fetch_concurrency 32 datafusion.execution.minimum_parallel_output_files 4 @@ -225,7 +225,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files -datafusion.execution.ignore_subdirectory true When scanning file paths, whether to ignore subdirectory files, ignored by default (true) +datafusion.execution.listing_table_ignore_subdirectory true When scanning file paths, whether to ignore subdirectory files, ignored by default (true) datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index bbe7f33e260c..c8e17cd4f7a2 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -276,6 +276,118 @@ LIMIT 10; 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) +# test for +query ITID +COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3) +TO 'test_files/scratch/parquet/test_table/subdir/3.parquet' +(FORMAT PARQUET, SINGLE_FILE_OUTPUT true); +---- +3 + +# Test config ignore_subdirectory: + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = true; + +statement ok +CREATE EXTERNAL TABLE t1_ignore_subdirectory +STORED AS PARQUET +WITH HEADER ROW +LOCATION 'test_files/scratch/parquet/test_table/*.parquet'; + +query TT +explain select count(*) from t1_ignore_subdirectory; +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] +--TableScan: t1_ignore_subdirectory projection=[] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] +------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]} + +statement ok +CREATE EXTERNAL TABLE t2_ignore_subdirectory +STORED AS PARQUET +WITH HEADER ROW +LOCATION 'test_files/scratch/parquet/test_table/'; + +query TT +explain select count(*) from t2_ignore_subdirectory; +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] +--TableScan: t2_ignore_subdirectory projection=[] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] +------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]} + +# scan file: 0.parquet 1.parquet 2.parquet + +query I +select count(*) from t1_ignore_subdirectory; +---- +9 + +query I +select count(*) from t2_ignore_subdirectory; +---- +9 + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = false; + +statement ok +CREATE EXTERNAL TABLE t1_with_subdirectory +STORED AS PARQUET +WITH HEADER ROW +LOCATION 'test_files/scratch/parquet/test_table/*.parquet'; + +query TT +explain select count(*) from t1_with_subdirectory; +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] +--TableScan: t1_with_subdirectory projection=[] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] +------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/subdir/3.parquet]]} + + +statement ok +CREATE EXTERNAL TABLE t2_with_subdirectory +STORED AS PARQUET +WITH HEADER ROW +LOCATION 'test_files/scratch/parquet/test_table/'; + +query TT +explain select count(*) from t2_with_subdirectory; +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] +--TableScan: t2_with_subdirectory projection=[] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] +------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/subdir/3.parquet]]} + +# scan file: 0.parquet 1.parquet 2.parquet 3.parquet +query I +select count(*) from t1_with_subdirectory; +---- +12 + +query I +select count(*) from t2_with_subdirectory; +---- +12 + # Clean up statement ok DROP TABLE timestamp_with_tz; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 00797fa2d294..945373bcf3e5 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -82,7 +82,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | -| datafusion.execution.ignore_subdirectory | true | When scanning file paths, whether to ignore subdirectory files, ignored by default (true) | +| datafusion.execution.listing_table_ignore_subdirectory | true | When scanning file paths, whether to ignore subdirectory files, ignored by default (true) | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | From 7a7c8f8c373f6b7a5efb41da747e72b8f3cca545 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Wed, 20 Dec 2023 12:32:27 +0800 Subject: [PATCH 08/12] fmt & fix test --- datafusion/core/src/datasource/listing/helpers.rs | 7 ++----- datafusion/core/src/execution/context/parquet.rs | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index a7533b45c55a..b09321d69fc9 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -376,13 +376,10 @@ pub async fn pruned_partition_list<'a>( store.list(Some(&partition.path)).try_collect().await? } }; - - let exec_options = &ctx.options().execution; - let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory; - let files = files.into_iter().filter(move |o| { let extension_match = o.location.as_ref().ends_with(file_extension); - let glob_match = table_path.contains(&o.location, ignore_subdirectory); + // here need to scan subdirectories(`listing_table_ignore_subdirectory` = false) + let glob_match = table_path.contains(&o.location, false); extension_match && glob_match }); diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index e0512b7df350..7825d9b88297 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -80,8 +80,8 @@ mod tests { use crate::dataframe::DataFrameWriteOptions; use crate::parquet::basic::Compression; use crate::test_util::parquet_test_data; - use tempfile::tempdir; use datafusion_execution::config::SessionConfig; + use tempfile::tempdir; use super::*; From e745b919606d2e025747ef804ec5c2fc50538974 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Wed, 20 Dec 2023 12:46:37 +0800 Subject: [PATCH 09/12] clippy --- datafusion/core/src/datasource/listing/helpers.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index b09321d69fc9..68de55e1a410 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -40,7 +40,6 @@ use crate::execution::context::SessionState; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{internal_err, Column, DFField, DFSchema, DataFusionError}; use datafusion_expr::{Expr, ScalarFunctionDefinition, Volatility}; -use datafusion_optimizer::OptimizerConfig; use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr::execution_props::ExecutionProps; use object_store::path::Path; From 3ee2e61c51490e24966834c1aa307b4c665f6a37 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Wed, 20 Dec 2023 13:50:49 +0800 Subject: [PATCH 10/12] test read partition table --- .../sqllogictest/test_files/csv_files.slt | 57 +++++++++++++++++++ .../sqllogictest/test_files/parquet.slt | 13 +++++ 2 files changed, 70 insertions(+) diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index 9facb064bf32..f82170fc9496 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -63,3 +63,60 @@ id6 value"6 id7 value"7 id8 value"8 id9 value"9 + + +# When reading a partitioned table, the `listing_table_ignore_subdirectory` configuration will be invalid +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = false; + +statement ok +CREATE EXTERNAL TABLE partition_csv_table ( + name VARCHAR, + ts TIMESTAMP, + c_date DATE, +) +STORED AS CSV +PARTITIONED BY (c_date) +LOCATION '../core/tests/data/partitioned_table'; + +query I +select count(*) from partition_csv_table; +---- +4 + +statement ok +DROP TABLE partition_csv_table + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = true; + +statement ok +CREATE EXTERNAL TABLE partition_csv_table ( + name VARCHAR, + ts TIMESTAMP, + c_date DATE, +) +STORED AS CSV +PARTITIONED BY (c_date) +LOCATION '../core/tests/data/partitioned_table'; + +query TT +explain select count(*) from partition_csv_table; +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] +--TableScan: partition_csv_table projection=[] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 +--------CsvExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table/c_date=2018-11-13/timestamps.csv], [WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table/c_date=2018-12-13/timestamps.csv]]}, has_header=false + +query I +select count(*) from partition_csv_table; +---- +4 + +statement ok +DROP TABLE partition_csv_table diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index c8e17cd4f7a2..3e04aa3db78d 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -414,3 +414,16 @@ NULL # Clean up statement ok DROP TABLE single_nan; + +# Clean up +statement ok +DROP TABLE t1_ignore_subdirectory; + +statement ok +DROP TABLE t2_ignore_subdirectory; + +statement ok +DROP TABLE t1_with_subdirectory; + +statement ok +DROP TABLE t2_with_subdirectory; From 9ea237ca8d1d996827390a4e243c4111ec8df579 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Thu, 21 Dec 2023 09:46:31 +0800 Subject: [PATCH 11/12] simplify testing --- datafusion/common/src/config.rs | 3 +- .../sqllogictest/test_files/csv_files.slt | 36 +------ .../test_files/information_schema.slt | 2 +- .../sqllogictest/test_files/parquet.slt | 102 ++---------------- docs/source/user-guide/configs.md | 2 +- 5 files changed, 12 insertions(+), 133 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1e2d58ff80e4..dedce74ff40d 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -274,7 +274,8 @@ config_namespace! { pub max_buffered_batches_per_output_file: usize, default = 2 /// When scanning file paths, whether to ignore subdirectory files, - /// ignored by default (true) + /// ignored by default (true), when reading a partitioned table, + /// `listing_table_ignore_subdirectory` is always equal to false, even if set to true pub listing_table_ignore_subdirectory: bool, default = true } diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index f82170fc9496..9afec4da2f27 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -65,28 +65,7 @@ id8 value"8 id9 value"9 -# When reading a partitioned table, the `listing_table_ignore_subdirectory` configuration will be invalid -statement ok -set datafusion.execution.listing_table_ignore_subdirectory = false; - -statement ok -CREATE EXTERNAL TABLE partition_csv_table ( - name VARCHAR, - ts TIMESTAMP, - c_date DATE, -) -STORED AS CSV -PARTITIONED BY (c_date) -LOCATION '../core/tests/data/partitioned_table'; - -query I -select count(*) from partition_csv_table; ----- -4 - -statement ok -DROP TABLE partition_csv_table - +# When reading a partitioned table, `listing_table_ignore_subdirectory` is always equal to false, even if set to true statement ok set datafusion.execution.listing_table_ignore_subdirectory = true; @@ -100,19 +79,6 @@ STORED AS CSV PARTITIONED BY (c_date) LOCATION '../core/tests/data/partitioned_table'; -query TT -explain select count(*) from partition_csv_table; ----- -logical_plan -Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] ---TableScan: partition_csv_table projection=[] -physical_plan -AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] ---CoalescePartitionsExec -----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] -------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2 ---------CsvExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table/c_date=2018-11-13/timestamps.csv], [WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table/c_date=2018-12-13/timestamps.csv]]}, has_header=false - query I select count(*) from partition_csv_table; ---- diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 5949be50ea20..36876beb1447 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -225,7 +225,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files -datafusion.execution.listing_table_ignore_subdirectory true When scanning file paths, whether to ignore subdirectory files, ignored by default (true) +datafusion.execution.listing_table_ignore_subdirectory true When scanning file paths, whether to ignore subdirectory files, ignored by default (true), when reading a partitioned table, `listing_table_ignore_subdirectory` is always equal to false, even if set to true datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 3e04aa3db78d..8a642cb253be 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -276,7 +276,8 @@ LIMIT 10; 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) 0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC")) -# test for +# Test config listing_table_ignore_subdirectory: + query ITID COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3) TO 'test_files/scratch/parquet/test_table/subdir/3.parquet' @@ -284,107 +285,27 @@ TO 'test_files/scratch/parquet/test_table/subdir/3.parquet' ---- 3 -# Test config ignore_subdirectory: - statement ok -set datafusion.execution.listing_table_ignore_subdirectory = true; - -statement ok -CREATE EXTERNAL TABLE t1_ignore_subdirectory +CREATE EXTERNAL TABLE listing_table STORED AS PARQUET WITH HEADER ROW LOCATION 'test_files/scratch/parquet/test_table/*.parquet'; -query TT -explain select count(*) from t1_ignore_subdirectory; ----- -logical_plan -Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] ---TableScan: t1_ignore_subdirectory projection=[] -physical_plan -AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] ---CoalescePartitionsExec -----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] -------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]} - statement ok -CREATE EXTERNAL TABLE t2_ignore_subdirectory -STORED AS PARQUET -WITH HEADER ROW -LOCATION 'test_files/scratch/parquet/test_table/'; - -query TT -explain select count(*) from t2_ignore_subdirectory; ----- -logical_plan -Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] ---TableScan: t2_ignore_subdirectory projection=[] -physical_plan -AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] ---CoalescePartitionsExec -----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] -------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]} +set datafusion.execution.listing_table_ignore_subdirectory = true; # scan file: 0.parquet 1.parquet 2.parquet - query I -select count(*) from t1_ignore_subdirectory; ----- -9 - -query I -select count(*) from t2_ignore_subdirectory; +select count(*) from listing_table; ---- 9 statement ok set datafusion.execution.listing_table_ignore_subdirectory = false; -statement ok -CREATE EXTERNAL TABLE t1_with_subdirectory -STORED AS PARQUET -WITH HEADER ROW -LOCATION 'test_files/scratch/parquet/test_table/*.parquet'; - -query TT -explain select count(*) from t1_with_subdirectory; ----- -logical_plan -Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] ---TableScan: t1_with_subdirectory projection=[] -physical_plan -AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] ---CoalescePartitionsExec -----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] -------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/subdir/3.parquet]]} - - -statement ok -CREATE EXTERNAL TABLE t2_with_subdirectory -STORED AS PARQUET -WITH HEADER ROW -LOCATION 'test_files/scratch/parquet/test_table/'; - -query TT -explain select count(*) from t2_with_subdirectory; ----- -logical_plan -Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] ---TableScan: t2_with_subdirectory projection=[] -physical_plan -AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)] ---CoalescePartitionsExec -----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)] -------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/subdir/3.parquet]]} - # scan file: 0.parquet 1.parquet 2.parquet 3.parquet query I -select count(*) from t1_with_subdirectory; ----- -12 - -query I -select count(*) from t2_with_subdirectory; +select count(*) from listing_table; ---- 12 @@ -417,13 +338,4 @@ DROP TABLE single_nan; # Clean up statement ok -DROP TABLE t1_ignore_subdirectory; - -statement ok -DROP TABLE t2_ignore_subdirectory; - -statement ok -DROP TABLE t1_with_subdirectory; - -statement ok -DROP TABLE t2_with_subdirectory; +DROP TABLE listing_table; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 945373bcf3e5..1f7fa7760b94 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -82,7 +82,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. | | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max | | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption | -| datafusion.execution.listing_table_ignore_subdirectory | true | When scanning file paths, whether to ignore subdirectory files, ignored by default (true) | +| datafusion.execution.listing_table_ignore_subdirectory | true | When scanning file paths, whether to ignore subdirectory files, ignored by default (true), when reading a partitioned table, `listing_table_ignore_subdirectory` is always equal to false, even if set to true | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | From fe520987293558f78ed8484f39362924904f64c6 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Thu, 21 Dec 2023 13:02:07 +0800 Subject: [PATCH 12/12] simplify testing --- .../sqllogictest/test_files/csv_files.slt | 23 ------------------- .../sqllogictest/test_files/parquet.slt | 1 - 2 files changed, 24 deletions(-) diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index 9afec4da2f27..9facb064bf32 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -63,26 +63,3 @@ id6 value"6 id7 value"7 id8 value"8 id9 value"9 - - -# When reading a partitioned table, `listing_table_ignore_subdirectory` is always equal to false, even if set to true -statement ok -set datafusion.execution.listing_table_ignore_subdirectory = true; - -statement ok -CREATE EXTERNAL TABLE partition_csv_table ( - name VARCHAR, - ts TIMESTAMP, - c_date DATE, -) -STORED AS CSV -PARTITIONED BY (c_date) -LOCATION '../core/tests/data/partitioned_table'; - -query I -select count(*) from partition_csv_table; ----- -4 - -statement ok -DROP TABLE partition_csv_table diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index af4562e334b7..0f26c14f0017 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -336,7 +336,6 @@ NULL statement ok DROP TABLE single_nan; - statement ok CREATE EXTERNAL TABLE list_columns STORED AS PARQUET