Skip to content

Commit

Permalink
Optimize base64/hex decoding by pre-allocating output buffers (~2x fa…
Browse files Browse the repository at this point in the history
…ster) (#12675)

* add bench

* replace macro with generic function

* remove duplicated code

* optimize base64/hex decode
  • Loading branch information
simonvandel authored Oct 2, 2024
1 parent dcc018e commit 5a318cd
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 25 deletions.
5 changes: 5 additions & 0 deletions datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ harness = false
name = "to_timestamp"
required-features = ["datetime_expressions"]

[[bench]]
harness = false
name = "encoding"
required-features = ["encoding_expressions"]

[[bench]]
harness = false
name = "regx"
Expand Down
53 changes: 53 additions & 0 deletions datafusion/functions/benches/encoding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

extern crate criterion;

use arrow::util::bench_util::create_string_array_with_len;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_expr::ColumnarValue;
use datafusion_functions::encoding;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
let decode = encoding::decode();
for size in [1024, 4096, 8192] {
let str_array = Arc::new(create_string_array_with_len::<i32>(size, 0.2, 32));
c.bench_function(&format!("base64_decode/{size}"), |b| {
let method = ColumnarValue::Scalar("base64".into());
let encoded = encoding::encode()
.invoke(&[ColumnarValue::Array(str_array.clone()), method.clone()])
.unwrap();

let args = vec![encoded, method];
b.iter(|| black_box(decode.invoke(&args).unwrap()))
});

c.bench_function(&format!("hex_decode/{size}"), |b| {
let method = ColumnarValue::Scalar("hex".into());
let encoded = encoding::encode()
.invoke(&[ColumnarValue::Array(str_array.clone()), method.clone()])
.unwrap();

let args = vec![encoded, method];
b.iter(|| black_box(decode.invoke(&args).unwrap()))
});
}
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
94 changes: 69 additions & 25 deletions datafusion/functions/src/encoding/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
//! Encoding expressions
use arrow::{
array::{Array, ArrayRef, BinaryArray, OffsetSizeTrait, StringArray},
datatypes::DataType,
array::{
Array, ArrayRef, BinaryArray, GenericByteArray, OffsetSizeTrait, StringArray,
},
datatypes::{ByteArrayType, DataType},
};
use arrow_buffer::{Buffer, OffsetBufferBuilder};
use base64::{engine::general_purpose, Engine as _};
use datafusion_common::{
cast::{as_generic_binary_array, as_generic_string_array},
Expand Down Expand Up @@ -245,16 +248,22 @@ fn base64_encode(input: &[u8]) -> String {
general_purpose::STANDARD_NO_PAD.encode(input)
}

fn hex_decode(input: &[u8]) -> Result<Vec<u8>> {
hex::decode(input).map_err(|e| {
fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
// only write input / 2 bytes to buf
let out_len = input.len() / 2;
let buf = &mut buf[..out_len];
hex::decode_to_slice(input, buf).map_err(|e| {
DataFusionError::Internal(format!("Failed to decode from hex: {}", e))
})
})?;
Ok(out_len)
}

fn base64_decode(input: &[u8]) -> Result<Vec<u8>> {
general_purpose::STANDARD_NO_PAD.decode(input).map_err(|e| {
DataFusionError::Internal(format!("Failed to decode from base64: {}", e))
})
fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
general_purpose::STANDARD_NO_PAD
.decode_slice(input, buf)
.map_err(|e| {
DataFusionError::Internal(format!("Failed to decode from base64: {}", e))
})
}

macro_rules! encode_to_array {
Expand All @@ -267,14 +276,35 @@ macro_rules! encode_to_array {
}};
}

macro_rules! decode_to_array {
($METHOD: ident, $INPUT:expr) => {{
let binary_array: BinaryArray = $INPUT
.iter()
.map(|x| x.map(|x| $METHOD(x.as_ref())).transpose())
.collect::<Result<_>>()?;
Arc::new(binary_array)
}};
fn decode_to_array<F, T: ByteArrayType>(
method: F,
input: &GenericByteArray<T>,
conservative_upper_bound_size: usize,
) -> Result<ArrayRef>
where
F: Fn(&[u8], &mut [u8]) -> Result<usize>,
{
let mut values = vec![0; conservative_upper_bound_size];
let mut offsets = OffsetBufferBuilder::new(input.len());
let mut total_bytes_decoded = 0;
for v in input {
if let Some(v) = v {
let cursor = &mut values[total_bytes_decoded..];
let decoded = method(v.as_ref(), cursor)?;
total_bytes_decoded += decoded;
offsets.push_length(decoded);
} else {
offsets.push_length(0);
}
}
// We reserved an upper bound size for the values buffer, but we only use the actual size
values.truncate(total_bytes_decoded);
let binary_array = BinaryArray::try_new(
offsets.finish(),
Buffer::from_vec(values),
input.nulls().cloned(),
)?;
Ok(Arc::new(binary_array))
}

impl Encoding {
Expand Down Expand Up @@ -381,10 +411,7 @@ impl Encoding {
T: OffsetSizeTrait,
{
let input_value = as_generic_binary_array::<T>(value)?;
let array: ArrayRef = match self {
Self::Base64 => decode_to_array!(base64_decode, input_value),
Self::Hex => decode_to_array!(hex_decode, input_value),
};
let array = self.decode_byte_array(input_value)?;
Ok(ColumnarValue::Array(array))
}

Expand All @@ -393,12 +420,29 @@ impl Encoding {
T: OffsetSizeTrait,
{
let input_value = as_generic_string_array::<T>(value)?;
let array: ArrayRef = match self {
Self::Base64 => decode_to_array!(base64_decode, input_value),
Self::Hex => decode_to_array!(hex_decode, input_value),
};
let array = self.decode_byte_array(input_value)?;
Ok(ColumnarValue::Array(array))
}

fn decode_byte_array<T: ByteArrayType>(
&self,
input_value: &GenericByteArray<T>,
) -> Result<ArrayRef> {
match self {
Self::Base64 => {
let upper_bound =
base64::decoded_len_estimate(input_value.values().len());
decode_to_array(base64_decode, input_value, upper_bound)
}
Self::Hex => {
// Calculate the upper bound for decoded byte size
// For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded
// So the upper bound is half the length of the input values.
let upper_bound = input_value.values().len() / 2;
decode_to_array(hex_decode, input_value, upper_bound)
}
}
}
}

impl fmt::Display for Encoding {
Expand Down

0 comments on commit 5a318cd

Please sign in to comment.