Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into feat-impl-df-filters
Browse files Browse the repository at this point in the history
  • Loading branch information
Alon Agmon committed Sep 13, 2024
2 parents 21a38f5 + cde35ab commit c6cfa68
Show file tree
Hide file tree
Showing 34 changed files with 4,437 additions and 505 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/bindings_python_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,4 @@ jobs:
set -e
pip install hatch==1.12.0
hatch run dev:pip install dist/pyiceberg_core-*.whl --force-reinstall
hatch run dev:test
hatch run dev:test
2 changes: 1 addition & 1 deletion .github/workflows/ci_typos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Check typos
uses: crate-ci/typos@v1.23.6
uses: crate-ci/typos@v1.24.5
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ dist/*
**/venv
*.so
*.pyc
*.whl
*.tar.gz
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ iceberg = { version = "0.3.0", path = "./crates/iceberg" }
iceberg-catalog-rest = { version = "0.3.0", path = "./crates/catalog/rest" }
iceberg-catalog-hms = { version = "0.3.0", path = "./crates/catalog/hms" }
iceberg-catalog-memory = { version = "0.3.0", path = "./crates/catalog/memory" }
iceberg-datafusion = { version = "0.3.0", path = "./crates/integrations/datafusion" }
itertools = "0.13"
log = "0.4"
mockito = "1"
Expand All @@ -72,9 +73,11 @@ once_cell = "1"
opendal = "0.49"
ordered-float = "4"
parquet = "52"
paste = "1"
pilota = "0.11.2"
pretty_assertions = "1.4"
port_scanner = "0.1.5"
rand = "0.8"
regex = "1.10.5"
reqwest = { version = "0.12", default-features = false, features = ["json"] }
rust_decimal = "1.31"
Expand All @@ -86,7 +89,7 @@ serde_repr = "0.1.16"
serde_with = "3.4"
tempfile = "3.8"
tokio = { version = "1", default-features = false }
typed-builder = "0.19"
typed-builder = "0.20"
url = "2"
urlencoding = "2"
uuid = { version = "1.6.1", features = ["v7"] }
Expand Down
3 changes: 2 additions & 1 deletion bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ crate-type = ["cdylib"]

[dependencies]
iceberg = { path = "../../crates/iceberg" }
pyo3 = { version = "0.22", features = ["extension-module"] }
pyo3 = { version = "0.21", features = ["extension-module"] }
arrow = { version = "52", features = ["pyarrow"] }
1 change: 1 addition & 0 deletions bindings/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ ignore = ["F403", "F405"]
dependencies = [
"maturin>=1.0,<2.0",
"pytest>=8.3.2",
"pyarrow>=17.0.0",
]

[tool.hatch.envs.dev.scripts]
Expand Down
24 changes: 24 additions & 0 deletions bindings/python/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use pyo3::exceptions::PyValueError;
use pyo3::PyErr;

/// Convert an iceberg error to a python error
pub fn to_py_err(err: iceberg::Error) -> PyErr {
PyValueError::new_err(err.to_string())
}
12 changes: 4 additions & 8 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use iceberg::io::FileIOBuilder;
use pyo3::prelude::*;

#[pyfunction]
fn hello_world() -> PyResult<String> {
let _ = FileIOBuilder::new_fs_io().build().unwrap();
Ok("Hello, world!".to_string())
}
mod error;
mod transform;

#[pymodule]
fn pyiceberg_core_rust(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(hello_world, m)?)?;
fn pyiceberg_core_rust(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
transform::register_module(py, m)?;
Ok(())
}
93 changes: 93 additions & 0 deletions bindings/python/src/transform.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{make_array, Array, ArrayData};
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use iceberg::spec::Transform;
use iceberg::transform::create_transform_function;
use pyo3::prelude::*;

use crate::error::to_py_err;

#[pyfunction]
pub fn identity(py: Python, array: PyObject) -> PyResult<PyObject> {
apply(py, array, Transform::Identity)
}

#[pyfunction]
pub fn void(py: Python, array: PyObject) -> PyResult<PyObject> {
apply(py, array, Transform::Void)
}

#[pyfunction]
pub fn year(py: Python, array: PyObject) -> PyResult<PyObject> {
apply(py, array, Transform::Year)
}

#[pyfunction]
pub fn month(py: Python, array: PyObject) -> PyResult<PyObject> {
apply(py, array, Transform::Month)
}

#[pyfunction]
pub fn day(py: Python, array: PyObject) -> PyResult<PyObject> {
apply(py, array, Transform::Day)
}

#[pyfunction]
pub fn hour(py: Python, array: PyObject) -> PyResult<PyObject> {
apply(py, array, Transform::Hour)
}

#[pyfunction]
pub fn bucket(py: Python, array: PyObject, num_buckets: u32) -> PyResult<PyObject> {
apply(py, array, Transform::Bucket(num_buckets))
}

#[pyfunction]
pub fn truncate(py: Python, array: PyObject, width: u32) -> PyResult<PyObject> {
apply(py, array, Transform::Truncate(width))
}

fn apply(py: Python, array: PyObject, transform: Transform) -> PyResult<PyObject> {
// import
let array = ArrayData::from_pyarrow_bound(array.bind(py))?;
let array = make_array(array);
let transform_function = create_transform_function(&transform).map_err(to_py_err)?;
let array = transform_function.transform(array).map_err(to_py_err)?;
// export
let array = array.into_data();
array.to_pyarrow(py)
}

pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
let this = PyModule::new_bound(py, "transform")?;

this.add_function(wrap_pyfunction!(identity, &this)?)?;
this.add_function(wrap_pyfunction!(void, &this)?)?;
this.add_function(wrap_pyfunction!(year, &this)?)?;
this.add_function(wrap_pyfunction!(month, &this)?)?;
this.add_function(wrap_pyfunction!(day, &this)?)?;
this.add_function(wrap_pyfunction!(hour, &this)?)?;
this.add_function(wrap_pyfunction!(bucket, &this)?)?;
this.add_function(wrap_pyfunction!(truncate, &this)?)?;

m.add_submodule(&this)?;
py.import_bound("sys")?
.getattr("modules")?
.set_item("pyiceberg_core.transform", this)
}
22 changes: 0 additions & 22 deletions bindings/python/tests/test_basic.py

This file was deleted.

99 changes: 99 additions & 0 deletions bindings/python/tests/test_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import date, datetime

import pyarrow as pa
import pytest
from pyiceberg_core import transform


def test_identity_transform():
arr = pa.array([1, 2])
result = transform.identity(arr)
assert result == arr


def test_bucket_transform():
arr = pa.array([1, 2])
result = transform.bucket(arr, 10)
expected = pa.array([6, 2], type=pa.int32())
assert result == expected


def test_bucket_transform_fails_for_list_type_input():
arr = pa.array([[1, 2], [3, 4]])
with pytest.raises(
ValueError,
match=r"FeatureUnsupported => Unsupported data type for bucket transform",
):
transform.bucket(arr, 10)


def test_bucket_chunked_array():
chunked = pa.chunked_array([pa.array([1, 2]), pa.array([3, 4])])
result_chunks = []
for arr in chunked.iterchunks():
result_chunks.append(transform.bucket(arr, 10))

expected = pa.chunked_array(
[pa.array([6, 2], type=pa.int32()), pa.array([5, 0], type=pa.int32())]
)
assert pa.chunked_array(result_chunks).equals(expected)


def test_year_transform():
arr = pa.array([date(1970, 1, 1), date(2000, 1, 1)])
result = transform.year(arr)
expected = pa.array([0, 30], type=pa.int32())
assert result == expected


def test_month_transform():
arr = pa.array([date(1970, 1, 1), date(2000, 4, 1)])
result = transform.month(arr)
expected = pa.array([0, 30 * 12 + 3], type=pa.int32())
assert result == expected


def test_day_transform():
arr = pa.array([date(1970, 1, 1), date(2000, 4, 1)])
result = transform.day(arr)
expected = pa.array([0, 11048], type=pa.int32())
assert result == expected


def test_hour_transform():
arr = pa.array([datetime(1970, 1, 1, 19, 1, 23), datetime(2000, 3, 1, 12, 1, 23)])
result = transform.hour(arr)
expected = pa.array([19, 264420], type=pa.int32())
assert result == expected


def test_truncate_transform():
arr = pa.array(["this is a long string", "hi my name is sung"])
result = transform.truncate(arr, 5)
expected = pa.array(["this ", "hi my"])
assert result == expected


def test_identity_transform_with_direct_import():
from pyiceberg_core.transform import identity

arr = pa.array([1, 2])
result = identity(arr)
assert result == arr
2 changes: 1 addition & 1 deletion crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ mod tests {
let expected_sorted_order = SortOrder::builder()
.with_order_id(0)
.with_fields(vec![])
.build(expected_schema.clone())
.build(expected_schema)
.unwrap();

assert_eq!(
Expand Down
8 changes: 2 additions & 6 deletions crates/catalog/rest/tests/rest_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,8 @@ async fn test_create_table() {
assert_eq!(table.metadata().format_version(), FormatVersion::V2);
assert!(table.metadata().current_snapshot().is_none());
assert!(table.metadata().history().is_empty());
assert!(table.metadata().default_sort_order().unwrap().is_unsorted());
assert!(table
.metadata()
.default_partition_spec()
.unwrap()
.is_unpartitioned());
assert!(table.metadata().default_sort_order().is_unsorted());
assert!(table.metadata().default_partition_spec().is_unpartitioned());
}

#[tokio::test]
Expand Down
Loading

0 comments on commit c6cfa68

Please sign in to comment.