From 0c0077697e55eb154dbfcf3127a3f39e63be2df8 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Thu, 10 Jun 2021 02:16:42 +0800 Subject: [PATCH] refactor lexico sort (#424) --- arrow/Cargo.toml | 4 + arrow/benches/partition_kernels.rs | 142 +++++++++++ arrow/benches/sort_kernel.rs | 4 +- arrow/src/compute/kernels/mod.rs | 1 + arrow/src/compute/kernels/partition.rs | 314 +++++++++++++++++++++++++ arrow/src/compute/mod.rs | 1 + 6 files changed, 464 insertions(+), 2 deletions(-) create mode 100644 arrow/benches/partition_kernels.rs create mode 100644 arrow/src/compute/kernels/partition.rs diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 337e167cbb96..c3ba8b1ea4be 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -124,6 +124,10 @@ harness = false name = "sort_kernel" harness = false +[[bench]] +name = "partition_kernels" +harness = false + [[bench]] name = "csv_writer" harness = false diff --git a/arrow/benches/partition_kernels.rs b/arrow/benches/partition_kernels.rs new file mode 100644 index 000000000000..6a9ce709d33c --- /dev/null +++ b/arrow/benches/partition_kernels.rs @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; +use criterion::Criterion; +use std::sync::Arc; +extern crate arrow; +use arrow::compute::kernels::partition::lexicographical_partition_ranges; +use arrow::compute::kernels::sort::{lexsort, SortColumn}; +use arrow::util::bench_util::*; +use arrow::{ + array::*, + datatypes::{ArrowPrimitiveType, Float64Type, UInt8Type}, +}; +use rand::distributions::{Distribution, Standard}; +use std::iter; + +fn create_array(size: usize, with_nulls: bool) -> ArrayRef +where + Standard: Distribution, +{ + let null_density = if with_nulls { 0.5 } else { 0.0 }; + let array = create_primitive_array::(size, null_density); + Arc::new(array) +} + +fn bench_partition(sorted_columns: &[ArrayRef]) { + let columns = sorted_columns + .iter() + .map(|arr| SortColumn { + values: arr.clone(), + options: None, + }) + .collect::>(); + + criterion::black_box(lexicographical_partition_ranges(&columns).unwrap()); +} + +fn create_sorted_low_cardinality_data(length: usize) -> Vec { + let arr = Int64Array::from_iter_values( + iter::repeat(1) + .take(length / 4) + .chain(iter::repeat(2).take(length / 4)) + .chain(iter::repeat(3).take(length / 4)) + .chain(iter::repeat(4).take(length / 4)), + ); + lexsort( + &[SortColumn { + values: Arc::new(arr), + options: None, + }], + None, + ) + .unwrap() +} + +fn create_sorted_float_data(pow: u32, with_nulls: bool) -> Vec { + lexsort( + &[ + SortColumn { + values: create_array::(2u64.pow(pow) as usize, with_nulls), + options: None, + }, + SortColumn { + values: create_array::(2u64.pow(pow) as usize, with_nulls), + options: None, + }, + ], + None, + ) + .unwrap() +} + +fn create_sorted_data(pow: u32, with_nulls: bool) -> Vec { + lexsort( + &[ + SortColumn { + values: create_array::(2u64.pow(pow) as usize, with_nulls), + options: None, + }, + SortColumn { + values: create_array::(2u64.pow(pow) as usize, with_nulls), + options: None, + }, + ], + None, + ) + .unwrap() +} + +fn add_benchmark(c: &mut Criterion) { + let sorted_columns = create_sorted_data(10, false); + c.bench_function("lexicographical_partition_ranges(u8) 2^10", |b| { + b.iter(|| bench_partition(&sorted_columns)) + }); + + let sorted_columns = create_sorted_data(12, false); + c.bench_function("lexicographical_partition_ranges(u8) 2^12", |b| { + b.iter(|| bench_partition(&sorted_columns)) + }); + + let sorted_columns = create_sorted_data(10, true); + c.bench_function( + "lexicographical_partition_ranges(u8) 2^10 with nulls", + |b| b.iter(|| bench_partition(&sorted_columns)), + ); + + let sorted_columns = create_sorted_data(12, true); + c.bench_function( + "lexicographical_partition_ranges(u8) 2^12 with nulls", + |b| b.iter(|| bench_partition(&sorted_columns)), + ); + + let sorted_columns = create_sorted_float_data(10, false); + c.bench_function("lexicographical_partition_ranges(f64) 2^10", |b| { + b.iter(|| bench_partition(&sorted_columns)) + }); + + let sorted_columns = create_sorted_low_cardinality_data(1024); + c.bench_function( + "lexicographical_partition_ranges(low cardinality) 1024", + |b| b.iter(|| bench_partition(&sorted_columns)), + ); +} + +criterion_group!(benches, add_benchmark); +criterion_main!(benches); diff --git a/arrow/benches/sort_kernel.rs b/arrow/benches/sort_kernel.rs index 74dc0ceae181..8467b500f176 100644 --- a/arrow/benches/sort_kernel.rs +++ b/arrow/benches/sort_kernel.rs @@ -33,10 +33,10 @@ fn create_array(size: usize, with_nulls: bool) -> ArrayRef { Arc::new(array) } -fn bench_sort(arr_a: &ArrayRef, array_b: &ArrayRef, limit: Option) { +fn bench_sort(array_a: &ArrayRef, array_b: &ArrayRef, limit: Option) { let columns = vec![ SortColumn { - values: arr_a.clone(), + values: array_a.clone(), options: None, }, SortColumn { diff --git a/arrow/src/compute/kernels/mod.rs b/arrow/src/compute/kernels/mod.rs index 862f55fe2f23..a0ef50a7b85a 100644 --- a/arrow/src/compute/kernels/mod.rs +++ b/arrow/src/compute/kernels/mod.rs @@ -28,6 +28,7 @@ pub mod concat; pub mod filter; pub mod length; pub mod limit; +pub mod partition; pub mod regexp; pub mod sort; pub mod substring; diff --git a/arrow/src/compute/kernels/partition.rs b/arrow/src/compute/kernels/partition.rs new file mode 100644 index 000000000000..e91f80bb558f --- /dev/null +++ b/arrow/src/compute/kernels/partition.rs @@ -0,0 +1,314 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines partition kernel for `ArrayRef` + +use crate::compute::kernels::sort::LexicographicalComparator; +use crate::compute::SortColumn; +use crate::error::{ArrowError, Result}; +use std::cmp::Ordering; +use std::ops::Range; + +/// Given a list of already sorted columns, find partition ranges that would partition +/// lexicographically equal values across columns. +/// +/// Here LexicographicalComparator is used in conjunction with binary +/// search so the columns *MUST* be pre-sorted already. +/// +/// The returned vec would be of size k where k is cardinality of the sorted values; Consecutive +/// values will be connected: (a, b) and (b, c), where start = 0 and end = n for the first and last +/// range. +pub fn lexicographical_partition_ranges( + columns: &[SortColumn], +) -> Result>> { + let partition_points = lexicographical_partition_points(columns)?; + Ok(partition_points + .iter() + .zip(partition_points[1..].iter()) + .map(|(&start, &end)| Range { start, end }) + .collect()) +} + +/// Given a list of already sorted columns, find partition ranges that would partition +/// lexicographically equal values across columns. +/// +/// Here LexicographicalComparator is used in conjunction with binary +/// search so the columns *MUST* be pre-sorted already. +/// +/// The returned vec would be of size k+1 where k is cardinality of the sorted values; the first and +/// last value would be 0 and n. +fn lexicographical_partition_points(columns: &[SortColumn]) -> Result> { + if columns.is_empty() { + return Err(ArrowError::InvalidArgumentError( + "Sort requires at least one column".to_string(), + )); + } + let row_count = columns[0].values.len(); + if columns.iter().any(|item| item.values.len() != row_count) { + return Err(ArrowError::ComputeError( + "Lexical sort columns have different row counts".to_string(), + )); + }; + + let mut result = vec![]; + if row_count == 0 { + return Ok(result); + } + + let lexicographical_comparator = LexicographicalComparator::try_new(columns)?; + let value_indices = (0..row_count).collect::>(); + + let mut previous_partition_point = 0; + result.push(previous_partition_point); + while previous_partition_point < row_count { + // invariant: + // value_indices[0..previous_partition_point] all are values <= value_indices[previous_partition_point] + // so in order to save time we can do binary search on the value_indices[previous_partition_point..] + // and find when any value is greater than value_indices[previous_partition_point]; because we are using + // new indices, the new offset is _added_ to the previous_partition_point. + // + // be careful that idx is of type &usize which points to the actual value within value_indices, which itself + // contains usize (0..row_count), providing access to lexicographical_comparator as pointers into the + // original columnar data. + previous_partition_point += value_indices[previous_partition_point..] + .partition_point(|idx| { + lexicographical_comparator.compare(idx, &previous_partition_point) + != Ordering::Greater + }); + result.push(previous_partition_point); + } + + Ok(result) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::array::*; + use crate::compute::SortOptions; + use crate::datatypes::DataType; + use std::sync::Arc; + + #[test] + fn test_lexicographical_partition_points_empty() { + let input = vec![]; + assert!( + lexicographical_partition_points(&input).is_err(), + "lexicographical_partition_points should reject columns with empty rows" + ); + } + + #[test] + fn test_lexicographical_partition_points_unaligned_rows() { + let input = vec![ + SortColumn { + values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as ArrayRef, + options: None, + }, + SortColumn { + values: Arc::new(StringArray::from(vec![Some("foo")])) as ArrayRef, + options: None, + }, + ]; + assert!( + lexicographical_partition_points(&input).is_err(), + "lexicographical_partition_points should reject columns with different row counts" + ); + } + + #[test] + fn test_lexicographical_partition_single_column() -> Result<()> { + let input = vec![SortColumn { + values: Arc::new(Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9])) + as ArrayRef, + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }]; + { + let results = lexicographical_partition_points(&input)?; + assert_eq!(vec![0, 1, 8, 9], results); + } + { + let results = lexicographical_partition_ranges(&input)?; + assert_eq!( + vec![(0_usize..1_usize), (1_usize..8_usize), (8_usize..9_usize)], + results + ); + } + Ok(()) + } + + #[test] + fn test_lexicographical_partition_all_equal_values() -> Result<()> { + let input = vec![SortColumn { + values: Arc::new(Int64Array::from_value(1, 1000)) as ArrayRef, + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }]; + { + let results = lexicographical_partition_points(&input)?; + assert_eq!(vec![0, 1000], results); + } + { + let results = lexicographical_partition_ranges(&input)?; + assert_eq!(vec![(0_usize..1000_usize)], results); + } + Ok(()) + } + + #[test] + fn test_lexicographical_partition_all_null_values() -> Result<()> { + let input = vec![ + SortColumn { + values: new_null_array(&DataType::Int8, 1000), + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }, + SortColumn { + values: new_null_array(&DataType::UInt16, 1000), + options: Some(SortOptions { + descending: false, + nulls_first: false, + }), + }, + ]; + { + let results = lexicographical_partition_points(&input)?; + assert_eq!(vec![0, 1000], results); + } + { + let results = lexicographical_partition_ranges(&input)?; + assert_eq!(vec![(0_usize..1000_usize)], results); + } + Ok(()) + } + + #[test] + fn test_lexicographical_partition_unique_column_1() -> Result<()> { + let input = vec![ + SortColumn { + values: Arc::new(Int64Array::from(vec![None, Some(-1)])) as ArrayRef, + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }, + SortColumn { + values: Arc::new(StringArray::from(vec![Some("foo"), Some("bar")])) + as ArrayRef, + options: Some(SortOptions { + descending: true, + nulls_first: true, + }), + }, + ]; + { + let results = lexicographical_partition_points(&input)?; + assert_eq!(vec![0, 1, 2], results); + } + { + let results = lexicographical_partition_ranges(&input)?; + assert_eq!(vec![(0_usize..1_usize), (1_usize..2_usize)], results); + } + Ok(()) + } + + #[test] + fn test_lexicographical_partition_unique_column_2() -> Result<()> { + let input = vec![ + SortColumn { + values: Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)])) + as ArrayRef, + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }, + SortColumn { + values: Arc::new(StringArray::from(vec![ + Some("foo"), + Some("bar"), + Some("apple"), + ])) as ArrayRef, + options: Some(SortOptions { + descending: true, + nulls_first: true, + }), + }, + ]; + { + let results = lexicographical_partition_points(&input)?; + assert_eq!(vec![0, 1, 2, 3], results); + } + { + let results = lexicographical_partition_ranges(&input)?; + assert_eq!( + vec![(0_usize..1_usize), (1_usize..2_usize), (2_usize..3_usize),], + results + ); + } + Ok(()) + } + + #[test] + fn test_lexicographical_partition_non_unique_column_1() -> Result<()> { + let input = vec![ + SortColumn { + values: Arc::new(Int64Array::from(vec![ + None, + Some(-1), + Some(-1), + Some(1), + ])) as ArrayRef, + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }, + SortColumn { + values: Arc::new(StringArray::from(vec![ + Some("foo"), + Some("bar"), + Some("bar"), + Some("bar"), + ])) as ArrayRef, + options: Some(SortOptions { + descending: true, + nulls_first: true, + }), + }, + ]; + { + let results = lexicographical_partition_points(&input)?; + assert_eq!(vec![0, 1, 3, 4], results); + } + { + let results = lexicographical_partition_ranges(&input)?; + assert_eq!( + vec![(0_usize..1_usize), (1_usize..3_usize), (3_usize..4_usize),], + results + ); + } + Ok(()) + } +} diff --git a/arrow/src/compute/mod.rs b/arrow/src/compute/mod.rs index 166f1568359c..2b3b9a76873a 100644 --- a/arrow/src/compute/mod.rs +++ b/arrow/src/compute/mod.rs @@ -30,6 +30,7 @@ pub use self::kernels::comparison::*; pub use self::kernels::concat::*; pub use self::kernels::filter::*; pub use self::kernels::limit::*; +pub use self::kernels::partition::*; pub use self::kernels::regexp::*; pub use self::kernels::sort::*; pub use self::kernels::take::*;