Optimize SortPreservingMergeExec for single-column merge #13642

Closed
Tracked by #10313
Dandandan opened this issue Dec 4, 2024 · 7 comments
Assignees
Labels
enhancement New feature or request performance Make DataFusion faster

Comments

@Dandandan
Contributor

Dandandan commented Dec 4, 2024

Is your feature request related to a problem or challenge?

Describe the solution you'd like

Optimize the SortPreservingMergeStream for single-column sorts by converting to the row format only when dealing with multi-column sorts.

Describe alternatives you've considered

No response

Additional context

This should be beneficial for many queries involving SortPreservingMerge, such as #13586
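To make the request concrete, here is a minimal std-only sketch of the two strategies being contrasted. It is not DataFusion code: `merge_single_column` and `encode_row` are hypothetical names, and the `i64`/`u32` key types are assumptions chosen for illustration. The first function merges sorted partitions by comparing native values directly; the second shows the kind of order-preserving byte encoding that a row format needs so that multi-column keys can be compared as plain bytes.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted partitions of a single i64 sort key by comparing
/// native values directly; no row encoding is involved.
fn merge_single_column(partitions: &[Vec<i64>]) -> Vec<i64> {
    // Min-heap of (value, partition index, offset within that partition).
    let mut heap = BinaryHeap::new();
    for (p, part) in partitions.iter().enumerate() {
        if let Some(&v) = part.first() {
            heap.push(Reverse((v, p, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(partitions.iter().map(Vec::len).sum());
    while let Some(Reverse((v, p, i))) = heap.pop() {
        out.push(v);
        // Refill the heap from the partition that just produced a value.
        if let Some(&next) = partitions[p].get(i + 1) {
            heap.push(Reverse((next, p, i + 1)));
        }
    }
    out
}

/// Encode a two-column key (i64 ascending, u32 ascending) as bytes whose
/// lexicographic order matches the order of the (i64, u32) tuples.
fn encode_row(a: i64, b: u32) -> Vec<u8> {
    let mut row = Vec::with_capacity(12);
    // Flipping the sign bit makes the big-endian bytes of a signed integer
    // sort in numeric order.
    row.extend_from_slice(&((a as u64) ^ (1u64 << 63)).to_be_bytes());
    row.extend_from_slice(&b.to_be_bytes());
    row
}
```

The encoding step costs an extra pass and an allocation per row, which is the overhead a single-column fast path would avoid.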

@Dandandan Dandandan added enhancement New feature or request performance Make DataFusion faster labels Dec 4, 2024
@alan910127
Contributor

take

@comphead
Contributor

comphead commented Dec 6, 2024

Hi @Dandandan, does the SortPreservingMergeStream work with the row format? I quickly checked https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/sorts/merge.rs#L44 and I don't see RowConverter or SortFields in there. Am I missing something?

@alan910127
Contributor

As shown in my flamegraph profile, there are code paths that invoke arrow::Row::cmp.
From what I understand, RowValues (which implements CursorValues and whose cmp method is called in is_gt) might come from RowCursorStream::convert_batch.
However, I’m not entirely sure whether this is accurate, or whether it explains why RowConverter and SortFields do not appear in that file.
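The relationship described here can be sketched as follows. This is a simplified illustration, not the actual DataFusion definitions: the `Values` trait, both structs, and the signature of `is_gt` are hypothetical stand-ins. The point is that a generic merge loop calls the same comparison method in both cases, so a byte-wise row comparison appears in a profile only when the cursor happens to be row-backed.

```rust
use std::cmp::Ordering;

/// Hypothetical, simplified stand-in for a cursor-values abstraction.
trait Values {
    /// Compare position `l` of `self` with position `r` of `other`.
    fn compare(&self, l: usize, other: &Self, r: usize) -> Ordering;
}

/// Single-column case: the sort key is kept as native values.
struct PrimitiveValues(Vec<i64>);

impl Values for PrimitiveValues {
    fn compare(&self, l: usize, other: &Self, r: usize) -> Ordering {
        self.0[l].cmp(&other.0[r])
    }
}

/// Multi-column case: every row is pre-encoded as order-preserving bytes.
struct RowValues(Vec<Vec<u8>>);

impl Values for RowValues {
    fn compare(&self, l: usize, other: &Self, r: usize) -> Ordering {
        // Byte-wise comparison of two encoded rows.
        self.0[l].cmp(&other.0[r])
    }
}

/// Generic "is the left position greater?" check, as a merge loop might use.
fn is_gt<V: Values>(left: &V, l: usize, right: &V, r: usize) -> bool {
    left.compare(l, right, r) == Ordering::Greater
}
```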

@comphead
Contributor

comphead commented Dec 7, 2024

Thanks for the flamegraph. Indeed, it looks like this happens when working with PartitionedStream, namely in

let cursor = self.convert_batch(&batch)?;

and type CursorStream<C> = Box<dyn PartitionedStream<Output = Result<(C, RecordBatch)>>>; so that explains it.
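As a rough illustration of that shape, the sketch below pairs every incoming batch with a cursor built from it, mirroring the `Result<(C, RecordBatch)>` item type quoted above. It uses plain iterators instead of async streams, and `Batch`, `tuple_cursor`, and `cursor_stream` are hypothetical names invented for this example, not DataFusion APIs.

```rust
/// Hypothetical stand-in for a record batch with two sort-key columns.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    a: Vec<i64>,
    b: Vec<u32>,
}

/// One possible conversion: build a cursor of (a, b) key tuples.
fn tuple_cursor(batch: &Batch) -> Result<Vec<(i64, u32)>, String> {
    if batch.a.len() != batch.b.len() {
        return Err("column lengths differ".to_string());
    }
    Ok(batch.a.iter().copied().zip(batch.b.iter().copied()).collect())
}

/// Convert each batch into a cursor and yield the cursor together with the
/// batch it was built from. The cursor type `C` is decided by the converter,
/// so the merge code downstream never needs to know how it was produced.
fn cursor_stream<C>(
    batches: impl Iterator<Item = Batch>,
    convert_batch: impl Fn(&Batch) -> Result<C, String>,
) -> impl Iterator<Item = Result<(C, Batch), String>> {
    batches.map(move |batch| -> Result<(C, Batch), String> {
        let cursor = convert_batch(&batch)?;
        Ok((cursor, batch))
    })
}
```

Because the conversion is hidden behind the stream, a merge file can use row-backed cursors without ever naming a row converter itself.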

@jayzhan211
Contributor

It seems the single-column case is already optimized here:

// Special case single column comparisons with optimized cursor implementations
if expressions.len() == 1 {
    let sort = expressions[0].clone();
    let data_type = sort.expr.data_type(schema.as_ref())?;
    downcast_primitive! {
        data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
        DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
        DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
        DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
        DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
        _ => {}
    }
}
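The control flow of the snippet above can be reduced to a small decision function. This is a sketch under assumptions: `KeyType`, `MergePath`, and `choose_merge_path` are hypothetical names, and the fallback to a row-format path after the `_ => {}` arm is inferred from the discussion in this thread, not shown in the quoted code.

```rust
/// Hypothetical, reduced set of sort-key data types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum KeyType {
    Int64,
    Utf8,
    Struct,
}

/// Hypothetical labels for the merge strategies discussed in this issue.
#[derive(Debug, PartialEq)]
enum MergePath {
    Primitive,
    ByteArray,
    RowFormat,
}

fn choose_merge_path(sort_key_types: &[KeyType]) -> MergePath {
    // Only a single sort expression is eligible for the typed fast path.
    if let [only] = sort_key_types {
        match only {
            KeyType::Int64 => return MergePath::Primitive,
            KeyType::Utf8 => return MergePath::ByteArray,
            // Unsupported single-column types fall through, like `_ => {}`.
            _ => {}
        }
    }
    // Assumed fallback: multi-column or unsupported keys use the row format.
    MergePath::RowFormat
}
```

Under this reading, a profile showing row comparisons implies either more than one sort expression or a key type outside the specialized list.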

@alan910127
Contributor

alan910127 commented Feb 4, 2025

EDIT: it seems the first SortExpr comes from PARTITION BY id6, so this may not be a single-column sort problem after all.

I've found that the sort isn't actually planned as a single-column sort. The expressions in this case contain multiple columns:

// SQL: row_number() over (partition by id6 order by v3 desc)
LexOrdering { 
    inner: [
        PhysicalSortExpr {
            expr: Column { name: "id6", index: 0 },
            options: SortOptions { descending: false, nulls_first: false }
        },
        PhysicalSortExpr {
            expr: Column { name: "v3", index: 1 }, 
            options: SortOptions { descending: true, nulls_first: true }
        }
    ]
}

I'm not sure whether this behavior is expected or if the plan should have used a single-column sort optimization in this case.
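A small sketch may help show why the window function needs both keys. To number rows within each partition, the input has to be grouped by `id6` and ordered by `v3` inside each group, which is the two-key ordering printed above. The `Row` struct, the `i64` column types, and both function names are assumptions made for this example.

```rust
use std::cmp::Ordering;

/// Hypothetical row with the two columns from the query; types are assumed.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    id6: i64,
    v3: Option<i64>,
}

/// Order by id6 ascending, then by v3 descending with nulls first,
/// matching the SortOptions in the LexOrdering shown above.
/// (id6 is non-null in this sketch, so its nulls_first setting is moot.)
fn cmp_rows(a: &Row, b: &Row) -> Ordering {
    a.id6.cmp(&b.id6).then_with(|| match (a.v3, b.v3) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => Ordering::Less,
        (Some(_), None) => Ordering::Greater,
        // Reversed operands give descending order.
        (Some(x), Some(y)) => y.cmp(&x),
    })
}

/// Compute row_number() over (partition by id6 order by v3 desc).
fn row_numbers(mut rows: Vec<Row>) -> Vec<(Row, u64)> {
    rows.sort_by(cmp_rows);
    let mut out = Vec::with_capacity(rows.len());
    let mut current_partition: Option<i64> = None;
    let mut n = 0u64;
    for row in rows {
        // Restart numbering whenever the partition key changes.
        if current_partition != Some(row.id6) {
            current_partition = Some(row.id6);
            n = 0;
        }
        n += 1;
        out.push((row, n));
    }
    out
}
```

Since the partition key is part of the sort, the merge sees two sort expressions even though the ORDER BY clause names only one column.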

@Dandandan Dandandan changed the title Optimize SortPreservingMergeStream for single-column merge Optimize SortExec for single-column merge Feb 4, 2025
@Dandandan Dandandan changed the title Optimize SortExec for single-column merge Optimize SortPreservingMergeExec for single-column merge Feb 4, 2025
@Dandandan
Contributor Author

Hm I see the query has two
I am not totally sure where I found the single row still in SortPreservingMergeStream.

I do think, however, that there is an opportunity for sort_batch in ExternalSorter (

) I'll open a new ticket about that one.

4 participants