Skip to content

Commit

Permalink
Fix sort order aware file group parallelization (#8517)
Browse files Browse the repository at this point in the history
* Minor: Extract file group repartitioning and tests into `FileGroupRepartitioner`

* Implement sort order aware redistribution
  • Loading branch information
alamb authored Dec 17, 2023
1 parent 0f83ffc commit 2e16c75
Show file tree
Hide file tree
Showing 8 changed files with 918 additions and 446 deletions.
16 changes: 11 additions & 5 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub type PartitionedFileStream =
/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
/// sections of a Parquet file in parallel.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
/// Range start
pub start: i64,
Expand Down Expand Up @@ -70,13 +70,12 @@ pub struct PartitionedFile {
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}

impl PartitionedFile {
/// Create a simple file without metadata or partition
pub fn new(path: String, size: u64) -> Self {
pub fn new(path: impl Into<String>, size: u64) -> Self {
Self {
object_meta: ObjectMeta {
location: Path::from(path),
location: Path::from(path.into()),
last_modified: chrono::Utc.timestamp_nanos(0),
size: size as usize,
e_tag: None,
Expand All @@ -99,9 +98,10 @@ impl PartitionedFile {
version: None,
},
partition_values: vec![],
range: Some(FileRange { start, end }),
range: None,
extensions: None,
}
.with_range(start, end)
}

/// Return a file reference from the given path
Expand All @@ -114,6 +114,12 @@ impl PartitionedFile {
pub fn path(&self) -> &Path {
    // The location is stored on the wrapped object metadata; hand out a borrow of it.
    let meta = &self.object_meta;
    &meta.location
}

/// Update the file to only scan the specified range (in bytes)
pub fn with_range(mut self, start: i64, end: i64) -> Self {
self.range = Some(FileRange { start, end });
self
}
}

impl From<ObjectMeta> for PartitionedFile {
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::ops::Range;
use std::sync::Arc;
use std::task::Poll;

use super::FileScanConfig;
use super::{FileGroupPartitioner, FileScanConfig};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::{FileRange, ListingTableUrl};
use crate::datasource::physical_plan::file_stream::{
Expand Down Expand Up @@ -177,7 +177,7 @@ impl ExecutionPlan for CsvExec {
}

/// Redistribute files across partitions according to their size
/// See comments on `repartition_file_groups()` for more detail.
/// See comments on [`FileGroupPartitioner`] for more detail.
///
/// Returns `None` if the file groups cannot be repartitioned (e.g. empty or compressed files).
fn repartitioned(
Expand All @@ -191,11 +191,11 @@ impl ExecutionPlan for CsvExec {
return Ok(None);
}

let repartitioned_file_groups_option = FileScanConfig::repartition_file_groups(
self.base_config.file_groups.clone(),
target_partitions,
repartition_file_min_size,
);
let repartitioned_file_groups_option = FileGroupPartitioner::new()
.with_target_partitions(target_partitions)
.with_preserve_order_within_groups(self.output_ordering().is_some())
.with_repartition_file_min_size(repartition_file_min_size)
.repartition_file_groups(&self.base_config.file_groups);

if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
let mut new_plan = self.clone();
Expand Down
Loading

0 comments on commit 2e16c75

Please sign in to comment.