Skip to content

Commit

Permalink
add transform_literal
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Mar 25, 2024
1 parent fd9aa71 commit 48c744b
Show file tree
Hide file tree
Showing 6 changed files with 807 additions and 56 deletions.
258 changes: 247 additions & 11 deletions crates/iceberg/src/transform/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::sync::Arc;
use arrow_array::ArrayRef;
use arrow_schema::{DataType, TimeUnit};

use crate::spec::{Literal, PrimitiveLiteral};

use super::TransformFunction;

#[derive(Debug)]
Expand All @@ -35,39 +37,47 @@ impl Bucket {

impl Bucket {
/// When switch the hash function, we only need to change this function.
#[inline]
fn hash_bytes(mut v: &[u8]) -> i32 {
murmur3::murmur3_32(&mut v, 0).unwrap() as i32
}

#[inline]
fn hash_int(v: i32) -> i32 {
Self::hash_long(v as i64)
}

#[inline]
fn hash_long(v: i64) -> i32 {
Self::hash_bytes(v.to_le_bytes().as_slice())
}

/// v is days from unix epoch
#[inline]
fn hash_date(v: i32) -> i32 {
Self::hash_int(v)
}

/// v is microseconds from midnight
#[inline]
fn hash_time(v: i64) -> i32 {
Self::hash_long(v)
}

/// v is microseconds from unix epoch
#[inline]
fn hash_timestamp(v: i64) -> i32 {
Self::hash_long(v)
}

#[inline]
fn hash_str(s: &str) -> i32 {
Self::hash_bytes(s.as_bytes())
}

/// Decimal values are hashed using the minimum number of bytes required to hold the unscaled value as a two’s complement big-endian
/// ref: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
#[inline]
fn hash_decimal(v: i128) -> i32 {
let bytes = v.to_be_bytes();
if let Some(start) = bytes.iter().position(|&x| x != 0) {
Expand All @@ -79,9 +89,50 @@ impl Bucket {

/// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
/// ref: https://iceberg.apache.org/spec/#partitioning
#[inline]
fn bucket_n(&self, v: i32) -> i32 {
(v & i32::MAX) % (self.mod_n as i32)
}

#[inline]
fn bucket_int(&self, v: i32) -> i32 {
self.bucket_n(Self::hash_int(v))
}

#[inline]
fn bucket_long(&self, v: i64) -> i32 {
self.bucket_n(Self::hash_long(v))
}

#[inline]
fn bucket_decimal(&self, v: i128) -> i32 {
self.bucket_n(Self::hash_decimal(v))
}

#[inline]
fn bucket_date(&self, v: i32) -> i32 {
self.bucket_n(Self::hash_date(v))
}

#[inline]
fn bucket_time(&self, v: i64) -> i32 {
self.bucket_n(Self::hash_time(v))
}

#[inline]
fn bucket_timestamp(&self, v: i64) -> i32 {
self.bucket_n(Self::hash_timestamp(v))
}

#[inline]
fn bucket_str(&self, v: &str) -> i32 {
self.bucket_n(Self::hash_str(v))
}

#[inline]
fn bucket_bytes(&self, v: &[u8]) -> i32 {
self.bucket_n(Self::hash_bytes(v))
}
}

impl TransformFunction for Bucket {
Expand All @@ -91,84 +142,127 @@ impl TransformFunction for Bucket {
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_int(v))),
.unary(|v| self.bucket_int(v)),
DataType::Int64 => input
.as_any()
.downcast_ref::<arrow_array::Int64Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_long(v))),
.unary(|v| self.bucket_long(v)),
DataType::Decimal128(_, _) => input
.as_any()
.downcast_ref::<arrow_array::Decimal128Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_decimal(v))),
.unary(|v| self.bucket_decimal(v)),
DataType::Date32 => input
.as_any()
.downcast_ref::<arrow_array::Date32Array>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_date(v))),
.unary(|v| self.bucket_date(v)),
DataType::Time64(TimeUnit::Microsecond) => input
.as_any()
.downcast_ref::<arrow_array::Time64MicrosecondArray>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_time(v))),
.unary(|v| self.bucket_time(v)),
DataType::Timestamp(TimeUnit::Microsecond, _) => input
.as_any()
.downcast_ref::<arrow_array::TimestampMicrosecondArray>()
.unwrap()
.unary(|v| self.bucket_n(Self::hash_timestamp(v))),
.unary(|v| self.bucket_timestamp(v)),
DataType::Utf8 => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_str(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_str(v))),
),
DataType::LargeUtf8 => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::LargeStringArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_str(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_str(v))),
),
DataType::Binary => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::BinaryArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_bytes(v))),
),
DataType::LargeBinary => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::LargeBinaryArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_bytes(v))),
),
DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter(
input
.as_any()
.downcast_ref::<arrow_array::FixedSizeBinaryArray>()
.unwrap()
.iter()
.map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
.map(|v| v.map(|v| self.bucket_bytes(v))),
),
_ => unreachable!("Unsupported data type: {:?}", input.data_type()),
};
Ok(Arc::new(res))
}

fn transform_literal(&self, input: &Literal) -> crate::Result<Option<Literal>> {
match input {
Literal::Primitive(PrimitiveLiteral::Int(v)) => Ok(Some(Literal::Primitive(
PrimitiveLiteral::Int(self.bucket_int(*v)),
))),
Literal::Primitive(PrimitiveLiteral::Long(v)) => Ok(Some(Literal::Primitive(
PrimitiveLiteral::Int(self.bucket_long(*v)),
))),
Literal::Primitive(PrimitiveLiteral::Decimal(v)) => Ok(Some(Literal::Primitive(
PrimitiveLiteral::Int(self.bucket_decimal(*v)),
))),
Literal::Primitive(PrimitiveLiteral::Date(v)) => Ok(Some(Literal::Primitive(
PrimitiveLiteral::Int(self.bucket_date(*v)),
))),
Literal::Primitive(PrimitiveLiteral::Time(v)) => Ok(Some(Literal::Primitive(
PrimitiveLiteral::Int(self.bucket_time(*v)),
))),
Literal::Primitive(PrimitiveLiteral::Timestamp(v)) => Ok(Some(Literal::Primitive(
PrimitiveLiteral::Int(self.bucket_timestamp(*v)),
))),
Literal::Primitive(PrimitiveLiteral::String(v)) => Ok(Some(Literal::Primitive(
PrimitiveLiteral::Int(self.bucket_str(v)),
))),
Literal::Primitive(PrimitiveLiteral::UUID(v)) => Ok(Some(Literal::Primitive(
PrimitiveLiteral::Int(self.bucket_bytes(v.as_ref())),
))),
Literal::Primitive(PrimitiveLiteral::Binary(v)) => Ok(Some(Literal::Primitive(
PrimitiveLiteral::Int(self.bucket_bytes(v.as_ref())),
))),
Literal::Primitive(PrimitiveLiteral::Fixed(v)) => Ok(Some(Literal::Primitive(
PrimitiveLiteral::Int(self.bucket_bytes(v.as_ref())),
))),
_ => unreachable!("Unsupported literal: {:?}", input),
}
}
}

#[cfg(test)]
mod test {
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};

use crate::transform::TransformFunction;

use super::Bucket;
#[test]
fn t() {
let bucket = Bucket::new(10);
println!("{}", bucket.bucket_n(-653330422));
}
#[test]
fn test_hash() {
// test int
assert_eq!(Bucket::hash_int(34), 2017239379);
Expand Down Expand Up @@ -242,4 +336,146 @@ mod test {
-188683207
);
}

#[test]
fn test_int_literal() {
let bucket = Bucket::new(10);
assert_eq!(
bucket
.transform_literal(&crate::spec::Literal::Primitive(
crate::spec::PrimitiveLiteral::Int(34)
))
.unwrap()
.unwrap(),
crate::spec::Literal::Primitive(crate::spec::PrimitiveLiteral::Int(9))
);
}

#[test]
fn test_long_literal() {
let bucket = Bucket::new(10);
assert_eq!(
bucket
.transform_literal(&crate::spec::Literal::Primitive(
crate::spec::PrimitiveLiteral::Long(34)
))
.unwrap()
.unwrap(),
crate::spec::Literal::Primitive(crate::spec::PrimitiveLiteral::Int(9))
);
}

#[test]
fn test_decimal_literal() {
let bucket = Bucket::new(10);
assert_eq!(
bucket
.transform_literal(&crate::spec::Literal::Primitive(
crate::spec::PrimitiveLiteral::Decimal(1420)
))
.unwrap()
.unwrap(),
crate::spec::Literal::Primitive(crate::spec::PrimitiveLiteral::Int(9))
);
}

#[test]
fn test_date_literal() {
let bucket = Bucket::new(100);
assert_eq!(
bucket
.transform_literal(&crate::spec::Literal::Primitive(
crate::spec::PrimitiveLiteral::Date(17486)
))
.unwrap()
.unwrap(),
crate::spec::Literal::Primitive(crate::spec::PrimitiveLiteral::Int(26))
);
}

#[test]
fn test_time_literal() {
let bucket = Bucket::new(100);
assert_eq!(
bucket
.transform_literal(&crate::spec::Literal::Primitive(
crate::spec::PrimitiveLiteral::Time(81068000000)
))
.unwrap()
.unwrap(),
crate::spec::Literal::Primitive(crate::spec::PrimitiveLiteral::Int(59))
);
}

#[test]
fn test_timestamp_literal() {
let bucket = Bucket::new(100);
assert_eq!(
bucket
.transform_literal(&crate::spec::Literal::Primitive(
crate::spec::PrimitiveLiteral::Timestamp(1510871468000000)
))
.unwrap()
.unwrap(),
crate::spec::Literal::Primitive(crate::spec::PrimitiveLiteral::Int(7))
);
}

#[test]
fn test_str_literal() {
let bucket = Bucket::new(100);
assert_eq!(
bucket
.transform_literal(&crate::spec::Literal::Primitive(
crate::spec::PrimitiveLiteral::String("iceberg".to_string())
))
.unwrap()
.unwrap(),
crate::spec::Literal::Primitive(crate::spec::PrimitiveLiteral::Int(89))
);
}

#[test]
fn test_uuid_literal() {
let bucket = Bucket::new(100);
assert_eq!(
bucket
.transform_literal(&crate::spec::Literal::Primitive(
crate::spec::PrimitiveLiteral::UUID(
"F79C3E09-677C-4BBD-A479-3F349CB785E7".parse().unwrap()
)
))
.unwrap()
.unwrap(),
crate::spec::Literal::Primitive(crate::spec::PrimitiveLiteral::Int(40))
);
}

#[test]
fn test_binary_literal() {
let bucket = Bucket::new(128);
assert_eq!(
bucket
.transform_literal(&crate::spec::Literal::Primitive(
crate::spec::PrimitiveLiteral::Binary(b"\x00\x01\x02\x03".to_vec())
))
.unwrap()
.unwrap(),
crate::spec::Literal::Primitive(crate::spec::PrimitiveLiteral::Int(57))
);
}

#[test]
fn test_fixed_literal() {
let bucket = Bucket::new(128);
assert_eq!(
bucket
.transform_literal(&crate::spec::Literal::Primitive(
crate::spec::PrimitiveLiteral::Fixed(b"foo".to_vec())
))
.unwrap()
.unwrap(),
crate::spec::Literal::Primitive(crate::spec::PrimitiveLiteral::Int(32))
);
}
}
Loading

0 comments on commit 48c744b

Please sign in to comment.