feat: support transform function #42
Conversation
cc @liurenjie1024 @Xuanwo @JanKaul @nastra @Fokko PTAL
Thanks @ZENOTME. Great work, and I left some comments.
impl TransformFunction for Year {
    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
        let array = year_dyn(&input).map_err(|err| {
Should we also add a check before it for unsupported types like Date64? @liurenjie1024
Generally LGTM. I think it's better to return an error rather than panic when invalid input is passed.
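A minimal sketch of what such a guard could look like, assuming a crate-level Error::new(ErrorKind, message) constructor and that only date/timestamp inputs are supported (both are assumptions, not the PR's actual code):

```rust
use arrow_array::ArrayRef;
use arrow_schema::DataType;

// Hypothetical guard: fail fast with a typed error for inputs the year
// transform cannot handle, instead of panicking deeper in the kernel.
fn check_year_input(input: &ArrayRef) -> Result<()> {
    match input.data_type() {
        DataType::Date32 | DataType::Timestamp(_, _) => Ok(()),
        dt => Err(Error::new(
            crate::ErrorKind::FeatureUnsupported,
            format!("Transform year is not supported for input type {:?}", dt),
        )),
    }
}
```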
const EPOCH_DAY_FROM_CE: i32 = 719163;
const DAY_PER_SECOND: f64 = 0.0000115741;
const HOUR_PER_SECOND: f64 = 1_f64 / 3600.0;
const BASE_YEAR: i32 = 1970;
Suggested change:
- const BASE_YEAR: i32 = 1970;
+ const EPOCH_YEAR: i32 = 1970;
Although the name itself is clear, I believe adding a brief explanation of its meaning would be beneficial.
For example, I spent some time understanding what DAY_PER_SECOND is. Besides, const means rustc will calculate the value for us at compile time, so we can use 1_f64 / 24.0 / 3600.0 here to make it clearer. If there is some limitation on the calculation (we do need the value to be exactly 0.0000115741), we can add 1_f64 / 24.0 / 3600.0 as a comment above.
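For illustration, the suggestion amounts to something like this (the doc-comment wording is illustrative):

```rust
/// Days per second: converts a seconds-based timestamp into days since
/// epoch. `const` items are evaluated at compile time, so the formula
/// costs nothing at runtime and is clearer than the literal 0.0000115741.
const DAY_PER_SECOND: f64 = 1_f64 / 24.0 / 3600.0;

/// Hours per second.
const HOUR_PER_SECOND: f64 = 1_f64 / 3600.0;
```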
I tried to make it cleaner and self-explanatory without comments. Please let me know if it's still not clear enough and needs a comment. @Xuanwo
LGTM, thanks for your effort!
Great work @ZENOTME
crates/iceberg/Cargo.toml
@@ -28,6 +28,7 @@ keywords = ["iceberg"]

[dependencies]
apache-avro = "0.15.0"
arrow = { version = ">=46" }
In PyIceberg we make Arrow an optional dependency because it is so big, idk if the rust implementation has the same problem.
Good point, we should use arrow-array only.
Seems the compute module is not separated out as its own crate, and we have to import arrow if we want to use it. 🥵 https://docs.rs/arrow/latest/arrow/compute/index.html
        crate::ErrorKind::FeatureUnsupported,
        format!("Transform {:?} is not implemented", transform),
    )),
}
Truncate and Bucket are missing.
I think it's left to be implemented in a following PR? cc @ZENOTME
Yes, this PR is mainly the framework of transform. Actually, I've already implemented Bucket in icelake. Is it OK to implement them together in the next PR?
I have completed all the transform functions.
I prefer to have them all in one PR, thanks! 👍🏻
/// transform will take an input array and transform it into a new array.
/// The implementation of this function will need to check and downcast the input to specific
/// type.
fn transform(&self, input: ArrayRef) -> Result<ArrayRef>;
I'm not a rust'er, but is this passing in an array?
Maybe some context on how this is implemented in Python (the Java implementation also followed this path eventually). In PyIceberg we initialize a transform, but that one might be unbound. A common place to find a transform is in the partition spec: https://iceberg.apache.org/spec/#partition-specs
Here we have a reference to a field by the source-id. This means that we don't yet know what kind of input it is. For example, you can bucket a string, uuid, long, int, datetime, etc. Once we bind the transform to the schema, we return the actual function as a callable (we pass a function):
```python
def transform(self, source: IcebergType, bucket: bool = True) -> Callable[[Optional[Any]], Optional[int]]:
    source_type = type(source)
    if source_type in {IntegerType, LongType, DateType, TimeType, TimestampType, TimestamptzType}:

        def hash_func(v: Any) -> int:
            return mmh3.hash(struct.pack("<q", v))

    elif source_type == DecimalType:

        def hash_func(v: Any) -> int:
            return mmh3.hash(decimal_to_bytes(v))

    elif source_type in {StringType, FixedType, BinaryType}:

        def hash_func(v: Any) -> int:
            return mmh3.hash(v)

    elif source_type == UUIDType:

        def hash_func(v: Any) -> int:
            if isinstance(v, UUID):
                return mmh3.hash(v.bytes)
            return mmh3.hash(v)

    else:
        raise ValueError(f"Unknown type {source}")

    if bucket:
        return lambda v: (hash_func(v) & IntegerType.max) % self._num_buckets if v else None
    return hash_func
```
This way we can initialize the transform even when we don't yet know the source type.
There are two differences here:
- The transform function is the actual implementation of the transform logic; it's similar to hash_func above in Python.
- The bind process happens in the following create_transform_function (see the sketch below). The difference is that in Rust we don't accept a Type parameter, since it's included in ArrayRef.
- We implement the partition function on arrow arrays rather than iceberg values. This is because currently the most important use case for us is to implement PartitionedWriter, which accepts arrow RecordBatch, and implementing it on arrow arrays lets us do it in a vectorized way (much faster than a row-based approach). I guess eventually we will need to implement another version on iceberg values, which would be useful in partition pruning.
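To make the bind step concrete, a rough sketch of the dispatch shape being described, reusing the FeatureUnsupported arm quoted earlier (the Transform enum variants shown are assumptions, not the PR's actual code):

```rust
use arrow_array::ArrayRef;

pub trait TransformFunction {
    /// Takes an input array and transforms it into a new array.
    fn transform(&self, input: ArrayRef) -> Result<ArrayRef>;
}

/// Bind-time dispatch. No source Type parameter is needed: every Arrow
/// array carries its own DataType, so each implementation can check and
/// downcast its input itself.
pub fn create_transform_function(transform: &Transform) -> Result<Box<dyn TransformFunction>> {
    match transform {
        Transform::Year => Ok(Box::new(Year)),
        Transform::Identity => Ok(Box::new(Identity)),
        _ => Err(Error::new(
            crate::ErrorKind::FeatureUnsupported,
            format!("Transform {:?} is not implemented", transform),
        )),
    }
}
```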
Currently this transform logic is specific to arrow record batches. Maybe we need to consider a more generic interface?
Actually, I think the partition writer is used by external systems, and arrow is a good middle layer between iceberg and an external system. If we want to consider iceberg values, we need to take the effort to maintain conversion of iceberg values. Is there a benefit to using iceberg values? 🤔
For example, when we want to execute select * from t where a = 1 and b = 2, and table t is partitioned by a, we can prune unnecessary partitions by calculating a = 1. In this case we need to use iceberg values.
Sorry, I think my question should be whether we need to provide a write interface using iceberg values. We should maintain iceberg values internally for partitioning and convert to/from arrow.
For writer, I don't think we should provide it.
> Transforms are used in many places. For example when we want to execute select * from t where a = 1 and b = 2, and table t is partitioned by a, we can prune unnecessary partitions by calculating a = 1. In this case, we need to use iceberg value.

This brings me back to my original question. It is just a single value, and vectorization would not help much. Wouldn't it be easier to operate on a single value?
Keep in mind that transforms are also used when writing data. When a file is being written, Iceberg keeps the upper and lower bounds of each of the columns. By default, a truncate(16) transform is applied before storing the metrics in the manifest.
In PyIceberg, we use Arrow to write: apache/iceberg#7831 Then we use Arrow's metadata collector to get the lower and upper bound of each row group, which we then store in the manifest file. I'm not saying that it should be the same in 🦀, but just for the context.
> When a file is being written, Iceberg keeps the upper and lower bounds of each of the columns. By default, a truncate(16) transform is applied before storing the metrics in the manifest.

Did you mean the lower bound (upper bound) in the field summary? So it's the lower bound of the original column with the transform applied afterwards, transform(lower_bound(col)), rather than the lower bound of the transformed column, lower_bound(transform(col))?
Hi @Fokko, more background on why we use an array can be found here: #32 (comment).
But I do think transform functions accepting a single value would also be useful; maybe we can implement it in a following PR?
> But I do think transform functions accepting a single value would also be useful; maybe we can implement it in a following PR?

+1. And it's better to integrate the array and single-value transforms; I can do that when implementing the single-value version.
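One possible shape for that integration, purely as a sketch (a Datum-style single-value type and the method name are assumptions):

```rust
use arrow_array::ArrayRef;

pub trait TransformFunction {
    /// Vectorized path: used by e.g. a partitioned writer that processes
    /// whole arrow RecordBatch columns at once.
    fn transform(&self, input: ArrayRef) -> Result<ArrayRef>;

    /// Scalar path: used where only a single literal matters, such as
    /// partition pruning for a predicate like `a = 1`.
    fn transform_literal(&self, input: &Datum) -> Result<Option<Datum>>;
}
```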
use crate::transform::TransformFunction;

#[test]
fn test_transform_years() {
These tests are great, thanks!
fn test_transform_years() {
    let year = super::Year;
    let ori_date = vec![
        NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
Can we also add timestamp, timestamptz?
Yes. And I think here NaiveDateTime (timestamp) and DateTime (timestamptz) have the same effect; I added NaiveDateTime to cover more cases. 🤔 Please let me know if I missed something.
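For illustration, a timestamp case could look roughly like this (assuming, per the Iceberg spec, that the year transform yields an Int32Array of years since 1970; the test name is illustrative):

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, TimestampMicrosecondArray};

#[test]
fn test_transform_years_timestamp() {
    // 2000-01-02T00:00:00Z in microseconds since the Unix epoch.
    let input: ArrayRef = Arc::new(TimestampMicrosecondArray::from(vec![946_771_200_000_000i64]));
    let res = super::Year.transform(input).unwrap();
    let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
    // The year transform yields years since 1970, so 2000 -> 30.
    assert_eq!(res.value(0), 30);
}
```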
Force-pushed from 64ddf16 to 52b9492.
Hi, I have implemented the bucket and truncate transforms in icelake and I'm ready to port them. Do you prefer to port them in this PR or another PR? I'm concerned that it could make this PR so big that it's hard to review. Anyway, I'm OK with both ways and it's up to you. @liurenjie1024 @Xuanwo @JanKaul @nastra @Fokko
Both are good to me. I prefer to put them in one PR so that it's easier to review.
Sorry for my late review. Very nice work. I have one minor comment but it looks fine to me.
    .as_any()
    .downcast_ref::<arrow_array::Int32Array>()
    .unwrap()
    .unary(|v| v - (((v % width) + width) % width));
We could use v - v.rem_euclid(width) instead of v - (((v % width) + width) % width). But it's also fine as it is.
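For readers unfamiliar with rem_euclid: it returns the non-negative remainder, so the two expressions agree even for negative inputs, as a quick check confirms:

```rust
fn main() {
    let width = 10;
    for v in [-21, -10, -1, 0, 1, 9, 10, 21] {
        // Both round v down to the nearest multiple of `width`,
        // even for negative v (e.g. -1 -> -10).
        assert_eq!(v - (((v % width) + width) % width), v - v.rem_euclid(width));
    }
}
```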
Thanks! v.rem_euclid is clearer!
Looking good @ZENOTME, I left some comments. Handling Unicode correctly when truncating the string is an important one.
use super::Bucket;

#[test]
fn test_hash() {
Thanks for adding these! 👍🏻
    .downcast_ref::<arrow_array::StringArray>()
    .unwrap()
    .iter()
    .map(|v| v.map(|v| &v[..len])),
Unfortunately we have to make sure that we don't break the Unicode encoding:
https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java#L36-L55
Here are some tests that might help: https://github.com/apache/iceberg/blob/master/core/src/test/java/org/apache/iceberg/TestMetricsTruncation.java#L145
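A minimal char-boundary-safe sketch of what the fix could look like in Rust (the helper name and placement are illustrative; this truncates to at most len characters rather than len bytes):

```rust
/// Truncate `s` to at most `len` characters. Byte slicing (`&v[..len]`)
/// panics when `len` does not land on a UTF-8 char boundary, so walk by
/// chars instead.
fn truncate_str(s: &str, len: usize) -> &str {
    match s.char_indices().nth(len) {
        // `idx` is the byte offset of the char just past the prefix,
        // so this keeps exactly the first `len` chars.
        Some((idx, _)) => &s[..idx],
        None => s, // already at most `len` chars long
    }
}
```

The .map(|v| v.map(|v| &v[..len])) call above would then become .map(|v| v.map(|v| truncate_str(v, len))).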
Thanks for your info!
        test1_3_expected
    );

    let test2 = "щщаεはчωいにπάほхεろへσκζ";
👍
This PR adds the transform function trait and the following implementations: 1. identity, 2. void, 3. temporal. It also refactors error handling and refines the tests.
Related issue: #32