Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: align python table APIs with rust #267

Merged
merged 4 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ jobs:
- name: Integration tests
run: |
cd demo
./run_app.sh
./run_demo.sh

publish-coverage:
name: Publish coverage reports to codecov.io
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/file_group/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl FileGroupReader {
.map_err(|e| ReadFileSliceError(format!("Failed to filter records: {e:?}")))
}

pub async fn read_file_slice(
pub(crate) async fn read_file_slice(
&self,
file_slice: &FileSlice,
base_file_only: bool,
Expand Down
88 changes: 65 additions & 23 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,27 @@
.await
}

/// Returns the table's Hudi configs rendered as a key/value option map.
pub fn hudi_options(&self) -> HashMap<String, String> {
    self.hudi_configs.as_options()
}

pub fn storage_options(&self) -> HashMap<String, String> {
self.storage_options.as_ref().clone()

Check warning on line 148 in crates/core/src/table/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/mod.rs#L147-L148

Added lines #L147 - L148 were not covered by tests
}

/// Registers this table's object store with the given DataFusion runtime
/// environment, for both the timeline storage and the file-system-view
/// storage, so DataFusion can read the table's files through them.
#[cfg(feature = "datafusion")]
pub fn register_storage(
    &self,
    runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
) {
    self.timeline
        .storage
        .register_object_store(runtime_env.clone());
    // Last use of the Arc: move it instead of cloning (avoids a redundant
    // refcount bump; clippy::redundant_clone).
    self.file_system_view
        .storage
        .register_object_store(runtime_env);
}

pub fn base_url(&self) -> Url {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath);
self.hudi_configs
Expand All @@ -165,27 +186,6 @@
.to::<String>()
}

pub fn hudi_options(&self) -> HashMap<String, String> {
self.hudi_configs.as_options()
}

pub fn storage_options(&self) -> HashMap<String, String> {
self.storage_options.as_ref().clone()
}

#[cfg(feature = "datafusion")]
pub fn register_storage(
&self,
runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
) {
self.timeline
.storage
.register_object_store(runtime_env.clone());
self.file_system_view
.storage
.register_object_store(runtime_env.clone());
}

/// Get the latest [Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
Expand Down Expand Up @@ -222,7 +222,36 @@
n: usize,
filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
let file_slices = self.get_file_slices(filters).await?;
let filters = from_str_tuples(filters)?;
if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.get_file_slices_splits_internal(n, timestamp.to::<String>().as_str(), &filters)
.await

Check warning on line 228 in crates/core/src/table/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/mod.rs#L226-L228

Added lines #L226 - L228 were not covered by tests
} else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
self.get_file_slices_splits_internal(n, timestamp, &filters)

Check warning on line 230 in crates/core/src/table/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/mod.rs#L230

Added line #L230 was not covered by tests
.await
} else {
Ok(Vec::new())
}
}

/// Splits the table's file slices, as of the given commit `timestamp`,
/// into `n` groups, after applying the given filter triples.
pub async fn get_file_slices_splits_as_of(
    &self,
    n: usize,
    timestamp: &str,
    filters: &[(&str, &str, &str)],
) -> Result<Vec<Vec<FileSlice>>> {
    // Parse the (field, op, value) string triples into typed filters first.
    let parsed_filters = from_str_tuples(filters)?;
    self.get_file_slices_splits_internal(n, timestamp, &parsed_filters)
        .await
}

async fn get_file_slices_splits_internal(
&self,
n: usize,
timestamp: &str,
filters: &[Filter],
) -> Result<Vec<Vec<FileSlice>>> {
let file_slices = self.get_file_slices_internal(timestamp, filters).await?;
if file_slices.is_empty() {
return Ok(Vec::new());
}
Expand Down Expand Up @@ -282,7 +311,7 @@
)
}

pub fn create_file_group_reader_with_filters(
fn create_file_group_reader_with_filters(
&self,
filters: &[(&str, &str, &str)],
schema: &Schema,
Expand Down Expand Up @@ -797,6 +826,19 @@
assert_eq!(file_slices_splits[1].len(), 1);
}

#[tokio::test]
async fn hudi_table_get_file_slices_splits_as_of() {
    // MOR sample table; as of this commit there is a single file slice,
    // so asking for 2 splits still yields one split with one slice.
    let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
    let table = Table::new(base_url.path()).await.unwrap();

    let splits = table
        .get_file_slices_splits_as_of(2, "20250121000702475", &[])
        .await
        .unwrap();

    assert_eq!(splits.len(), 1);
    assert_eq!(splits[0].len(), 1);
}

#[tokio::test]
async fn hudi_table_get_file_slices_as_of_timestamps() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion demo/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,5 @@ services:
AWS_REGION: us-east-1 # minio default

networks:
app_network:
demo_network:
driver: bridge
8 changes: 4 additions & 4 deletions demo/run_app.sh → demo/run_demo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ if [ $attempt -eq $max_attempts ]; then
exit 1
fi

# install dependencies and run the app
# install dependencies and run the demo apps
docker compose exec -T runner /bin/bash -c "
cd /opt/hudi-rs/python && \
make setup develop && \
cd /opt/hudi-rs/demo/app && \
cargo run --manifest-path=rust/Cargo.toml && \
python -m python.src.main
cd /opt/hudi-rs/demo/sql-datafusion && ./run.sh &&\
cd /opt/hudi-rs/demo/table-api-python && ./run.sh && \
cd /opt/hudi-rs/demo/table-api-rust && ./run.sh
"
4 changes: 2 additions & 2 deletions demo/app/rust/Cargo.toml → demo/sql-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
# keep this empty such that it won't be linked to the repo workspace

[package]
name = "app"
name = "demo-sql-datafusion"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = "^1"
datafusion = "^43"
hudi = { path = "../../../crates/hudi", features = ["datafusion"] }
hudi = { path = "../../crates/hudi", features = ["datafusion"] }
21 changes: 21 additions & 0 deletions demo/sql-datafusion/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Build and run the DataFusion SQL demo with Cargo.
cargo run
66 changes: 66 additions & 0 deletions demo/sql-datafusion/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::{DataFrame, SessionContext};
use hudi::HudiDataSource;

/// Demo entry point: registers a Hudi COW table with DataFusion, reads a
/// snapshot via SQL, and asserts the expected schema and row count.
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register the sample COW table served from the demo's MinIO bucket.
    let hudi = HudiDataSource::new("s3://hudi-demo/cow/v6_complexkeygen_hivestyle").await?;
    ctx.register_table("cow_v6_table", Arc::new(hudi))?;

    let df: DataFrame = ctx.sql("SELECT * from cow_v6_table").await?;

    // Hudi meta columns come first, followed by the data columns.
    let expected_columns = vec![
        "_hoodie_commit_time",
        "_hoodie_commit_seqno",
        "_hoodie_record_key",
        "_hoodie_partition_path",
        "_hoodie_file_name",
        "id",
        "name",
        "isActive",
        "intField",
        "longField",
        "floatField",
        "doubleField",
        "decimalField",
        "dateField",
        "timestampField",
        "binaryField",
        "arrayField",
        "mapField",
        "structField",
        "byteField",
        "shortField",
    ];
    let actual_columns = df
        .schema()
        .columns()
        .iter()
        .map(|c| c.name())
        .collect::<Vec<_>>();
    assert_eq!(actual_columns, expected_columns);

    assert_eq!(df.count().await?, 4);

    println!("SQL (DataFusion): read snapshot successfully!");
    Ok(())
}
21 changes: 21 additions & 0 deletions demo/table-api-python/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Run the Python table-API demo's entry module.
python -m src.main
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
# specific language governing permissions and limitations
# under the License.

from hudi import HudiTableBuilder
import pyarrow as pa

from hudi import HudiTableBuilder

for url in [
"s3://hudi-demo/cow/v6_complexkeygen_hivestyle",
"s3://hudi-demo/mor/v6_complexkeygen_hivestyle",
]:
hudi_table = HudiTableBuilder.from_base_uri(url).build()
records = hudi_table.read_snapshot()
batches = hudi_table.read_snapshot()

arrow_table = pa.Table.from_batches(records)
arrow_table = pa.Table.from_batches(batches)
assert arrow_table.schema.names == [
"_hoodie_commit_time",
"_hoodie_commit_seqno",
Expand All @@ -51,4 +52,4 @@
]
assert arrow_table.num_rows == 4

print("Python API: read snapshot successfully!")
print("Table API (Python): read snapshot successfully!")
30 changes: 30 additions & 0 deletions demo/table-api-rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[workspace]
# keep this empty such that it won't be linked to the repo workspace

[package]
name = "demo-table-api-rust"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = "^1"
arrow = { version = "= 53.3.0", features = ["pyarrow"] }

hudi = { path = "../../crates/hudi" }
21 changes: 21 additions & 0 deletions demo/table-api-rust/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Build and run the Rust table-API demo with Cargo.
cargo run
Loading
Loading