Skip to content

Commit

Permalink
refactor lexico sort (#424)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist authored Jun 9, 2021
1 parent 5f7d111 commit 0c00776
Show file tree
Hide file tree
Showing 6 changed files with 464 additions and 2 deletions.
4 changes: 4 additions & 0 deletions arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ harness = false
name = "sort_kernel"
harness = false

[[bench]]
name = "partition_kernels"
harness = false

[[bench]]
name = "csv_writer"
harness = false
Expand Down
142 changes: 142 additions & 0 deletions arrow/benches/partition_kernels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
use criterion::Criterion;
use std::sync::Arc;
extern crate arrow;
use arrow::compute::kernels::partition::lexicographical_partition_ranges;
use arrow::compute::kernels::sort::{lexsort, SortColumn};
use arrow::util::bench_util::*;
use arrow::{
array::*,
datatypes::{ArrowPrimitiveType, Float64Type, UInt8Type},
};
use rand::distributions::{Distribution, Standard};
use std::iter;

fn create_array<T: ArrowPrimitiveType>(size: usize, with_nulls: bool) -> ArrayRef
where
Standard: Distribution<T::Native>,
{
let null_density = if with_nulls { 0.5 } else { 0.0 };
let array = create_primitive_array::<T>(size, null_density);
Arc::new(array)
}

fn bench_partition(sorted_columns: &[ArrayRef]) {
let columns = sorted_columns
.iter()
.map(|arr| SortColumn {
values: arr.clone(),
options: None,
})
.collect::<Vec<_>>();

criterion::black_box(lexicographical_partition_ranges(&columns).unwrap());
}

fn create_sorted_low_cardinality_data(length: usize) -> Vec<ArrayRef> {
let arr = Int64Array::from_iter_values(
iter::repeat(1)
.take(length / 4)
.chain(iter::repeat(2).take(length / 4))
.chain(iter::repeat(3).take(length / 4))
.chain(iter::repeat(4).take(length / 4)),
);
lexsort(
&[SortColumn {
values: Arc::new(arr),
options: None,
}],
None,
)
.unwrap()
}

fn create_sorted_float_data(pow: u32, with_nulls: bool) -> Vec<ArrayRef> {
lexsort(
&[
SortColumn {
values: create_array::<Float64Type>(2u64.pow(pow) as usize, with_nulls),
options: None,
},
SortColumn {
values: create_array::<Float64Type>(2u64.pow(pow) as usize, with_nulls),
options: None,
},
],
None,
)
.unwrap()
}

fn create_sorted_data(pow: u32, with_nulls: bool) -> Vec<ArrayRef> {
lexsort(
&[
SortColumn {
values: create_array::<UInt8Type>(2u64.pow(pow) as usize, with_nulls),
options: None,
},
SortColumn {
values: create_array::<UInt8Type>(2u64.pow(pow) as usize, with_nulls),
options: None,
},
],
None,
)
.unwrap()
}

fn add_benchmark(c: &mut Criterion) {
let sorted_columns = create_sorted_data(10, false);
c.bench_function("lexicographical_partition_ranges(u8) 2^10", |b| {
b.iter(|| bench_partition(&sorted_columns))
});

let sorted_columns = create_sorted_data(12, false);
c.bench_function("lexicographical_partition_ranges(u8) 2^12", |b| {
b.iter(|| bench_partition(&sorted_columns))
});

let sorted_columns = create_sorted_data(10, true);
c.bench_function(
"lexicographical_partition_ranges(u8) 2^10 with nulls",
|b| b.iter(|| bench_partition(&sorted_columns)),
);

let sorted_columns = create_sorted_data(12, true);
c.bench_function(
"lexicographical_partition_ranges(u8) 2^12 with nulls",
|b| b.iter(|| bench_partition(&sorted_columns)),
);

let sorted_columns = create_sorted_float_data(10, false);
c.bench_function("lexicographical_partition_ranges(f64) 2^10", |b| {
b.iter(|| bench_partition(&sorted_columns))
});

let sorted_columns = create_sorted_low_cardinality_data(1024);
c.bench_function(
"lexicographical_partition_ranges(low cardinality) 1024",
|b| b.iter(|| bench_partition(&sorted_columns)),
);
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);
4 changes: 2 additions & 2 deletions arrow/benches/sort_kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ fn create_array(size: usize, with_nulls: bool) -> ArrayRef {
Arc::new(array)
}

fn bench_sort(arr_a: &ArrayRef, array_b: &ArrayRef, limit: Option<usize>) {
fn bench_sort(array_a: &ArrayRef, array_b: &ArrayRef, limit: Option<usize>) {
let columns = vec![
SortColumn {
values: arr_a.clone(),
values: array_a.clone(),
options: None,
},
SortColumn {
Expand Down
1 change: 1 addition & 0 deletions arrow/src/compute/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod concat;
pub mod filter;
pub mod length;
pub mod limit;
pub mod partition;
pub mod regexp;
pub mod sort;
pub mod substring;
Expand Down
Loading

0 comments on commit 0c00776

Please sign in to comment.