From fe50b421b8ea8a4e42c55f79740c1fb699d031af Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 26 Jan 2025 15:33:01 -0600 Subject: [PATCH 1/4] feat: align python table APIs with rust --- crates/core/src/file_group/reader.rs | 2 +- crates/core/src/table/mod.rs | 75 +++++++++++++++++++--------- python/hudi/_internal.pyi | 38 +++++++++----- python/src/internal.rs | 64 +++++++++++++++++++++++- 4 files changed, 141 insertions(+), 38 deletions(-) diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs index 2a0610ec..113a27a3 100644 --- a/crates/core/src/file_group/reader.rs +++ b/crates/core/src/file_group/reader.rs @@ -121,7 +121,7 @@ impl FileGroupReader { .map_err(|e| ReadFileSliceError(format!("Failed to filter records: {e:?}"))) } - pub async fn read_file_slice( + pub(crate) async fn read_file_slice( &self, file_slice: &FileSlice, base_file_only: bool, diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 0a163b8c..97d39fb5 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -140,6 +140,27 @@ impl Table { .await } + pub fn hudi_options(&self) -> HashMap { + self.hudi_configs.as_options() + } + + pub fn storage_options(&self) -> HashMap { + self.storage_options.as_ref().clone() + } + + #[cfg(feature = "datafusion")] + pub fn register_storage( + &self, + runtime_env: Arc, + ) { + self.timeline + .storage + .register_object_store(runtime_env.clone()); + self.file_system_view + .storage + .register_object_store(runtime_env.clone()); + } + pub fn base_url(&self) -> Url { let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath); self.hudi_configs @@ -165,27 +186,6 @@ impl Table { .to::() } - pub fn hudi_options(&self) -> HashMap { - self.hudi_configs.as_options() - } - - pub fn storage_options(&self) -> HashMap { - self.storage_options.as_ref().clone() - } - - #[cfg(feature = "datafusion")] - pub fn register_storage( - &self, - runtime_env: Arc, - ) { - self.timeline - .storage - .register_object_store(runtime_env.clone()); - self.file_system_view - .storage - .register_object_store(runtime_env.clone()); - } - /// Get the latest [Schema] of the table. pub async fn get_schema(&self) -> Result { self.timeline.get_latest_schema().await @@ -222,7 +222,36 @@ impl Table { n: usize, filters: &[(&str, &str, &str)], ) -> Result>> { - let file_slices = self.get_file_slices(filters).await?; + let filters = from_str_tuples(filters)?; + if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) { + self.get_file_slices_splits_internal(n, timestamp.to::().as_str(), &filters) + .await + } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() { + self.get_file_slices_splits_internal(n, timestamp, &filters) + .await + } else { + Ok(Vec::new()) + } + } + + pub async fn get_file_slices_splits_as_of( + &self, + n: usize, + timestamp: &str, + filters: &[(&str, &str, &str)], + ) -> Result>> { + let filters = from_str_tuples(filters)?; + self.get_file_slices_splits_internal(n, timestamp, &filters) + .await + } + + async fn get_file_slices_splits_internal( + &self, + n: usize, + timestamp: &str, + filters: &[Filter], + ) -> Result>> { + let file_slices = self.get_file_slices_internal(timestamp, filters).await?; if file_slices.is_empty() { return Ok(Vec::new()); } @@ -282,7 +311,7 @@ impl Table { ) } - pub fn create_file_group_reader_with_filters( + fn create_file_group_reader_with_filters( &self, filters: &[(&str, &str, &str)], schema: &Schema, diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index eb4b1108..58086e31 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -109,36 +109,36 @@ class HudiTable: options (Optional[Dict[str, str]]): Additional configuration options (optional). """ ... - def get_schema(self) -> "pyarrow.Schema": + def hudi_options(self) -> Dict[str, str]: """ - Returns the schema of the Hudi table. + Get hudi options for table. Returns: - pyarrow.Schema: The schema of the table. + Dict[str, str]: A dictionary of hudi options. """ ... - def get_partition_schema(self) -> "pyarrow.Schema": + def storage_options(self) -> Dict[str, str]: """ - Returns the partition schema of the Hudi table. + Get storage options set for table instance. Returns: - pyarrow.Schema: The schema used for partitioning the table. + Dict[str, str]: A dictionary of storage options. """ ... - def hudi_options(self) -> Dict[str, str]: + def get_schema(self) -> "pyarrow.Schema": """ - Get hudi options for table. + Returns the schema of the Hudi table. Returns: - Dict[str, str]: A dictionary of hudi options. + pyarrow.Schema: The schema of the table. """ ... - def storage_options(self) -> Dict[str, str]: + def get_partition_schema(self) -> "pyarrow.Schema": """ - Get storage options set for table instance. + Returns the partition schema of the Hudi table. Returns: - Dict[str, str]: A dictionary of storage options. + pyarrow.Schema: The schema used for partitioning the table. """ ... def get_file_slices_splits( @@ -168,6 +168,13 @@ class HudiTable: List[HudiFileSlice]: A list of file slices matching the filters. """ ... + def get_file_slices_as_of( + self, timestamp: str, filters: Optional[List[Tuple[str, str, str]]] + ) -> List[HudiFileSlice]: + """ + Retrieves all file slices in the Hudi table as of a timestamp, optionally filtered by the provided filters. + """ + ... def create_file_group_reader(self) -> HudiFileGroupReader: """ Creates a HudiFileGroupReader for reading records from file groups in the Hudi table. @@ -189,6 +196,13 @@ class HudiTable: List[pyarrow.RecordBatch]: A list of record batches from the snapshot of the table. """ ... + def read_snapshot_as_of( + self, timestamp: str, filters: Optional[List[Tuple[str, str, str]]] + ) -> List["pyarrow.RecordBatch"]: + """ + Reads the snapshot of the Hudi table as of a timestamp, optionally filtered by the provided filters. + """ + ... def read_incremental_records( self, start_timestamp: str, end_timestamp: Optional[str] ) -> List["pyarrow.RecordBatch"]: ... diff --git a/python/src/internal.rs b/python/src/internal.rs index 1bfb953c..b09ea12f 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -17,12 +17,11 @@ * under the License. */ +use arrow::pyarrow::ToPyArrow; use std::collections::HashMap; use std::convert::From; use std::path::PathBuf; use std::sync::OnceLock; - -use arrow::pyarrow::ToPyArrow; use tokio::runtime::Runtime; use hudi::error::CoreError; @@ -208,6 +207,30 @@ impl HudiTable { }) } + #[pyo3(signature = (n, timestamp, filters=None))] + fn get_file_slices_splits_as_of( + &self, + n: usize, + timestamp: &str, + filters: Option>, + py: Python, + ) -> PyResult>> { + let filters = filters.unwrap_or_default(); + + py.allow_threads(|| { + let file_slices = rt() + .block_on( + self.inner + .get_file_slices_splits_as_of(n, timestamp, &filters.as_strs()), + ) + .map_err(PythonError::from)?; + Ok(file_slices + .iter() + .map(|inner_vec| inner_vec.iter().map(convert_file_slice).collect()) + .collect()) + }) + } + #[pyo3(signature = (filters=None))] fn get_file_slices( &self, @@ -224,6 +247,26 @@ impl HudiTable { }) } + #[pyo3(signature = (timestamp, filters=None))] + fn get_file_slices_as_of( + &self, + timestamp: &str, + filters: Option>, + py: Python, + ) -> PyResult> { + let filters = filters.unwrap_or_default(); + + py.allow_threads(|| { + let file_slices = rt() + .block_on( + self.inner + .get_file_slices_as_of(timestamp, &filters.as_strs()), + ) + .map_err(PythonError::from)?; + Ok(file_slices.iter().map(convert_file_slice).collect()) + }) + } + fn create_file_group_reader(&self) -> PyResult { let fg_reader = self.inner.create_file_group_reader(); Ok(HudiFileGroupReader { inner: fg_reader }) @@ -242,6 +285,23 @@ impl HudiTable { .to_pyarrow(py) } + #[pyo3(signature = (timestamp, filters=None))] + fn read_snapshot_as_of( + &self, + timestamp: &str, + filters: Option>, + py: Python, + ) -> PyResult { + let filters = filters.unwrap_or_default(); + + rt().block_on( + self.inner + .read_snapshot_as_of(timestamp, &filters.as_strs()), + ) + .map_err(PythonError::from)? + .to_pyarrow(py) + } + #[pyo3(signature = (start_timestamp, end_timestamp=None))] fn read_incremental_records( &self, From 8acd2d6aa69003137ae2ebd9c777ac86a2e27b06 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 26 Jan 2025 20:05:51 -0600 Subject: [PATCH 2/4] refactor demo apps --- .github/workflows/ci.yml | 2 +- demo/{app => }/.gitignore | 0 demo/compose.yaml | 2 +- demo/{run_app.sh => run_demo.sh} | 8 +-- demo/{app/rust => sql-datafusion}/Cargo.toml | 4 +- demo/sql-datafusion/run.sh | 21 ++++++ demo/sql-datafusion/src/main.rs | 66 +++++++++++++++++++ demo/table-api-python/run.sh | 21 ++++++ .../src/__init__.py | 0 .../python => table-api-python}/src/main.py | 9 +-- demo/table-api-rust/Cargo.toml | 30 +++++++++ demo/table-api-rust/run.sh | 21 ++++++ demo/{app/rust => table-api-rust}/src/main.rs | 43 ++++++------ 13 files changed, 196 insertions(+), 31 deletions(-) rename demo/{app => }/.gitignore (100%) rename demo/{run_app.sh => run_demo.sh} (86%) rename demo/{app/rust => sql-datafusion}/Cargo.toml (91%) create mode 100755 demo/sql-datafusion/run.sh create mode 100644 demo/sql-datafusion/src/main.rs create mode 100755 demo/table-api-python/run.sh rename demo/{app/python => table-api-python}/src/__init__.py (100%) rename demo/{app/python => table-api-python}/src/main.py (91%) create mode 100644 demo/table-api-rust/Cargo.toml create mode 100755 demo/table-api-rust/run.sh rename demo/{app/rust => table-api-rust}/src/main.rs (66%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6fc487ce..80861c07 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -120,7 +120,7 @@ jobs: - name: Integration tests run: | cd demo - ./run_app.sh + ./run_demo.sh publish-coverage: name: Publish coverage reports to codecov.io diff --git a/demo/app/.gitignore b/demo/.gitignore similarity index 100% rename from demo/app/.gitignore rename to demo/.gitignore diff --git a/demo/compose.yaml b/demo/compose.yaml index f56cd641..ab299c95 100644 --- a/demo/compose.yaml +++ b/demo/compose.yaml @@ -64,5 +64,5 @@ services: AWS_REGION: us-east-1 # minio default networks: - app_network: + demo_network: driver: bridge diff --git a/demo/run_app.sh b/demo/run_demo.sh similarity index 86% rename from demo/run_app.sh rename to demo/run_demo.sh index 839eeba3..cec677b9 100755 --- a/demo/run_app.sh +++ b/demo/run_demo.sh @@ -34,11 +34,11 @@ if [ $attempt -eq $max_attempts ]; then exit 1 fi -# install dependencies and run the app +# install dependencies and run the demo apps docker compose exec -T runner /bin/bash -c " cd /opt/hudi-rs/python && \ make setup develop && \ - cd /opt/hudi-rs/demo/app && \ - cargo run --manifest-path=rust/Cargo.toml && \ - python -m python.src.main + cd /opt/hudi-rs/demo/sql-datafusion && ./run.sh &&\ + cd /opt/hudi-rs/demo/table-api-python && ./run.sh && \ + cd /opt/hudi-rs/demo/table-api-rust && ./run.sh " diff --git a/demo/app/rust/Cargo.toml b/demo/sql-datafusion/Cargo.toml similarity index 91% rename from demo/app/rust/Cargo.toml rename to demo/sql-datafusion/Cargo.toml index 349a2d2c..9cce9250 100644 --- a/demo/app/rust/Cargo.toml +++ b/demo/sql-datafusion/Cargo.toml @@ -19,11 +19,11 @@ # keep this empty such that it won't be linked to the repo workspace [package] -name = "app" +name = "demo-sql-datafusion" version = "0.1.0" edition = "2021" [dependencies] tokio = "^1" datafusion = "^43" -hudi = { path = "../../../crates/hudi", features = ["datafusion"] } +hudi = { path = "../../crates/hudi", features = ["datafusion"] } diff --git a/demo/sql-datafusion/run.sh b/demo/sql-datafusion/run.sh new file mode 100755 index 00000000..241a60bc --- /dev/null +++ b/demo/sql-datafusion/run.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +cargo run diff --git a/demo/sql-datafusion/src/main.rs b/demo/sql-datafusion/src/main.rs new file mode 100644 index 00000000..c9623ba3 --- /dev/null +++ b/demo/sql-datafusion/src/main.rs @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::sync::Arc; + +use datafusion::error::Result; +use datafusion::prelude::{DataFrame, SessionContext}; +use hudi::HudiDataSource; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SessionContext::new(); + let hudi = HudiDataSource::new("s3://hudi-demo/cow/v6_complexkeygen_hivestyle").await?; + ctx.register_table("cow_v6_table", Arc::new(hudi))?; + let df: DataFrame = ctx.sql("SELECT * from cow_v6_table").await?; + assert_eq!( + df.schema() + .columns() + .iter() + .map(|c| c.name()) + .collect::>(), + vec![ + "_hoodie_commit_time", + "_hoodie_commit_seqno", + "_hoodie_record_key", + "_hoodie_partition_path", + "_hoodie_file_name", + "id", + "name", + "isActive", + "intField", + "longField", + "floatField", + "doubleField", + "decimalField", + "dateField", + "timestampField", + "binaryField", + "arrayField", + "mapField", + "structField", + "byteField", + "shortField", + ] + ); + assert_eq!(df.count().await?, 4); + + println!("SQL (DataFusion): read snapshot successfully!"); + Ok(()) +} diff --git a/demo/table-api-python/run.sh b/demo/table-api-python/run.sh new file mode 100755 index 00000000..e80a0578 --- /dev/null +++ b/demo/table-api-python/run.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +python -m src.main diff --git a/demo/app/python/src/__init__.py b/demo/table-api-python/src/__init__.py similarity index 100% rename from demo/app/python/src/__init__.py rename to demo/table-api-python/src/__init__.py diff --git a/demo/app/python/src/main.py b/demo/table-api-python/src/main.py similarity index 91% rename from demo/app/python/src/main.py rename to demo/table-api-python/src/main.py index cc870682..826584f3 100644 --- a/demo/app/python/src/main.py +++ b/demo/table-api-python/src/main.py @@ -15,17 +15,18 @@ # specific language governing permissions and limitations # under the License. -from hudi import HudiTableBuilder import pyarrow as pa +from hudi import HudiTableBuilder + for url in [ "s3://hudi-demo/cow/v6_complexkeygen_hivestyle", "s3://hudi-demo/mor/v6_complexkeygen_hivestyle", ]: hudi_table = HudiTableBuilder.from_base_uri(url).build() - records = hudi_table.read_snapshot() + batches = hudi_table.read_snapshot() - arrow_table = pa.Table.from_batches(records) + arrow_table = pa.Table.from_batches(batches) assert arrow_table.schema.names == [ "_hoodie_commit_time", "_hoodie_commit_seqno", @@ -51,4 +52,4 @@ ] assert arrow_table.num_rows == 4 -print("Python API: read snapshot successfully!") +print("Table API (Python): read snapshot successfully!") diff --git a/demo/table-api-rust/Cargo.toml b/demo/table-api-rust/Cargo.toml new file mode 100644 index 00000000..06fd6235 --- /dev/null +++ b/demo/table-api-rust/Cargo.toml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[workspace] +# keep this empty such that it won't be linked to the repo workspace + +[package] +name = "demo-table-api-rust" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = "^1" +arrow = { version = "= 53.3.0", features = ["pyarrow"] } + +hudi = { path = "../../crates/hudi" } diff --git a/demo/table-api-rust/run.sh b/demo/table-api-rust/run.sh new file mode 100755 index 00000000..241a60bc --- /dev/null +++ b/demo/table-api-rust/run.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +cargo run diff --git a/demo/app/rust/src/main.rs b/demo/table-api-rust/src/main.rs similarity index 66% rename from demo/app/rust/src/main.rs rename to demo/table-api-rust/src/main.rs index 1bfbfd3a..e9209dc1 100644 --- a/demo/app/rust/src/main.rs +++ b/demo/table-api-rust/src/main.rs @@ -17,25 +17,28 @@ * under the License. */ -use std::sync::Arc; - -use datafusion::error::Result; -use datafusion::prelude::{DataFrame, SessionContext}; -use hudi::HudiDataSource; +use arrow::compute::concat_batches; +use hudi::error::Result; +use hudi::table::builder::TableBuilder as HudiTableBuilder; #[tokio::main] async fn main() -> Result<()> { - let ctx = SessionContext::new(); - let hudi = HudiDataSource::new("s3://hudi-demo/cow/v6_complexkeygen_hivestyle").await?; - ctx.register_table("cow_v6_table", Arc::new(hudi))?; - let df: DataFrame = ctx.sql("SELECT * from cow_v6_table").await?; - assert!( - df.schema() - .columns() - .iter() - .map(|c| c.name()) - .collect::>() - == vec![ + for url in [ + "s3://hudi-demo/cow/v6_complexkeygen_hivestyle", + "s3://hudi-demo/mor/v6_complexkeygen_hivestyle", + ] { + let hudi_table = HudiTableBuilder::from_base_uri(url).build().await?; + let batches = hudi_table.read_snapshot(&[]).await?; + + let batch = concat_batches(&batches[0].schema(), &batches)?; + assert_eq!( + batch + .schema() + .fields() + .iter() + .map(|f| f.name()) + .collect::>(), + vec![ "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", @@ -58,8 +61,10 @@ async fn main() -> Result<()> { "byteField", "shortField", ] - ); - assert!(df.count().await.unwrap() == 4); - println!("Rust API: read snapshot successfully!"); + ); + assert_eq!(batch.num_rows(), 4); + } + + println!("Table API (Rust): read snapshot successfully!"); Ok(()) } From 024dd34f5c0622a04a0b3980f87549698b8b745a Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 26 Jan 2025 20:31:45 -0600 Subject: [PATCH 3/4] add ut --- python/hudi/_internal.pyi | 7 +++++++ python/tests/test_table_read.py | 21 +++++++++++++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index 58086e31..f5b06af4 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -155,6 +155,13 @@ class HudiTable: List[List[HudiFileSlice]]: A list of file slice groups, each group being a list of HudiFileSlice objects. """ ... + def get_file_slices_splits_as_of( + self, n: int, timestamp: str, filters: Optional[List[Tuple[str, str, str]]] + ) -> List[List[HudiFileSlice]]: + """ + Retrieves all file slices in the Hudi table as of a timestamp in 'n' splits, optionally filtered by given filters. + """ + ... def get_file_slices( self, filters: Optional[List[Tuple[str, str, str]]] ) -> List[HudiFileSlice]: diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index c35be59c..c2931aa1 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +from itertools import chain + import pyarrow as pa from hudi import HudiTable @@ -153,13 +155,24 @@ def test_read_table_for_partition(get_sample_table): ] -def test_read_table_as_of_timestamp(get_sample_table): +def test_table_apis_as_of_timestamp(get_sample_table): table_path = get_sample_table - table = HudiTable( - table_path, options={"hoodie.read.as.of.timestamp": "20240402123035233"} + table = HudiTable(table_path) + timestamp = "20240402123035233" + + file_slices_gen = table.get_file_slices_splits_as_of(2, timestamp) + file_slices_base_paths = set( + f.base_file_relative_path() for f in chain.from_iterable(file_slices_gen) ) + assert file_slices_base_paths == { + "san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet", + "san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet", + "san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet", + "sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet", + "chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet", + } - batches = table.read_snapshot() + batches = table.read_snapshot_as_of(timestamp) t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") assert t.to_pylist() == [ { From 754c770f3d2adfb580324942ceed17ba85a7d0e8 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 26 Jan 2025 20:59:24 -0600 Subject: [PATCH 4/4] add ut --- crates/core/src/table/mod.rs | 13 +++++++++++++ python/hudi/_internal.pyi | 13 ++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 97d39fb5..ee8114d9 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -826,6 +826,19 @@ mod tests { assert_eq!(file_slices_splits[1].len(), 1); } + #[tokio::test] + async fn hudi_table_get_file_slices_splits_as_of() { + let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor(); + + let hudi_table = Table::new(base_url.path()).await.unwrap(); + let file_slices_splits = hudi_table + .get_file_slices_splits_as_of(2, "20250121000702475", &[]) + .await + .unwrap(); + assert_eq!(file_slices_splits.len(), 1); + assert_eq!(file_slices_splits[0].len(), 1); + } + #[tokio::test] async fn hudi_table_get_file_slices_as_of_timestamps() { let base_url = SampleTable::V6Nonpartitioned.url_to_cow(); diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index f5b06af4..d6f66231 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -212,7 +212,18 @@ class HudiTable: ... def read_incremental_records( self, start_timestamp: str, end_timestamp: Optional[str] - ) -> List["pyarrow.RecordBatch"]: ... + ) -> List["pyarrow.RecordBatch"]: + """ + Reads incremental records from the Hudi table between the given timestamps. + + Parameters: + start_timestamp (str): The start timestamp (exclusive). + end_timestamp (Optional[str]): The end timestamp (inclusive). + + Returns: + List[pyarrow.RecordBatch]: A list of record batches containing incremental records. + """ + ... def build_hudi_table( base_uri: str,