diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index f31d64779..6f62f2902 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -673,6 +673,16 @@ impl Datum { )), } } + + /// Get the primitive literal from datum. + pub fn literal(&self) -> &PrimitiveLiteral { + &self.literal + } + + /// Get the primitive type from datum. + pub fn data_type(&self) -> &PrimitiveType { + &self.r#type + } } /// Values present in iceberg type diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs index beff0be96..015aceaf4 100644 --- a/crates/iceberg/src/transform/bucket.rs +++ b/crates/iceberg/src/transform/bucket.rs @@ -20,6 +20,8 @@ use std::sync::Arc; use arrow_array::ArrayRef; use arrow_schema::{DataType, TimeUnit}; +use crate::spec::{Datum, PrimitiveLiteral}; + use super::TransformFunction; #[derive(Debug)] @@ -35,39 +37,47 @@ impl Bucket { impl Bucket { /// When switch the hash function, we only need to change this function. + #[inline] fn hash_bytes(mut v: &[u8]) -> i32 { murmur3::murmur3_32(&mut v, 0).unwrap() as i32 } + #[inline] fn hash_int(v: i32) -> i32 { Self::hash_long(v as i64) } + #[inline] fn hash_long(v: i64) -> i32 { Self::hash_bytes(v.to_le_bytes().as_slice()) } /// v is days from unix epoch + #[inline] fn hash_date(v: i32) -> i32 { Self::hash_int(v) } /// v is microseconds from midnight + #[inline] fn hash_time(v: i64) -> i32 { Self::hash_long(v) } /// v is microseconds from unix epoch + #[inline] fn hash_timestamp(v: i64) -> i32 { Self::hash_long(v) } + #[inline] fn hash_str(s: &str) -> i32 { Self::hash_bytes(s.as_bytes()) } /// Decimal values are hashed using the minimum number of bytes required to hold the unscaled value as a two’s complement big-endian /// ref: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements + #[inline] fn hash_decimal(v: i128) -> i32 { let bytes = v.to_be_bytes(); if let Some(start) = bytes.iter().position(|&x| x != 0) { @@ -79,9 +89,50 @@ impl Bucket { /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N /// ref: https://iceberg.apache.org/spec/#partitioning + #[inline] fn bucket_n(&self, v: i32) -> i32 { (v & i32::MAX) % (self.mod_n as i32) } + + #[inline] + fn bucket_int(&self, v: i32) -> i32 { + self.bucket_n(Self::hash_int(v)) + } + + #[inline] + fn bucket_long(&self, v: i64) -> i32 { + self.bucket_n(Self::hash_long(v)) + } + + #[inline] + fn bucket_decimal(&self, v: i128) -> i32 { + self.bucket_n(Self::hash_decimal(v)) + } + + #[inline] + fn bucket_date(&self, v: i32) -> i32 { + self.bucket_n(Self::hash_date(v)) + } + + #[inline] + fn bucket_time(&self, v: i64) -> i32 { + self.bucket_n(Self::hash_time(v)) + } + + #[inline] + fn bucket_timestamp(&self, v: i64) -> i32 { + self.bucket_n(Self::hash_timestamp(v)) + } + + #[inline] + fn bucket_str(&self, v: &str) -> i32 { + self.bucket_n(Self::hash_str(v)) + } + + #[inline] + fn bucket_bytes(&self, v: &[u8]) -> i32 { + self.bucket_n(Self::hash_bytes(v)) + } } impl TransformFunction for Bucket { @@ -91,39 +142,39 @@ impl TransformFunction for Bucket { .as_any() .downcast_ref::() .unwrap() - .unary(|v| self.bucket_n(Self::hash_int(v))), + .unary(|v| self.bucket_int(v)), DataType::Int64 => input .as_any() .downcast_ref::() .unwrap() - .unary(|v| self.bucket_n(Self::hash_long(v))), + .unary(|v| self.bucket_long(v)), DataType::Decimal128(_, _) => input .as_any() .downcast_ref::() .unwrap() - .unary(|v| self.bucket_n(Self::hash_decimal(v))), + .unary(|v| self.bucket_decimal(v)), DataType::Date32 => input .as_any() .downcast_ref::() .unwrap() - .unary(|v| self.bucket_n(Self::hash_date(v))), + .unary(|v| self.bucket_date(v)), DataType::Time64(TimeUnit::Microsecond) => input .as_any() .downcast_ref::() .unwrap() - .unary(|v| self.bucket_n(Self::hash_time(v))), + .unary(|v| self.bucket_time(v)), DataType::Timestamp(TimeUnit::Microsecond, _) => input .as_any() .downcast_ref::() .unwrap() - .unary(|v| self.bucket_n(Self::hash_timestamp(v))), + .unary(|v| self.bucket_timestamp(v)), DataType::Utf8 => arrow_array::Int32Array::from_iter( input .as_any() .downcast_ref::() .unwrap() .iter() - .map(|v| self.bucket_n(Self::hash_str(v.unwrap()))), + .map(|v| v.map(|v| self.bucket_str(v))), ), DataType::LargeUtf8 => arrow_array::Int32Array::from_iter( input @@ -131,7 +182,7 @@ impl TransformFunction for Bucket { .downcast_ref::() .unwrap() .iter() - .map(|v| self.bucket_n(Self::hash_str(v.unwrap()))), + .map(|v| v.map(|v| self.bucket_str(v))), ), DataType::Binary => arrow_array::Int32Array::from_iter( input @@ -139,7 +190,7 @@ impl TransformFunction for Bucket { .downcast_ref::() .unwrap() .iter() - .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))), + .map(|v| v.map(|v| self.bucket_bytes(v))), ), DataType::LargeBinary => arrow_array::Int32Array::from_iter( input @@ -147,7 +198,7 @@ impl TransformFunction for Bucket { .downcast_ref::() .unwrap() .iter() - .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))), + .map(|v| v.map(|v| self.bucket_bytes(v))), ), DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter( input @@ -155,18 +206,53 @@ impl TransformFunction for Bucket { .downcast_ref::() .unwrap() .iter() - .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))), + .map(|v| v.map(|v| self.bucket_bytes(v))), ), - _ => unreachable!("Unsupported data type: {:?}", input.data_type()), + _ => { + return Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, + format!( + "Unsupported data type for bucket transform: {:?}", + input.data_type() + ), + )) + } }; Ok(Arc::new(res)) } + + fn transform_literal(&self, input: &Datum) -> crate::Result> { + let val = match input.literal() { + PrimitiveLiteral::Int(v) => self.bucket_int(*v), + PrimitiveLiteral::Long(v) => self.bucket_long(*v), + PrimitiveLiteral::Decimal(v) => self.bucket_decimal(*v), + PrimitiveLiteral::Date(v) => self.bucket_date(*v), + PrimitiveLiteral::Time(v) => self.bucket_time(*v), + PrimitiveLiteral::Timestamp(v) => self.bucket_timestamp(*v), + PrimitiveLiteral::String(v) => self.bucket_str(v.as_str()), + PrimitiveLiteral::UUID(v) => self.bucket_bytes(v.as_ref()), + PrimitiveLiteral::Binary(v) => self.bucket_bytes(v.as_ref()), + PrimitiveLiteral::Fixed(v) => self.bucket_bytes(v.as_ref()), + _ => { + return Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, + format!( + "Unsupported data type for bucket transform: {:?}", + input.data_type() + ), + )) + } + }; + Ok(Some(Datum::int(val))) + } } #[cfg(test)] mod test { use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime}; + use crate::{spec::Datum, transform::TransformFunction}; + use super::Bucket; #[test] fn test_hash() { @@ -242,4 +328,120 @@ mod test { -188683207 ); } + + #[test] + fn test_int_literal() { + let bucket = Bucket::new(10); + assert_eq!( + bucket.transform_literal(&Datum::int(34)).unwrap().unwrap(), + Datum::int(9) + ); + } + + #[test] + fn test_long_literal() { + let bucket = Bucket::new(10); + assert_eq!( + bucket.transform_literal(&Datum::long(34)).unwrap().unwrap(), + Datum::int(9) + ); + } + + #[test] + fn test_decimal_literal() { + let bucket = Bucket::new(10); + assert_eq!( + bucket + .transform_literal(&Datum::decimal(1420).unwrap()) + .unwrap() + .unwrap(), + Datum::int(9) + ); + } + + #[test] + fn test_date_literal() { + let bucket = Bucket::new(100); + assert_eq!( + bucket + .transform_literal(&Datum::date(17486)) + .unwrap() + .unwrap(), + Datum::int(26) + ); + } + + #[test] + fn test_time_literal() { + let bucket = Bucket::new(100); + assert_eq!( + bucket + .transform_literal(&Datum::time_micros(81068000000).unwrap()) + .unwrap() + .unwrap(), + Datum::int(59) + ); + } + + #[test] + fn test_timestamp_literal() { + let bucket = Bucket::new(100); + assert_eq!( + bucket + .transform_literal(&Datum::timestamp_micros(1510871468000000)) + .unwrap() + .unwrap(), + Datum::int(7) + ); + } + + #[test] + fn test_str_literal() { + let bucket = Bucket::new(100); + assert_eq!( + bucket + .transform_literal(&Datum::string("iceberg")) + .unwrap() + .unwrap(), + Datum::int(89) + ); + } + + #[test] + fn test_uuid_literal() { + let bucket = Bucket::new(100); + assert_eq!( + bucket + .transform_literal(&Datum::uuid( + "F79C3E09-677C-4BBD-A479-3F349CB785E7".parse().unwrap() + )) + .unwrap() + .unwrap(), + Datum::int(40) + ); + } + + #[test] + fn test_binary_literal() { + let bucket = Bucket::new(128); + assert_eq!( + bucket + .transform_literal(&Datum::binary(b"\x00\x01\x02\x03".to_vec())) + .unwrap() + .unwrap(), + Datum::int(57) + ); + } + + #[test] + fn test_fixed_literal() { + let bucket = Bucket::new(128); + assert_eq!( + bucket + .transform_literal(&Datum::fixed(b"foo".to_vec())) + .unwrap() + .unwrap(), + Datum::int(32) + ); + } } diff --git a/crates/iceberg/src/transform/identity.rs b/crates/iceberg/src/transform/identity.rs index d22c28fde..49ab612aa 100644 --- a/crates/iceberg/src/transform/identity.rs +++ b/crates/iceberg/src/transform/identity.rs @@ -28,4 +28,8 @@ impl TransformFunction for Identity { fn transform(&self, input: ArrayRef) -> Result { Ok(input) } + + fn transform_literal(&self, input: &crate::spec::Datum) -> Result> { + Ok(Some(input.clone())) + } } diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs index dead9db89..7effdbec3 100644 --- a/crates/iceberg/src/transform/mod.rs +++ b/crates/iceberg/src/transform/mod.rs @@ -16,7 +16,10 @@ // under the License. //! Transform function used to compute partition values. -use crate::{spec::Transform, Result}; +use crate::{ + spec::{Datum, Transform}, + Result, +}; use arrow_array::ArrayRef; mod bucket; @@ -31,6 +34,8 @@ pub trait TransformFunction: Send { /// The implementation of this function will need to check and downcast the input to specific /// type. fn transform(&self, input: ArrayRef) -> Result; + /// transform_literal will take an input literal and transform it into a new literal. + fn transform_literal(&self, input: &Datum) -> Result>; } /// BoxedTransformFunction is a boxed trait object of TransformFunction. diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index 4556543ae..2a79db300 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -16,6 +16,7 @@ // under the License. use super::TransformFunction; +use crate::spec::{Datum, PrimitiveLiteral}; use crate::{Error, ErrorKind, Result}; use arrow_arith::temporal::DatePart; use arrow_arith::{arity::binary, temporal::date_part}; @@ -23,11 +24,9 @@ use arrow_array::{ types::Date32Type, Array, ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray, }; use arrow_schema::{DataType, TimeUnit}; -use chrono::Datelike; +use chrono::{DateTime, Datelike}; use std::sync::Arc; -/// The number of days since unix epoch. -const DAY_SINCE_UNIX_EPOCH: i32 = 719163; /// Hour in one second. const HOUR_PER_SECOND: f64 = 1.0_f64 / 3600.0_f64; /// Day in one second. @@ -39,6 +38,21 @@ const UNIX_EPOCH_YEAR: i32 = 1970; #[derive(Debug)] pub struct Year; +impl Year { + #[inline] + fn timestamp_to_year(timestamp: i64) -> Result { + Ok(DateTime::from_timestamp_micros(timestamp) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Fail to convert timestamp to date in year transform", + ) + })? + .year() + - UNIX_EPOCH_YEAR) + } +} + impl TransformFunction for Year { fn transform(&self, input: ArrayRef) -> Result { let array = date_part(&input, DatePart::Year) @@ -51,12 +65,54 @@ impl TransformFunction for Year { .unary(|v| v - UNIX_EPOCH_YEAR), )) } + + fn transform_literal(&self, input: &crate::spec::Datum) -> Result> { + let val = match input.literal() { + PrimitiveLiteral::Date(v) => Date32Type::to_naive_date(*v).year() - UNIX_EPOCH_YEAR, + PrimitiveLiteral::Timestamp(v) => Self::timestamp_to_year(*v)?, + PrimitiveLiteral::TimestampTZ(v) => Self::timestamp_to_year(*v)?, + _ => { + return Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, + format!( + "Unsupported data type for year transform: {:?}", + input.data_type() + ), + )) + } + }; + Ok(Some(Datum::int(val))) + } } /// Extract a date or timestamp month, as months from 1970-01-01 #[derive(Debug)] pub struct Month; +impl Month { + #[inline] + fn timestamp_to_month(timestamp: i64) -> Result { + // date: aaaa-aa-aa + // unix epoch date: 1970-01-01 + // if date > unix epoch date, delta month = (aa - 1) + 12 * (aaaa-1970) + // if date < unix epoch date, delta month = (12 - (aa - 1)) + 12 * (1970-aaaa-1) + let date = DateTime::from_timestamp_micros(timestamp).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Fail to convert timestamp to date in month transform", + ) + })?; + let unix_epoch_date = DateTime::from_timestamp_micros(0) + .expect("0 timestamp from unix epoch should be valid"); + if date > unix_epoch_date { + Ok((date.month0() as i32) + 12 * (date.year() - UNIX_EPOCH_YEAR)) + } else { + let delta = (12 - date.month0() as i32) + 12 * (UNIX_EPOCH_YEAR - date.year() - 1); + Ok(-delta) + } + } +} + impl TransformFunction for Month { fn transform(&self, input: ArrayRef) -> Result { let year_array = date_part(&input, DatePart::Year) @@ -78,12 +134,40 @@ impl TransformFunction for Month { .unwrap(), )) } + + fn transform_literal(&self, input: &crate::spec::Datum) -> Result> { + let val = match input.literal() { + PrimitiveLiteral::Date(v) => { + (Date32Type::to_naive_date(*v).year() - UNIX_EPOCH_YEAR) * 12 + + Date32Type::to_naive_date(*v).month0() as i32 + } + PrimitiveLiteral::Timestamp(v) => Self::timestamp_to_month(*v)?, + PrimitiveLiteral::TimestampTZ(v) => Self::timestamp_to_month(*v)?, + _ => { + return Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, + format!( + "Unsupported data type for month transform: {:?}", + input.data_type() + ), + )) + } + }; + Ok(Some(Datum::int(val))) + } } /// Extract a date or timestamp day, as days from 1970-01-01 #[derive(Debug)] pub struct Day; +impl Day { + #[inline] + fn day_timestamp_micro(v: i64) -> i32 { + (v as f64 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32 + } +} + impl TransformFunction for Day { fn transform(&self, input: ArrayRef) -> Result { let res: Int32Array = match input.data_type() { @@ -91,16 +175,12 @@ impl TransformFunction for Day { .as_any() .downcast_ref::() .unwrap() - .unary(|v| -> i32 { (v as f64 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32 }), - DataType::Date32 => { - input - .as_any() - .downcast_ref::() - .unwrap() - .unary(|v| -> i32 { - Date32Type::to_naive_date(v).num_days_from_ce() - DAY_SINCE_UNIX_EPOCH - }) - } + .unary(|v| -> i32 { Self::day_timestamp_micro(v) }), + DataType::Date32 => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| -> i32 { v }), _ => { return Err(Error::new( ErrorKind::Unexpected, @@ -113,12 +193,37 @@ impl TransformFunction for Day { }; Ok(Arc::new(res)) } + + fn transform_literal(&self, input: &crate::spec::Datum) -> Result> { + let val = match input.literal() { + PrimitiveLiteral::Date(v) => *v, + PrimitiveLiteral::Timestamp(v) => Self::day_timestamp_micro(*v), + PrimitiveLiteral::TimestampTZ(v) => Self::day_timestamp_micro(*v), + _ => { + return Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, + format!( + "Unsupported data type for day transform: {:?}", + input.data_type() + ), + )) + } + }; + Ok(Some(Datum::int(val))) + } } /// Extract a timestamp hour, as hours from 1970-01-01 00:00:00 #[derive(Debug)] pub struct Hour; +impl Hour { + #[inline] + fn hour_timestamp_micro(v: i64) -> i32 { + (v as f64 / 1000.0 / 1000.0 * HOUR_PER_SECOND) as i32 + } +} + impl TransformFunction for Hour { fn transform(&self, input: ArrayRef) -> Result { let res: Int32Array = match input.data_type() { @@ -126,19 +231,36 @@ impl TransformFunction for Hour { .as_any() .downcast_ref::() .unwrap() - .unary(|v| -> i32 { (v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0) as i32 }), + .unary(|v| -> i32 { Self::hour_timestamp_micro(v) }), _ => { - return Err(Error::new( - ErrorKind::Unexpected, + return Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, format!( - "Should not call internally for unsupported data type {:?}", + "Unsupported data type for hour transform: {:?}", input.data_type() ), - )) + )); } }; Ok(Arc::new(res)) } + + fn transform_literal(&self, input: &crate::spec::Datum) -> Result> { + let val = match input.literal() { + PrimitiveLiteral::Timestamp(v) => Self::hour_timestamp_micro(*v), + PrimitiveLiteral::TimestampTZ(v) => Self::hour_timestamp_micro(*v), + _ => { + return Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, + format!( + "Unsupported data type for hour transform: {:?}", + input.data_type() + ), + )) + } + }; + Ok(Some(Datum::int(val))) + } } #[cfg(test)] @@ -147,7 +269,10 @@ mod test { use chrono::{NaiveDate, NaiveDateTime}; use std::sync::Arc; - use crate::transform::TransformFunction; + use crate::{ + spec::Datum, + transform::{BoxedTransformFunction, TransformFunction}, + }; #[test] fn test_transform_years() { @@ -159,6 +284,7 @@ mod test { NaiveDate::from_ymd_opt(2000, 1, 1).unwrap(), NaiveDate::from_ymd_opt(2030, 1, 1).unwrap(), NaiveDate::from_ymd_opt(2060, 1, 1).unwrap(), + NaiveDate::from_ymd_opt(1969, 1, 1).unwrap(), ]; let date_array: ArrayRef = Arc::new(Date32Array::from( ori_date @@ -171,11 +297,12 @@ mod test { )); let res = year.transform(date_array).unwrap(); let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); + assert_eq!(res.len(), 5); assert_eq!(res.value(0), 0); assert_eq!(res.value(1), 30); assert_eq!(res.value(2), 60); assert_eq!(res.value(3), 90); + assert_eq!(res.value(4), -1); // Test TimestampMicrosecond let ori_timestamp = vec![ @@ -187,6 +314,8 @@ mod test { .unwrap(), NaiveDateTime::parse_from_str("2060-01-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f") .unwrap(), + NaiveDateTime::parse_from_str("1969-01-01 00:00:00.00", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), ]; let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( ori_timestamp @@ -207,11 +336,71 @@ mod test { )); let res = year.transform(date_array).unwrap(); let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); + assert_eq!(res.len(), 5); assert_eq!(res.value(0), 0); assert_eq!(res.value(1), 30); assert_eq!(res.value(2), 60); assert_eq!(res.value(3), 90); + assert_eq!(res.value(4), -1); + } + + fn test_timestamp_and_tz_transform( + time: &str, + transform: &BoxedTransformFunction, + expect: Datum, + ) { + let timestamp = Datum::timestamp_micros( + NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S.%f") + .unwrap() + .and_utc() + .timestamp_micros(), + ); + let timestamp_tz = Datum::timestamptz_micros( + NaiveDateTime::parse_from_str(time, "%Y-%m-%d %H:%M:%S.%f") + .unwrap() + .and_utc() + .timestamp_micros(), + ); + let res = transform.transform_literal(×tamp).unwrap().unwrap(); + assert_eq!(res, expect); + let res = transform.transform_literal(×tamp_tz).unwrap().unwrap(); + assert_eq!(res, expect); + } + + fn test_timestamp_and_tz_transform_using_i64( + time: i64, + transform: &BoxedTransformFunction, + expect: Datum, + ) { + let timestamp = Datum::timestamp_micros(time); + let timestamp_tz = Datum::timestamptz_micros(time); + let res = transform.transform_literal(×tamp).unwrap().unwrap(); + assert_eq!(res, expect); + let res = transform.transform_literal(×tamp_tz).unwrap().unwrap(); + assert_eq!(res, expect); + } + + fn test_date(date: i32, transform: &BoxedTransformFunction, expect: Datum) { + let date = Datum::date(date); + let res = transform.transform_literal(&date).unwrap().unwrap(); + assert_eq!(res, expect); + } + + #[test] + fn test_transform_year_literal() { + let year = Box::new(super::Year) as BoxedTransformFunction; + + // Test Date32 + test_date(18628, &year, Datum::int(2021 - super::UNIX_EPOCH_YEAR)); + test_date(-365, &year, Datum::int(-1)); + + // Test TimestampMicrosecond + test_timestamp_and_tz_transform_using_i64( + 186280000000, + &year, + Datum::int(1970 - super::UNIX_EPOCH_YEAR), + ); + test_timestamp_and_tz_transform("1969-01-01 00:00:00.00", &year, Datum::int(-1)); } #[test] @@ -224,6 +413,7 @@ mod test { NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(), NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(), NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(), + NaiveDate::from_ymd_opt(1969, 12, 1).unwrap(), ]; let date_array: ArrayRef = Arc::new(Date32Array::from( ori_date @@ -236,11 +426,12 @@ mod test { )); let res = month.transform(date_array).unwrap(); let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); + assert_eq!(res.len(), 5); assert_eq!(res.value(0), 0); assert_eq!(res.value(1), 30 * 12 + 3); assert_eq!(res.value(2), 60 * 12 + 6); assert_eq!(res.value(3), 90 * 12 + 9); + assert_eq!(res.value(4), -1); // Test TimestampMicrosecond let ori_timestamp = vec![ @@ -252,6 +443,8 @@ mod test { .unwrap(), NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f") .unwrap(), + NaiveDateTime::parse_from_str("1969-12-01 00:00:00.00", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), ]; let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( ori_timestamp @@ -272,11 +465,36 @@ mod test { )); let res = month.transform(date_array).unwrap(); let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); + assert_eq!(res.len(), 5); assert_eq!(res.value(0), 0); assert_eq!(res.value(1), 30 * 12 + 3); assert_eq!(res.value(2), 60 * 12 + 6); assert_eq!(res.value(3), 90 * 12 + 9); + assert_eq!(res.value(4), -1); + } + + #[test] + fn test_transform_month_literal() { + let month = Box::new(super::Month) as BoxedTransformFunction; + + // Test Date32 + test_date( + 18628, + &month, + Datum::int((2021 - super::UNIX_EPOCH_YEAR) * 12), + ); + test_date(-31, &month, Datum::int(-1)); + + // Test TimestampMicrosecond + test_timestamp_and_tz_transform_using_i64( + 186280000000, + &month, + Datum::int((1970 - super::UNIX_EPOCH_YEAR) * 12), + ); + test_timestamp_and_tz_transform("1969-12-01 23:00:00.00", &month, Datum::int(-1)); + test_timestamp_and_tz_transform("2017-12-01 00:00:00.00", &month, Datum::int(575)); + test_timestamp_and_tz_transform("1970-01-01 00:00:00.00", &month, Datum::int(0)); + test_timestamp_and_tz_transform("1969-12-31 00:00:00.00", &month, Datum::int(-1)); } #[test] @@ -287,6 +505,7 @@ mod test { NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(), NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(), NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(), + NaiveDate::from_ymd_opt(1969, 12, 31).unwrap(), ]; let expect_day = ori_date .clone() @@ -309,11 +528,12 @@ mod test { )); let res = day.transform(date_array).unwrap(); let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); + assert_eq!(res.len(), 5); assert_eq!(res.value(0), expect_day[0]); assert_eq!(res.value(1), expect_day[1]); assert_eq!(res.value(2), expect_day[2]); assert_eq!(res.value(3), expect_day[3]); + assert_eq!(res.value(4), -1); // Test TimestampMicrosecond let ori_timestamp = vec![ @@ -325,6 +545,8 @@ mod test { .unwrap(), NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f") .unwrap(), + NaiveDateTime::parse_from_str("1969-12-31 00:00:00.00", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), ]; let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from( ori_timestamp @@ -345,11 +567,25 @@ mod test { )); let res = day.transform(date_array).unwrap(); let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); + assert_eq!(res.len(), 5); assert_eq!(res.value(0), expect_day[0]); assert_eq!(res.value(1), expect_day[1]); assert_eq!(res.value(2), expect_day[2]); assert_eq!(res.value(3), expect_day[3]); + assert_eq!(res.value(4), -1); + } + + #[test] + fn test_transform_days_literal() { + let day = Box::new(super::Day) as BoxedTransformFunction; + // Test Date32 + test_date(18628, &day, Datum::int(18628)); + test_date(-31, &day, Datum::int(-31)); + + // Test TimestampMicrosecond + test_timestamp_and_tz_transform_using_i64(1512151975038194, &day, Datum::int(17501)); + test_timestamp_and_tz_transform_using_i64(-115200000000, &day, Datum::int(-1)); + test_timestamp_and_tz_transform("2017-12-01 10:30:42.123", &day, Datum::int(17501)); } #[test] @@ -364,6 +600,8 @@ mod test { .unwrap(), NaiveDateTime::parse_from_str("2060-09-01 05:03:23.123", "%Y-%m-%d %H:%M:%S.%f") .unwrap(), + NaiveDateTime::parse_from_str("1969-12-31 23:00:00.00", "%Y-%m-%d %H:%M:%S.%f") + .unwrap(), ]; let expect_hour = ori_timestamp .clone() @@ -401,10 +639,20 @@ mod test { )); let res = hour.transform(date_array).unwrap(); let res = res.as_any().downcast_ref::().unwrap(); - assert_eq!(res.len(), 4); + assert_eq!(res.len(), 5); assert_eq!(res.value(0), expect_hour[0]); assert_eq!(res.value(1), expect_hour[1]); assert_eq!(res.value(2), expect_hour[2]); assert_eq!(res.value(3), expect_hour[3]); + assert_eq!(res.value(4), -1); + } + + #[test] + fn test_transform_hours_literal() { + let hour = Box::new(super::Hour) as BoxedTransformFunction; + + // Test TimestampMicrosecond + test_timestamp_and_tz_transform("2017-12-01 18:00:00.00", &hour, Datum::int(420042)); + test_timestamp_and_tz_transform("1969-12-31 23:00:00.00", &hour, Datum::int(-1)); } } diff --git a/crates/iceberg/src/transform/truncate.rs b/crates/iceberg/src/transform/truncate.rs index a8ebda8aa..767ca0036 100644 --- a/crates/iceberg/src/transform/truncate.rs +++ b/crates/iceberg/src/transform/truncate.rs @@ -20,7 +20,10 @@ use std::sync::Arc; use arrow_array::ArrayRef; use arrow_schema::DataType; -use crate::Error; +use crate::{ + spec::{Datum, PrimitiveLiteral}, + Error, +}; use super::TransformFunction; @@ -34,12 +37,28 @@ impl Truncate { Self { width } } - fn truncate_str_by_char(s: &str, max_chars: usize) -> &str { - match s.char_indices().nth(max_chars) { + #[inline] + fn truncate_str(s: &str, width: usize) -> &str { + match s.char_indices().nth(width) { None => s, Some((idx, _)) => &s[..idx], } } + + #[inline] + fn truncate_i32(v: i32, width: i32) -> i32 { + v - v.rem_euclid(width) + } + + #[inline] + fn truncate_i64(v: i64, width: i64) -> i64 { + v - (((v % width) + width) % width) + } + + #[inline] + fn truncate_decimal_i128(v: i128, width: i128) -> i128 { + v - (((v % width) + width) % width) + } } impl TransformFunction for Truncate { @@ -56,7 +75,7 @@ impl TransformFunction for Truncate { .as_any() .downcast_ref::() .unwrap() - .unary(|v| v - v.rem_euclid(width)); + .unary(|v| Self::truncate_i32(v, width)); Ok(Arc::new(res)) } DataType::Int64 => { @@ -65,7 +84,7 @@ impl TransformFunction for Truncate { .as_any() .downcast_ref::() .unwrap() - .unary(|v| v - (((v % width) + width) % width)); + .unary(|v| Self::truncate_i64(v, width)); Ok(Arc::new(res)) } DataType::Decimal128(precision, scale) => { @@ -74,7 +93,7 @@ impl TransformFunction for Truncate { .as_any() .downcast_ref::() .unwrap() - .unary(|v| v - (((v % width) + width) % width)) + .unary(|v| Self::truncate_decimal_i128(v, width)) .with_precision_and_scale(*precision, *scale) .map_err(|err| Error::new(crate::ErrorKind::Unexpected, format!("{err}")))?; Ok(Arc::new(res)) @@ -87,7 +106,7 @@ impl TransformFunction for Truncate { .downcast_ref::() .unwrap() .iter() - .map(|v| v.map(|v| Self::truncate_str_by_char(v, len))), + .map(|v| v.map(|v| Self::truncate_str(v, len))), ); Ok(Arc::new(res)) } @@ -99,11 +118,50 @@ impl TransformFunction for Truncate { .downcast_ref::() .unwrap() .iter() - .map(|v| v.map(|v| Self::truncate_str_by_char(v, len))), + .map(|v| v.map(|v| Self::truncate_str(v, len))), ); Ok(Arc::new(res)) } - _ => unreachable!("Truncate transform only supports (int,long,decimal,string) types"), + _ => Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, + format!( + "Unsupported data type for truncate transform: {:?}", + input.data_type() + ), + )), + } + } + + fn transform_literal(&self, input: &Datum) -> crate::Result> { + match input.literal() { + PrimitiveLiteral::Int(v) => Ok(Some({ + let width: i32 = self.width.try_into().map_err(|_| { + Error::new( + crate::ErrorKind::DataInvalid, + "width is failed to convert to i32 when truncate Int32Array", + ) + })?; + Datum::int(Self::truncate_i32(*v, width)) + })), + PrimitiveLiteral::Long(v) => Ok(Some({ + let width = self.width as i64; + Datum::long(Self::truncate_i64(*v, width)) + })), + PrimitiveLiteral::Decimal(v) => Ok(Some({ + let width = self.width as i128; + Datum::decimal(Self::truncate_decimal_i128(*v, width))? + })), + PrimitiveLiteral::String(v) => Ok(Some({ + let len = self.width as usize; + Datum::string(Self::truncate_str(v, len).to_string()) + })), + _ => Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, + format!( + "Unsupported data type for truncate transform: {:?}", + input.data_type() + ), + )), } } } @@ -116,7 +174,7 @@ mod test { builder::PrimitiveBuilder, types::Decimal128Type, Decimal128Array, Int32Array, Int64Array, }; - use crate::transform::TransformFunction; + use crate::{spec::Datum, transform::TransformFunction}; // Test case ref from: https://iceberg.apache.org/spec/#truncate-transform-details #[test] @@ -187,32 +245,74 @@ mod test { fn test_string_truncate() { let test1 = "イロハニホヘト"; let test1_2_expected = "イロ"; - assert_eq!( - super::Truncate::truncate_str_by_char(test1, 2), - test1_2_expected - ); + assert_eq!(super::Truncate::truncate_str(test1, 2), test1_2_expected); let test1_3_expected = "イロハ"; - assert_eq!( - super::Truncate::truncate_str_by_char(test1, 3), - test1_3_expected - ); + assert_eq!(super::Truncate::truncate_str(test1, 3), test1_3_expected); let test2 = "щщаεはчωいにπάほхεろへσκζ"; let test2_7_expected = "щщаεはчω"; - assert_eq!( - super::Truncate::truncate_str_by_char(test2, 7), - test2_7_expected - ); + assert_eq!(super::Truncate::truncate_str(test2, 7), test2_7_expected); let test3 = "\u{FFFF}\u{FFFF}"; - assert_eq!(super::Truncate::truncate_str_by_char(test3, 2), test3); + assert_eq!(super::Truncate::truncate_str(test3, 2), test3); let test4 = "\u{10000}\u{10000}"; let test4_1_expected = "\u{10000}"; - assert_eq!( - super::Truncate::truncate_str_by_char(test4, 1), - test4_1_expected - ); + assert_eq!(super::Truncate::truncate_str(test4, 1), test4_1_expected); + } + + #[test] + fn test_literal_int() { + let input = Datum::int(1); + let res = super::Truncate::new(10) + .transform_literal(&input) + .unwrap() + .unwrap(); + assert_eq!(res, Datum::int(0),); + + let input = Datum::int(-1); + let res = super::Truncate::new(10) + .transform_literal(&input) + .unwrap() + .unwrap(); + assert_eq!(res, Datum::int(-10),); + } + + #[test] + fn test_literal_long() { + let input = Datum::long(1); + let res = super::Truncate::new(10) + .transform_literal(&input) + .unwrap() + .unwrap(); + assert_eq!(res, Datum::long(0),); + + let input = Datum::long(-1); + let res = super::Truncate::new(10) + .transform_literal(&input) + .unwrap() + .unwrap(); + assert_eq!(res, Datum::long(-10),); + } + + #[test] + fn test_decimal_literal() { + let input = Datum::decimal(1065).unwrap(); + let res = super::Truncate::new(50) + .transform_literal(&input) + .unwrap() + .unwrap(); + assert_eq!(res, Datum::decimal(1050).unwrap(),); + } + + #[test] + fn test_string_literal() { + let input = Datum::string("iceberg".to_string()); + let res = super::Truncate::new(3) + .transform_literal(&input) + .unwrap() + .unwrap(); + assert_eq!(res, Datum::string("ice".to_string()),); } } diff --git a/crates/iceberg/src/transform/void.rs b/crates/iceberg/src/transform/void.rs index d419430ba..7cbee27ca 100644 --- a/crates/iceberg/src/transform/void.rs +++ b/crates/iceberg/src/transform/void.rs @@ -27,4 +27,8 @@ impl TransformFunction for Void { fn transform(&self, input: ArrayRef) -> Result { Ok(new_null_array(input.data_type(), input.len())) } + + fn transform_literal(&self, _input: &crate::spec::Datum) -> Result> { + Ok(None) + } }