Skip to content

Commit

Permalink
Introduce RowLayout to represent rows for different purposes (#2261)
Browse files Browse the repository at this point in the history
* Introduce RowLayout to represent rows for different purposes

* revert default

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* more &schema

* more tests and refactor

* logs for flasky test

* unwanted cargo change

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
yjshen and alamb authored Apr 20, 2022
1 parent 7548e96 commit ec3543b
Show file tree
Hide file tree
Showing 10 changed files with 579 additions and 218 deletions.
35 changes: 30 additions & 5 deletions datafusion/core/benches/jit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ extern crate datafusion;
mod data_utils;
use crate::criterion::Criterion;
use crate::data_utils::{create_record_batches, create_schema};
use datafusion::row::writer::{bench_write_batch, bench_write_batch_jit};
use datafusion::row::jit::writer::bench_write_batch_jit;
use datafusion::row::writer::bench_write_batch;
use datafusion::row::RowType;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -35,15 +37,38 @@ fn criterion_benchmark(c: &mut Criterion) {
let batches =
create_record_batches(schema.clone(), array_len, partitions_len, batch_size);

c.bench_function("row serializer", |b| {
c.bench_function("compact row serializer", |b| {
b.iter(|| {
criterion::black_box(bench_write_batch(&batches, schema.clone()).unwrap())
criterion::black_box(
bench_write_batch(&batches, schema.clone(), RowType::Compact).unwrap(),
)
})
});

c.bench_function("row serializer jit", |b| {
c.bench_function("word aligned row serializer", |b| {
b.iter(|| {
criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap())
criterion::black_box(
bench_write_batch(&batches, schema.clone(), RowType::WordAligned)
.unwrap(),
)
})
});

c.bench_function("compact row serializer jit", |b| {
b.iter(|| {
criterion::black_box(
bench_write_batch_jit(&batches, schema.clone(), RowType::Compact)
.unwrap(),
)
})
});

c.bench_function("word aligned row serializer jit", |b| {
b.iter(|| {
criterion::black_box(
bench_write_batch_jit(&batches, schema.clone(), RowType::WordAligned)
.unwrap(),
)
})
});
}
Expand Down
158 changes: 133 additions & 25 deletions datafusion/core/src/row/jit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! Just-In-Time(JIT) version for row reader and writers
mod reader;
mod writer;
pub mod reader;
pub mod writer;

#[macro_export]
/// register external functions to the assembler
Expand Down Expand Up @@ -46,42 +46,43 @@ mod tests {
use crate::error::Result;
use crate::row::jit::reader::read_as_batch_jit;
use crate::row::jit::writer::write_batch_unchecked_jit;
use crate::row::layout::RowType::{Compact, WordAligned};
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_jit::api::Assembler;
use std::sync::Arc;
use DataType::*;

macro_rules! fn_test_single_type {
($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
paste::item! {
#[test]
#[allow(non_snake_case)]
fn [<test_single_ $TYPE _jit>]() -> Result<()> {
fn [<test_ $ROWTYPE _single_ $TYPE _jit>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)]));
let a = $ARRAY::from($VEC);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
assert_eq!(batch, output_batch);
Ok(())
}

#[test]
#[allow(non_snake_case)]
fn [<test_single_ $TYPE _jit_null_free>]() -> Result<()> {
fn [<test_ $ROWTYPE _single_ $TYPE _jit_null_free>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
let v = $VEC.into_iter().filter(|o| o.is_some()).collect::<Vec<_>>();
let a = $ARRAY::from(v);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -92,85 +93,190 @@ mod tests {
fn_test_single_type!(
BooleanArray,
Boolean,
vec![Some(true), Some(false), None, Some(true), None]
vec![Some(true), Some(false), None, Some(true), None],
Compact
);

fn_test_single_type!(
BooleanArray,
Boolean,
vec![Some(true), Some(false), None, Some(true), None],
WordAligned
);

fn_test_single_type!(
Int8Array,
Int8,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int8Array,
Int8,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Int16Array,
Int16,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int16Array,
Int16,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Int32Array,
Int32,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int32Array,
Int32,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Int64Array,
Int64,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int64Array,
Int64,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt8Array,
UInt8,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt8Array,
UInt8,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt16Array,
UInt16,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt16Array,
UInt16,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt32Array,
UInt32,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt32Array,
UInt32,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt64Array,
UInt64,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt64Array,
UInt64,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Float32Array,
Float32,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
Compact
);

fn_test_single_type!(
Float32Array,
Float32,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
WordAligned
);

fn_test_single_type!(
Float64Array,
Float64,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
Compact
);

fn_test_single_type!(
Float64Array,
Float64,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
WordAligned
);

fn_test_single_type!(
Date32Array,
Date32,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Date32Array,
Date32,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Date64Array,
Date64,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Date64Array,
Date64,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
StringArray,
Utf8,
vec![Some("hello"), Some("world"), None, Some(""), Some("")]
vec![Some("hello"), Some("world"), None, Some(""), Some("")],
Compact
);

#[test]
Expand All @@ -190,10 +296,11 @@ mod tests {
0,
schema.clone(),
&assembler,
Compact,
)?
};
let output_batch =
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -214,10 +321,11 @@ mod tests {
0,
schema.clone(),
&assembler,
Compact,
)?
};
let output_batch =
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand Down
9 changes: 4 additions & 5 deletions datafusion/core/src/row/jit/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use crate::error::{DataFusionError, Result};
use crate::reg_fn;
use crate::row::jit::fn_name;
use crate::row::layout::RowType;
use crate::row::reader::RowReader;
use crate::row::reader::*;
use crate::row::MutableRecordBatch;
Expand All @@ -38,10 +39,11 @@ pub fn read_as_batch_jit(
schema: Arc<Schema>,
offsets: &[usize],
assembler: &Assembler,
row_type: RowType,
) -> Result<RecordBatch> {
let row_num = offsets.len();
let mut output = MutableRecordBatch::new(row_num, schema.clone());
let mut row = RowReader::new(&schema);
let mut row = RowReader::new(&schema, row_type);
register_read_functions(assembler)?;
let gen_func = gen_read_row(&schema, assembler)?;
let mut jit = assembler.create_jit();
Expand Down Expand Up @@ -102,10 +104,7 @@ fn register_read_functions(asm: &Assembler) -> Result<()> {
Ok(())
}

fn gen_read_row(
schema: &Arc<Schema>,
assembler: &Assembler,
) -> Result<GeneratedFunction> {
fn gen_read_row(schema: &Schema, assembler: &Assembler) -> Result<GeneratedFunction> {
use DataType::*;
let mut builder = assembler
.new_func_builder("read_row")
Expand Down
Loading

0 comments on commit ec3543b

Please sign in to comment.