Skip to content

Commit

Permalink
Speed up filter_record_batch with one array (#637) (#666)
Browse files: browse the repository at this point in the history.
* Speed up filter_record_batch with one array

* Don't into()

Co-authored-by: Daniël Heres <[email protected]>
  • Loading branch information
alamb and Dandandan authored Aug 8, 2021
1 parent dace74b commit 107a604
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
19 changes: 17 additions & 2 deletions arrow/benches/filter_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
// under the License.
extern crate arrow;

use arrow::compute::Filter;
use std::sync::Arc;

use arrow::compute::{filter_record_batch, Filter};
use arrow::record_batch::RecordBatch;
use arrow::util::bench_util::*;

use arrow::array::*;
use arrow::compute::{build_filter, filter};
use arrow::datatypes::{Float32Type, UInt8Type};
use arrow::datatypes::{Field, Float32Type, Schema, UInt8Type};

use criterion::{criterion_group, criterion_main, Criterion};

Expand Down Expand Up @@ -100,6 +103,18 @@ fn add_benchmark(c: &mut Criterion) {
c.bench_function("filter context string low selectivity", |b| {
b.iter(|| bench_built_filter(&sparse_filter, &data_array))
});

let data_array = create_primitive_array::<Float32Type>(size, 0.0);

let field = Field::new("c1", data_array.data_type().clone(), true);
let schema = Schema::new(vec![field]);

let batch =
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data_array)]).unwrap();

c.bench_function("filter single record batch", |b| {
b.iter(|| filter_record_batch(&batch, &filter_array))
});
}

criterion_group!(benches, add_benchmark);
Expand Down
21 changes: 15 additions & 6 deletions arrow/src/compute/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,21 @@ pub fn filter_record_batch(
return filter_record_batch(record_batch, &predicate);
}

let filter = build_filter(predicate)?;
let filtered_arrays = record_batch
.columns()
.iter()
.map(|a| make_array(filter(a.data())))
.collect();
let num_colums = record_batch.columns().len();

let filtered_arrays = match num_colums {
1 => {
vec![filter(record_batch.columns()[0].as_ref(), predicate)?]
}
_ => {
let filter = build_filter(predicate)?;
record_batch
.columns()
.iter()
.map(|a| make_array(filter(a.data())))
.collect()
}
};
RecordBatch::try_new(record_batch.schema(), filtered_arrays)
}

Expand Down

0 comments on commit 107a604

Please sign in to comment.