Skip to content

Commit

Permalink
feat: add transform_literal (#287)
Browse files Browse the repository at this point in the history
* add transform_literal

* refine

* fix unwrap

---------

Co-authored-by: ZENOTME <[email protected]>
  • Loading branch information
ZENOTME and ZENOTME authored Mar 27, 2024
1 parent 2018ffc commit bbc8578
Show file tree
Hide file tree
Showing 7 changed files with 639 additions and 66 deletions.
10 changes: 10 additions & 0 deletions crates/iceberg/src/spec/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,16 @@ impl Datum {
)),
}
}

/// Get the primitive literal from datum.
pub fn literal(&self) -> &PrimitiveLiteral {
&self.literal
}

/// Get the primitive type from datum.
pub fn data_type(&self) -> &PrimitiveType {
&self.r#type
}
}

/// Values present in iceberg type
Expand Down
226 changes: 214 additions & 12 deletions crates/iceberg/src/transform/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::sync::Arc;
use arrow_array::ArrayRef;
use arrow_schema::{DataType, TimeUnit};

use crate::spec::{Datum, PrimitiveLiteral};

use super::TransformFunction;

#[derive(Debug)]
Expand All @@ -35,39 +37,47 @@ impl Bucket {

impl Bucket {
/// Hashes a byte slice with `murmur3_32` (seed 0) and casts the result to `i32`.
///
/// Every other `hash_*` helper funnels into this function, so switching the
/// hash function only requires changing this one place.
#[inline]
fn hash_bytes(mut v: &[u8]) -> i32 {
    let hash = murmur3::murmur3_32(&mut v, 0).unwrap();
    hash as i32
}

/// Hashes an `i32` by widening it to `i64` and delegating to `hash_long`,
/// so an int hashes identically to the long holding the same value.
#[inline]
fn hash_int(v: i32) -> i32 {
    let widened = i64::from(v);
    Self::hash_long(widened)
}

/// Hashes an `i64` via the little-endian encoding of its eight bytes.
#[inline]
fn hash_long(v: i64) -> i32 {
    let le_bytes = v.to_le_bytes();
    Self::hash_bytes(&le_bytes)
}

/// Hashes a date given as `v`, the number of days from the unix epoch.
///
/// Delegates to `hash_int`.
#[inline]
fn hash_date(v: i32) -> i32 {
    let days = v;
    Self::hash_int(days)
}

/// Hashes a time of day given as `v`, the number of microseconds from midnight.
///
/// Delegates to `hash_long`.
#[inline]
fn hash_time(v: i64) -> i32 {
    let micros_of_day = v;
    Self::hash_long(micros_of_day)
}

/// Hashes a timestamp given as `v`, the number of microseconds from the unix epoch.
///
/// Delegates to `hash_long`.
#[inline]
fn hash_timestamp(v: i64) -> i32 {
    let epoch_micros = v;
    Self::hash_long(epoch_micros)
}

/// Hashes a string via its UTF-8 bytes.
#[inline]
fn hash_str(s: &str) -> i32 {
    let utf8: &[u8] = s.as_bytes();
    Self::hash_bytes(utf8)
}

/// Decimal values are hashed using the minimum number of bytes required to hold the unscaled value as a two’s complement big-endian
/// ref: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
#[inline]
fn hash_decimal(v: i128) -> i32 {
let bytes = v.to_be_bytes();
if let Some(start) = bytes.iter().position(|&x| x != 0) {
Expand All @@ -79,9 +89,50 @@ impl Bucket {

/// Maps a 32-bit hash onto a bucket index.
///
/// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
/// ref: https://iceberg.apache.org/spec/#partitioning
///
/// NOTE(review): the `mod_n as i32` cast assumes `mod_n` is non-zero and fits
/// in a positive `i32` — confirm the constructor guarantees this.
#[inline]
fn bucket_n(&self, v: i32) -> i32 {
    // Masking with i32::MAX clears the sign bit so the remainder is never negative.
    let non_negative = v & i32::MAX;
    let modulus = self.mod_n as i32;
    non_negative % modulus
}

/// Computes the bucket index of an `i32` value.
#[inline]
fn bucket_int(&self, v: i32) -> i32 {
    let hash = Self::hash_int(v);
    self.bucket_n(hash)
}

/// Computes the bucket index of an `i64` value.
#[inline]
fn bucket_long(&self, v: i64) -> i32 {
    let hash = Self::hash_long(v);
    self.bucket_n(hash)
}

/// Computes the bucket index of a decimal given as an `i128` value.
#[inline]
fn bucket_decimal(&self, v: i128) -> i32 {
    let hash = Self::hash_decimal(v);
    self.bucket_n(hash)
}

/// Computes the bucket index of a date; `v` is hashed with `hash_date`.
#[inline]
fn bucket_date(&self, v: i32) -> i32 {
    let hash = Self::hash_date(v);
    self.bucket_n(hash)
}

/// Computes the bucket index of a time of day; `v` is hashed with `hash_time`.
#[inline]
fn bucket_time(&self, v: i64) -> i32 {
    let hash = Self::hash_time(v);
    self.bucket_n(hash)
}

/// Computes the bucket index of a timestamp; `v` is hashed with `hash_timestamp`.
#[inline]
fn bucket_timestamp(&self, v: i64) -> i32 {
    let hash = Self::hash_timestamp(v);
    self.bucket_n(hash)
}

/// Computes the bucket index of a string.
#[inline]
fn bucket_str(&self, v: &str) -> i32 {
    let hash = Self::hash_str(v);
    self.bucket_n(hash)
}

/// Computes the bucket index of a byte slice.
#[inline]
fn bucket_bytes(&self, v: &[u8]) -> i32 {
    let hash = Self::hash_bytes(v);
    self.bucket_n(hash)
}
}

impl TransformFunction for Bucket {
Expand All @@ -91,82 +142,117 @@ impl TransformFunction for Bucket {
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_int(v))),
.unary(|v| self.bucket_int(v)),
DataType::Int64 => input
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_long(v))),
.unary(|v| self.bucket_long(v)),
DataType::Decimal128(_, _) => input
.as_any()
.downcast_ref::<arrow_array::Decimal128Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_decimal(v))),
.unary(|v| self.bucket_decimal(v)),
DataType::Date32 => input
.as_any()
.downcast_ref::<arrow_array::Date32Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_date(v))),
.unary(|v| self.bucket_date(v)),
DataType::Time64(TimeUnit::Microsecond) => input
.as_any()
.downcast_ref::<arrow_array::Time64MicrosecondArray>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_time(v))),
.unary(|v| self.bucket_time(v)),
DataType::Timestamp(TimeUnit::Microsecond, _) => input
.as_any()
.downcast_ref::<arrow_array::TimestampMicrosecondArray>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_timestamp(v))),
.unary(|v| self.bucket_timestamp(v)),
DataType::Utf8 => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_str(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_str(v))),
),
DataType::LargeUtf8 => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::LargeStringArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_str(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_str(v))),
),
DataType::Binary => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::BinaryArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_bytes(v))),
),
DataType::LargeBinary => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::LargeBinaryArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_bytes(v))),
),
DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::FixedSizeBinaryArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_bytes(v))),
),
_ => unreachable!("Unsupported data type: {:?}", input.data_type()),
_ => {
return Err(crate::Error::new(
crate::ErrorKind::FeatureUnsupported,
format!(
"Unsupported data type for bucket transform: {:?}",
input.data_type()
),
))
}
};
Ok(Arc::new(res))
}

fn transform_literal(&self, input: &Datum) -> crate::Result<Option<Datum>> {
let val = match input.literal() {
PrimitiveLiteral::Int(v) => self.bucket_int(*v),
PrimitiveLiteral::Long(v) => self.bucket_long(*v),
PrimitiveLiteral::Decimal(v) => self.bucket_decimal(*v),
PrimitiveLiteral::Date(v) => self.bucket_date(*v),
PrimitiveLiteral::Time(v) => self.bucket_time(*v),
PrimitiveLiteral::Timestamp(v) => self.bucket_timestamp(*v),
PrimitiveLiteral::String(v) => self.bucket_str(v.as_str()),
PrimitiveLiteral::UUID(v) => self.bucket_bytes(v.as_ref()),
PrimitiveLiteral::Binary(v) => self.bucket_bytes(v.as_ref()),
PrimitiveLiteral::Fixed(v) => self.bucket_bytes(v.as_ref()),
_ => {
return Err(crate::Error::new(
crate::ErrorKind::FeatureUnsupported,
format!(
"Unsupported data type for bucket transform: {:?}",
input.data_type()
),
))
}
};
Ok(Some(Datum::int(val)))
}
}

#[cfg(test)]
mod test {
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};

use crate::{spec::Datum, transform::TransformFunction};

use super::Bucket;
#[test]
fn test_hash() {
Expand Down Expand Up @@ -242,4 +328,120 @@ mod test {
-188683207
);
}

/// Bucketing the int literal 34 into 10 buckets yields bucket 9.
#[test]
fn test_int_literal() {
    let bucket = Bucket::new(10);
    let input = Datum::int(34);
    let actual = bucket.transform_literal(&input).unwrap().unwrap();
    assert_eq!(actual, Datum::int(9));
}

/// Bucketing the long literal 34 into 10 buckets yields bucket 9.
#[test]
fn test_long_literal() {
    let bucket = Bucket::new(10);
    let input = Datum::long(34);
    let actual = bucket.transform_literal(&input).unwrap().unwrap();
    assert_eq!(actual, Datum::int(9));
}

/// Bucketing the decimal literal 1420 into 10 buckets yields bucket 9.
#[test]
fn test_decimal_literal() {
    let bucket = Bucket::new(10);
    let input = Datum::decimal(1420).unwrap();
    let actual = bucket.transform_literal(&input).unwrap().unwrap();
    assert_eq!(actual, Datum::int(9));
}

/// Bucketing the date literal 17486 into 100 buckets yields bucket 26.
#[test]
fn test_date_literal() {
    let bucket = Bucket::new(100);
    let input = Datum::date(17486);
    let actual = bucket.transform_literal(&input).unwrap().unwrap();
    assert_eq!(actual, Datum::int(26));
}

/// Bucketing the time literal 81068000000 (micros) into 100 buckets yields bucket 59.
#[test]
fn test_time_literal() {
    let bucket = Bucket::new(100);
    let input = Datum::time_micros(81068000000).unwrap();
    let actual = bucket.transform_literal(&input).unwrap().unwrap();
    assert_eq!(actual, Datum::int(59));
}

/// Bucketing the timestamp literal 1510871468000000 (micros) into 100 buckets yields bucket 7.
#[test]
fn test_timestamp_literal() {
    let bucket = Bucket::new(100);
    let input = Datum::timestamp_micros(1510871468000000);
    let actual = bucket.transform_literal(&input).unwrap().unwrap();
    assert_eq!(actual, Datum::int(7));
}

/// Bucketing the string literal "iceberg" into 100 buckets yields bucket 89.
#[test]
fn test_str_literal() {
    let bucket = Bucket::new(100);
    let input = Datum::string("iceberg");
    let actual = bucket.transform_literal(&input).unwrap().unwrap();
    assert_eq!(actual, Datum::int(89));
}

/// Bucketing a UUID literal into 100 buckets yields bucket 40.
#[test]
fn test_uuid_literal() {
    let bucket = Bucket::new(100);
    let input = Datum::uuid("F79C3E09-677C-4BBD-A479-3F349CB785E7".parse().unwrap());
    let actual = bucket.transform_literal(&input).unwrap().unwrap();
    assert_eq!(actual, Datum::int(40));
}

/// Bucketing the binary literal 00 01 02 03 into 128 buckets yields bucket 57.
#[test]
fn test_binary_literal() {
    let bucket = Bucket::new(128);
    let input = Datum::binary(b"\x00\x01\x02\x03".to_vec());
    let actual = bucket.transform_literal(&input).unwrap().unwrap();
    assert_eq!(actual, Datum::int(57));
}

/// Bucketing the fixed literal "foo" into 128 buckets yields bucket 32.
#[test]
fn test_fixed_literal() {
    let bucket = Bucket::new(128);
    let input = Datum::fixed(b"foo".to_vec());
    let actual = bucket.transform_literal(&input).unwrap().unwrap();
    assert_eq!(actual, Datum::int(32));
}
}
4 changes: 4 additions & 0 deletions crates/iceberg/src/transform/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ impl TransformFunction for Identity {
/// Identity transform over an array: hands the input back unchanged.
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
    let unchanged = input;
    Ok(unchanged)
}

/// Identity transform over a literal: returns a clone of the input datum.
fn transform_literal(&self, input: &crate::spec::Datum) -> Result<Option<crate::spec::Datum>> {
    let copy = Some(input).cloned();
    Ok(copy)
}
}
Loading

0 comments on commit bbc8578

Please sign in to comment.