Skip to content

Commit

Permalink
Improve stats convert performance for Binary/String/Boolean arrays (#…
Browse files Browse the repository at this point in the history
…11319)

* add u64 poc.

* use env to support the quick bench.

* use flatten in builder mode yet.

* add new mode.

* use Builder in Utf8 and LargeUtf8 page level stats' convert.

* use Builder in row group level stats.

* eliminate some unnecessary tmp `Vec`s.

* remove the quick modify in bench.

* process the result return from append_value of FixedSizeBinaryBuilder.

* remove the modification of FixedSizeBinary&Bool, and fix the String case.

* fix and re-enable the modification of FixedSizeBinary.

* fix comments.

* use BooleanBuilder to eliminate the collect in BooleanArray case.

* fix compile.
  • Loading branch information
Rachelint authored Jul 8, 2024
1 parent 1e39a85 commit c254b8b
Showing 1 changed file with 130 additions and 85 deletions.
215 changes: 130 additions & 85 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

use arrow::array::builder::FixedSizeBinaryBuilder;
use arrow::array::{
BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
};
use arrow::datatypes::i256;
use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::{
new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray, StringArray, Time32MillisecondArray, Time32SecondArray,
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
};
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
Expand Down Expand Up @@ -393,51 +395,73 @@ macro_rules! get_statistics {
})
},
DataType::Binary => Ok(Arc::new(BinaryArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x| x.to_vec())),
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
))),
DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())),
))),
DataType::Utf8 => Ok(Arc::new(StringArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
if res.is_none() {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
}),
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator)
))),
DataType::Utf8 => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = StringBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
},
DataType::LargeUtf8 => {
Ok(Arc::new(LargeStringArray::from_iter(
[<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x).map(|s| s.to_string()).ok();
if res.is_none() {
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
}),
)))
}
DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from(
[<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
if x.len().try_into() == Ok(*size) {
Some(x)
} else {
log::debug!(
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
size,
x.len(),
);
None
}
})
}).collect::<Vec<_>>(),
))),
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = LargeStringBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
},
DataType::FixedSizeBinary(size) => {
let iterator = [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator);
let mut builder = FixedSizeBinaryBuilder::new(*size);
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

// ignore invalid values
if x.len().try_into() != Ok(*size){
log::debug!(
"FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.",
size,
x.len(),
);
builder.append_null();
continue;
}

builder.append_value(x).expect("ensure to append successfully here, because size have been checked before");
}
Ok(Arc::new(builder.finish()))
},
DataType::Decimal128(precision, scale) => {
let arr = Decimal128Array::from_iter(
[<$stat_type_prefix Decimal128StatsIterator>]::new($iterator)
Expand Down Expand Up @@ -740,15 +764,20 @@ macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
paste! {
match $data_type {
Some(DataType::Boolean) => Ok(Arc::new(
BooleanArray::from_iter(
[<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator)
.flatten()
// BooleanArray::from_iter required a sized iterator, so collect into Vec first
.collect::<Vec<_>>()
.into_iter()
)
)),
Some(DataType::Boolean) => {
let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator);
let mut builder = BooleanBuilder::new();
for x in iterator {
for x in x.into_iter() {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};
builder.append_value(x);
}
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::UInt8) => Ok(Arc::new(
UInt8Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
Expand Down Expand Up @@ -830,32 +859,48 @@ macro_rules! get_data_page_statistics {
Some(DataType::Float64) => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Binary) => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::LargeBinary) => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Utf8) => Ok(Arc::new(StringArray::from(
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
if res.is_none() {
Some(DataType::Utf8) => {
let mut builder = StringBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
for x in x.into_iter() {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x.data()) else {
log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
})
}).flatten().collect::<Vec<_>>(),
))),
Some(DataType::LargeUtf8) => Ok(Arc::new(LargeStringArray::from(
[<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).map(|x| {
x.into_iter().map(|x| {
x.and_then(|x| {
let res = std::str::from_utf8(x.data()).map(|s| s.to_string()).ok();
if res.is_none() {
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
}
res
})
})
}).flatten().collect::<Vec<_>>(),
))),
builder.append_null();
continue;
};

builder.append_value(x);
}
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::LargeUtf8) => {
let mut builder = LargeStringBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
for x in x.into_iter() {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x.data()) else {
log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it.");
builder.append_null();
continue;
};

builder.append_value(x);
}
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::Dictionary(_, value_type)) => {
[<$stat_type_prefix:lower _ page_statistics>](Some(value_type), $iterator)
},
Expand All @@ -871,14 +916,14 @@ macro_rules! get_data_page_statistics {
Some(DataType::Date32) => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Date64) => Ok(
Arc::new(
Date64Array::from([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter()
.map(|x| {
x.and_then(|x| i64::try_from(x).ok())
.map(|x| x * 24 * 60 * 60 * 1000)
})
}).flatten().collect::<Vec<_>>()
.map(|x| x.map(|x| x * 24 * 60 * 60 * 1000))
}).flatten()
)
)
),
Expand Down

0 comments on commit c254b8b

Please sign in to comment.