Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new configuration item listing_table_ignore_subdirectory #8565

Merged
merged 15 commits into from
Dec 22, 2023
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ config_namespace! {
/// memory consumption
pub max_buffered_batches_per_output_file: usize, default = 2

/// When scanning file paths, whether to ignore subdirectory files,
/// ignored by default (true), when reading a partitioned table,
/// `listing_table_ignore_subdirectory` is always equal to false, even if set to true
Comment on lines +276 to +278
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the description information, do you agree? @alamb

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what this is trying to say 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

## read partition table

CREATE EXTERNAL TABLE csv_with_timestamps (
  name VARCHAR,
  ts TIMESTAMP,
  c_date DATE,
)
STORED AS CSV
PARTITIONED BY (c_date)
LOCATION '../core/tests/data/partitioned_table';

set datafusion.execution.listing_table_ignore_subdirectory = true;

select count(*) from partition_tbale;    ## return 4

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, got it -- thank you -- I will propose a clarification in a follow on PR

pub listing_table_ignore_subdirectory: bool, default = true

}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,10 @@ pub async fn pruned_partition_list<'a>(
store.list(Some(&partition.path)).try_collect().await?
}
};

let files = files.into_iter().filter(move |o| {
let extension_match = o.location.as_ref().ends_with(file_extension);
let glob_match = table_path.contains(&o.location);
// here need to scan subdirectories(`listing_table_ignore_subdirectory` = false)
let glob_match = table_path.contains(&o.location, false);
extension_match && glob_match
});

Expand Down
26 changes: 21 additions & 5 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::fs;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::SessionState;
use datafusion_common::{DataFusionError, Result};
use datafusion_optimizer::OptimizerConfig;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
Expand Down Expand Up @@ -184,14 +185,27 @@ impl ListingTableUrl {
}

/// Returns `true` if `path` matches this [`ListingTableUrl`]
pub fn contains(&self, path: &Path) -> bool {
pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
match self.strip_prefix(path) {
Some(mut segments) => match &self.glob {
Some(glob) => {
let stripped = segments.join("/");
glob.matches(&stripped)
if ignore_subdirectory {
segments
.next()
.map_or(false, |file_name| glob.matches(file_name))
} else {
let stripped = segments.join("/");
glob.matches(&stripped)
}
}
None => {
if ignore_subdirectory {
let has_subdirectory = segments.collect::<Vec<_>>().len() > 1;
!has_subdirectory
} else {
true
}
}
None => true,
},
None => false,
}
Expand Down Expand Up @@ -223,6 +237,8 @@ impl ListingTableUrl {
store: &'a dyn ObjectStore,
file_extension: &'a str,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
let exec_options = &ctx.options().execution;
let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
// If the prefix is a file, use a head request, otherwise list
let list = match self.is_collection() {
true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
Expand All @@ -246,7 +262,7 @@ impl ListingTableUrl {
.try_filter(move |meta| {
let path = &meta.location;
let extension_match = path.as_ref().ends_with(file_extension);
let glob_match = self.contains(path);
let glob_match = self.contains(path, ignore_subdirectory);
futures::future::ready(extension_match && glob_match)
})
.map_err(DataFusionError::ObjectStore)
Expand Down
9 changes: 7 additions & 2 deletions datafusion/core/src/execution/context/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ mod tests {
use crate::dataframe::DataFrameWriteOptions;
use crate::parquet::basic::Compression;
use crate::test_util::parquet_test_data;
use datafusion_execution::config::SessionConfig;
use tempfile::tempdir;

use super::*;
Expand All @@ -103,8 +104,12 @@ mod tests {

#[tokio::test]
async fn read_with_glob_path_issue_2465() -> Result<()> {
let ctx = SessionContext::new();

let config =
SessionConfig::from_string_hash_map(std::collections::HashMap::from([(
"datafusion.execution.listing_table_ignore_subdirectory".to_owned(),
"false".to_owned(),
)]))?;
let ctx = SessionContext::new_with_config(config);
let df = ctx
.read_parquet(
// it was reported that when a path contains // (two consecutive separator) no files were found
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
datafusion.execution.listing_table_ignore_subdirectory true
datafusion.execution.max_buffered_batches_per_output_file 2
datafusion.execution.meta_fetch_concurrency 32
datafusion.execution.minimum_parallel_output_files 4
Expand Down Expand Up @@ -224,6 +225,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f
datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
datafusion.execution.listing_table_ignore_subdirectory true When scanning file paths, whether to ignore subdirectory files, ignored by default (true), when reading a partitioned table, `listing_table_ignore_subdirectory` is always equal to false, even if set to true
datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
datafusion.execution.minimum_parallel_output_files 4 Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached.
Expand Down
38 changes: 37 additions & 1 deletion datafusion/sqllogictest/test_files/parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,39 @@ LIMIT 10;
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))
0 2014-08-27T14:00:00Z Timestamp(Millisecond, Some("UTC"))

# Test config listing_table_ignore_subdirectory:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


query ITID
COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3)
TO 'test_files/scratch/parquet/test_table/subdir/3.parquet'
(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
----
3

statement ok
CREATE EXTERNAL TABLE listing_table
STORED AS PARQUET
WITH HEADER ROW
LOCATION 'test_files/scratch/parquet/test_table/*.parquet';

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = true;

# scan file: 0.parquet 1.parquet 2.parquet
query I
select count(*) from listing_table;
----
9

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = false;

# scan file: 0.parquet 1.parquet 2.parquet 3.parquet
query I
select count(*) from listing_table;
----
12

# Clean up
statement ok
DROP TABLE timestamp_with_tz;
Expand Down Expand Up @@ -303,7 +336,6 @@ NULL
statement ok
DROP TABLE single_nan;


statement ok
CREATE EXTERNAL TABLE list_columns
STORED AS PARQUET
Expand All @@ -319,3 +351,7 @@ SELECT int64_list, utf8_list FROM list_columns

statement ok
DROP TABLE list_columns;

# Clean up
statement ok
DROP TABLE listing_table;
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |
| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
| datafusion.execution.listing_table_ignore_subdirectory | true | When scanning file paths, whether to ignore subdirectory files, ignored by default (true), when reading a partitioned table, `listing_table_ignore_subdirectory` is always equal to false, even if set to true |
| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
Expand Down