
feat: support transform function #42

Merged
merged 10 commits into from
Sep 22, 2023
Conversation

ZENOTME
Contributor

@ZENOTME ZENOTME commented Aug 28, 2023

This PR adds a transform function trait and the following implementations:

  1. identity
  2. void
  3. temporal

related issue: #32
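The shape of the trait and the two simplest transforms can be sketched as follows. This is a simplified illustration, not the PR's exact code: the real trait operates on arrow's ArrayRef and returns a Result, while plain vectors stand in here to keep the example self-contained.

```rust
// Simplified sketch of the transform trait added in this PR.
// The actual trait works on arrow's ArrayRef and returns Result<ArrayRef>;
// plain vectors stand in for arrays so the example is self-contained.
trait TransformFunction {
    fn transform(&self, input: Vec<Option<i32>>) -> Vec<Option<i32>>;
}

/// identity: returns the input unchanged.
struct Identity;
impl TransformFunction for Identity {
    fn transform(&self, input: Vec<Option<i32>>) -> Vec<Option<i32>> {
        input
    }
}

/// void: produces null for every input row.
struct Void;
impl TransformFunction for Void {
    fn transform(&self, input: Vec<Option<i32>>) -> Vec<Option<i32>> {
        vec![None; input.len()]
    }
}
```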

@ZENOTME
Contributor Author

ZENOTME commented Aug 28, 2023

cc @liurenjie1024 @Xuanwo @JanKaul @nastra @Fokko PTAL

Contributor

@liurenjie1024 liurenjie1024 left a comment


Thanks @ZENOTME . Great work, and I left some comments.

@liurenjie1024 liurenjie1024 requested review from Fokko and JanKaul August 29, 2023 03:45

impl TransformFunction for Year {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
let array = year_dyn(&input).map_err(|err| {
Contributor Author

Should we also add a check before it for unsupported types like Date64? @liurenjie1024

Contributor

@liurenjie1024 liurenjie1024 left a comment


Generally LGTM. I think it's better to return an error rather than panic when invalid input is passed.
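A minimal sketch of the suggested pattern: validate the input type up front and surface an Err instead of panicking. The Error/ErrorKind types and the supported-type list below are illustrative stand-ins, not the crate's actual definitions.

```rust
// Illustrative stand-ins for the crate's error types; the names and the
// supported-type list are assumptions for this sketch.
#[derive(Debug, PartialEq)]
enum ErrorKind {
    FeatureUnsupported,
}

#[derive(Debug, PartialEq)]
struct Error {
    kind: ErrorKind,
    message: String,
}

// Check the input type before calling into the compute kernel, so invalid
// input becomes an Err value rather than a panic.
fn check_supported(type_name: &str) -> Result<(), Error> {
    match type_name {
        "Date32" | "Timestamp" => Ok(()),
        other => Err(Error {
            kind: ErrorKind::FeatureUnsupported,
            message: format!("Transform `year` does not support type {}", other),
        }),
    }
}
```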

const EPOCH_DAY_FROM_CE: i32 = 719163;
const DAY_PER_SECOND: f64 = 0.0000115741;
const HOUR_PER_SECOND: f64 = 1_f64 / 3600.0;
const BASE_YEAR: i32 = 1970;
Contributor

Suggested change
const BASE_YEAR: i32 = 1970;
const EPOCH_YEAR: i32 = 1970;

Member

Although the name itself is clear, I believe adding a brief explanation of its meaning would be beneficial.

Member

@Xuanwo Xuanwo Aug 29, 2023

For example, I spent some time understanding what DAY_PER_SECOND is.

Besides, const means rustc will calculate the value at compile time, so we can use 1_f64 / 24.0 / 3600.0 here to make it clearer.

If there is some limitation on the calculation (we do need the value to be 0.0000115741), we can add the 1_f64 / 24.0 / 3600.0 as a comment above.
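To illustrate the point: a const expression is folded by rustc at compile time, so the self-documenting derived form has no runtime cost. The rename below follows the review suggestion above; the derived DAY_PER_SECOND agrees with the original literal to well within float precision.

```rust
// The divisions are evaluated at compile time, so writing the derivation
// out costs nothing at runtime and documents where the value comes from.
const DAY_PER_SECOND: f64 = 1_f64 / 24.0 / 3600.0; // was the literal 0.0000115741
const HOUR_PER_SECOND: f64 = 1_f64 / 3600.0;
// Renamed per the suggestion above: counted from the Unix epoch, 1970-01-01.
const EPOCH_YEAR: i32 = 1970;
```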

Contributor Author

I've tried to make it cleaner and self-explanatory without comments. Please let me know if it is still not clear enough and needs a comment. @Xuanwo

Contributor

@liurenjie1024 liurenjie1024 left a comment

LGTM, thanks for your effort!

Contributor

@Fokko Fokko left a comment

Great work @ZENOTME

@@ -28,6 +28,7 @@ keywords = ["iceberg"]

[dependencies]
apache-avro = "0.15.0"
arrow = { version = ">=46" }
Contributor

In PyIceberg we make Arrow an optional dependency because it is so big, idk if the rust implementation has the same problem.

Contributor

Good point, we should use arrow-array only.

Contributor Author

Seems the compute module is not separated out as a crate, so we have to import arrow if we want to use it. 🥵 https://docs.rs/arrow/latest/arrow/compute/index.html

Contributor

There are several arrow modules:

I guess the methods are in arrow-arith

crate::ErrorKind::FeatureUnsupported,
format!("Transform {:?} is not implemented", transform),
)),
}
Contributor

Truncate and Bucket are missing

Contributor

I think it's left to be implemented in a following PR? cc @ZENOTME

Contributor Author

Yes, this PR is mainly the framework for transforms. Actually, I've implemented Bucket in icelake. Is it OK to implement them together in the next PR?

Contributor Author

I have completed all the transform functions.

Contributor

I prefer to have them all in one PR, thanks! 👍🏻

/// transform will take an input array and transform it into a new array.
/// The implementation of this function will need to check and downcast the input to specific
/// type.
fn transform(&self, input: ArrayRef) -> Result<ArrayRef>;
Contributor

I'm not a rust'er, but is this passing in an array?

Maybe some context on how this is implemented in Python (and the Java implementation also followed this path eventually). In PyIceberg we initialize a transform, but that one might be unbound. A common place to find a transform is in the partition spec: https://iceberg.apache.org/spec/#partition-specs
Here we have a reference to a field by the source-id. This means that we don't know yet what kind the input is. For example, you can bucket a string, uuid, long, int, datetime, etc. Once we bind the transform to the schema, we return the actual function as a callable (we pass a function):

    def transform(self, source: IcebergType, bucket: bool = True) -> Callable[[Optional[Any]], Optional[int]]:
        source_type = type(source)
        if source_type in {IntegerType, LongType, DateType, TimeType, TimestampType, TimestamptzType}:

            def hash_func(v: Any) -> int:
                return mmh3.hash(struct.pack("<q", v))

        elif source_type == DecimalType:

            def hash_func(v: Any) -> int:
                return mmh3.hash(decimal_to_bytes(v))

        elif source_type in {StringType, FixedType, BinaryType}:

            def hash_func(v: Any) -> int:
                return mmh3.hash(v)

        elif source_type == UUIDType:

            def hash_func(v: Any) -> int:
                if isinstance(v, UUID):
                    return mmh3.hash(v.bytes)
                return mmh3.hash(v)

        else:
            raise ValueError(f"Unknown type {source}")

        if bucket:
            return lambda v: (hash_func(v) & IntegerType.max) % self._num_buckets if v else None
        return hash_func

This way we can initialize the transform, even when we don't know the type of the source type.

Contributor

There are a few differences here:

  1. The transform function is the actual implementation of the transform logic; it's similar to hash_func above in Python.
  2. The bind process happens in the following create_transform_function. The difference is that in Rust we don't accept a Type parameter, since it's included in ArrayRef.
  3. We implement the partition function on arrow arrays rather than iceberg values. This is because currently the most important use case for us is to implement PartitionedWriter, which accepts an arrow RecordBatch, and implementing it on arrow arrays lets us do it in a vectorized way (much faster than a row-based approach). I guess eventually we will need to implement another version on iceberg values, which would be useful in partition pruning.

Contributor Author

Currently this transform logic is specific to arrow record batches. Maybe we need to consider a more generic interface?

Actually, I think the partition writer is used by external systems, and arrow is a good middle layer between iceberg and external systems. If we want to support iceberg values, we need to take the effort to maintain conversion of iceberg values. Is there a benefit to using iceberg values? 🤔

Contributor

@liurenjie1024 liurenjie1024 Aug 31, 2023

For example, when we want to execute select * from t where a = 1 and b = 2, and table t is partitioned by a, we can prune unnecessary partitions by calculating a = 1. In this case we need to use iceberg values.

Contributor Author

Sorry, I think my question should be whether we need to provide a write interface using iceberg values. We should maintain iceberg values internally for partitioning and convert to/from arrow.

Contributor

For the writer, I don't think we should provide it.

Contributor

Transforms are used in many places.

For example when we want to execute select * from t where a = 1 and b=2, and table t is partitioned by a, we can prune unnecessary partitions by calculating a=1. In this case, we need to use iceberg value.

This brings me back to my original question. It is just a single value, and vectorization would not help much. Wouldn't it be easier to operate it on a single value?

Keep in mind that transforms are also used when writing data. When a file is being written, Iceberg keeps the upper and lower bounds of each of the columns. By default, a truncate(16) transform is applied before storing the metrics in the manifest.

In PyIceberg, we use Arrow to write: apache/iceberg#7831 Then we use Arrow's metadata collector to get the lower and upper bound of each row group, which we then store in the manifest file. I'm not saying that it should be the same in 🦀 , but just for the context.

Contributor Author

@ZENOTME ZENOTME Sep 14, 2023

When a file is being written, Iceberg keeps the upper and lower bounds of each of the columns. By default, a truncate(16) transform is applied before storing the metrics in the manifest.

Did you mean the lower bound (upper bound) in the field summary? So it's the lower bound of the original column with the transform then applied, transform(lower_bound(col)), rather than the lower bound of the transformed column, lower_bound(transform(col))?

Contributor

Hi @Fokko, more background on why we use an array can be found here: #32 (comment).

But I do think transform functions accepting a single value would also be useful; maybe we can implement it in a following PR?

Contributor Author

@ZENOTME ZENOTME Sep 22, 2023

But I do think transform functions accepting single value would also be useful, maybe we can implement it in following pr?

+1. And it's better to integrate the array and single-value transforms; I can do that when implementing the single-value version.

use crate::transform::TransformFunction;

#[test]
fn test_transform_years() {
Contributor

These tests are great, thanks!

fn test_transform_years() {
let year = super::Year;
let ori_date = vec![
NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
Contributor

Can we also add timestamp, timestamptz?

Contributor Author

Yes. And I think here NaiveDateTime (timestamp) and DateTime (timestamptz) have the same effect; I added NaiveDateTime to cover more cases. 🤔 Please let me know if I missed something.

@ZENOTME ZENOTME force-pushed the transform branch 2 times, most recently from 64ddf16 to 52b9492 Compare September 1, 2023 04:50
@ZENOTME
Contributor Author

ZENOTME commented Sep 2, 2023

Hi, I have implemented the bucket and truncate transforms in icelake and I'm ready to port them. Do you prefer to port them in this PR or another PR? I'm concerned that it may make this PR so big that it's hard to review. Anyway, I'm OK with either way; it's up to you. @liurenjie1024 @Xuanwo @JanKaul @nastra @Fokko

@liurenjie1024
Contributor

Hi, I have implemented the bucket and truncate transforms in icelake and I'm ready to port them. Do you prefer to port them in this PR or another PR? I'm concerned that it may make this PR so big that it's hard to review. Anyway, I'm OK with either way; it's up to you. @liurenjie1024 @Xuanwo @JanKaul @nastra @Fokko

Both are good to me. I prefer to put them in one PR so that it's easier to review.

Collaborator

@JanKaul JanKaul left a comment

Sorry for my late review. Very nice work. I have one minor comment, but it looks fine to me.

.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.unwrap()
.unary(|v| v - (((v % width) + width) % width));
Collaborator

We could use v - v.rem_euclid(width) instead of v - (((v % width) + width) % width). But it's also fine as it is.

Contributor Author

Thanks! v.rem_euclid is much clearer!
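The two formulations agree, including for negative inputs where plain % would round toward zero. A minimal check (standalone functions here, whereas the PR applies this inside unary on an Int32Array):

```rust
// Truncate rounds v down to the nearest multiple of `width`.
// Both forms handle negative values correctly; plain `v - v % width`
// would not, because `%` truncates toward zero in Rust.
fn truncate_verbose(v: i32, width: i32) -> i32 {
    v - (((v % width) + width) % width)
}

fn truncate_euclid(v: i32, width: i32) -> i32 {
    v - v.rem_euclid(width)
}
```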

Contributor

@Fokko Fokko left a comment

Looking good @ZENOTME I left some comments. Handling Unicode correctly when truncating the string is an important one.


use super::Bucket;
#[test]
fn test_hash() {
Contributor

Thanks for adding these! 👍🏻



.downcast_ref::<arrow_array::StringArray>()
.unwrap()
.iter()
.map(|v| v.map(|v| &v[..len])),
Contributor

Contributor Author

Thanks for your info!
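For context on the Unicode concern raised in this review: &v[..len] slices by bytes and panics if len lands inside a multi-byte UTF-8 character. A char-boundary-safe sketch (the function name is illustrative; the Iceberg spec defines string truncation in terms of Unicode code points):

```rust
// Truncate a string to at most `len` characters (code points).
// Byte slicing like &s[..len] would panic when `len` is not on a
// UTF-8 character boundary; char_indices avoids that.
fn truncate_chars(s: &str, len: usize) -> &str {
    match s.char_indices().nth(len) {
        Some((byte_idx, _)) => &s[..byte_idx],
        None => s, // fewer than `len` characters: keep the whole string
    }
}
```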

test1_3_expected
);

let test2 = "щщаεはчωいにπάほхεろへσκζ";
Contributor

👍

@Fokko Fokko merged commit a03000d into apache:main Sep 22, 2023