From da50983494d0247c0e073dabe63b3f52ea0d2288 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Mon, 14 Oct 2024 00:44:40 -0500 Subject: [PATCH] refactor: improve builder API for taking single option --- crates/core/src/table/builder.rs | 190 ++++++++++++++++++++--------- python/hudi/__init__.py | 10 +- python/hudi/table/builder.py | 58 ++++++++- python/tests/test_table_builder.py | 52 +++++++- python/tests/test_table_read.py | 10 -- 5 files changed, 237 insertions(+), 83 deletions(-) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index ff34571b..c037a0c5 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -49,11 +49,23 @@ pub struct TableBuilder { } macro_rules! impl_with_options { - ($struct_name:ident, $($field:ident),+) => { + ($struct_name:ident, $($field:ident, $singular:ident),+) => { impl $struct_name { $( paste! { - #[doc = "Add " $field " to the builder. Subsequent calls overwrite the previous values if the key already exists."] + #[doc = "Add " $singular " to the builder."] + #[doc = "Subsequent calls overwrite the previous values if the key already exists."] + pub fn [<with_ $singular>]<K, V>(mut self, k: K, v: V) -> Self + where + K: AsRef<str>, + V: Into<String>, + { + self.$field.insert(k.as_ref().to_string(), v.into()); + self + } + + #[doc = "Add " $field " to the builder."] + #[doc = "Subsequent calls overwrite the previous values if the key already exists."] pub fn [<with_ $field>]<I, K, V>(mut self, options: I) -> Self where I: IntoIterator<Item = (K, V)>, @@ -69,7 +81,15 @@ macro_rules! impl_with_options { }; } -impl_with_options!(TableBuilder, hudi_options, storage_options, options); +impl_with_options!( + TableBuilder, + hudi_options, + hudi_option, + storage_options, + storage_option, + options, + option +); impl TableBuilder { /// Create Hudi table builder from base table uri @@ -119,6 +139,18 @@ impl TableBuilder { /// /// [note] Error may occur when 1 and 2 have conflicts.
async fn resolve_options(&mut self) -> Result<()> { + self.resolve_user_provided_options(); + + // if any user-provided options are intended for cloud storage and in uppercase, + // convert them to lowercase. This is to allow `object_store` to pick them up. + self.resolve_cloud_env_vars(); + + // At this point, we have resolved the storage options needed for accessing the storage layer. + // We can now resolve the hudi options + self.resolve_hudi_options().await + } + + fn resolve_user_provided_options(&mut self) { // Insert the base path into hudi options since it is explicitly provided self.hudi_options.insert( HudiTableConfig::BasePath.as_ref().to_string(), @@ -131,39 +163,29 @@ impl TableBuilder { // Combine generic options (lower precedence) with explicit options. // Note that we treat all non-Hudi options as storage options Self::extend_if_absent(&mut self.hudi_options, &generic_hudi_opts); - Self::extend_if_absent(&mut self.storage_options, &generic_other_opts); - - // if any user-provided options are intended for cloud storage and in uppercase, - // convert them to lowercase. This is to allow `object_store` to pick them up. - Self::imbue_cloud_env_vars(&mut self.storage_options); - - // At this point, we have resolved the storage options needed for accessing the storage layer. 
- // We can now resolve the hudi options - Self::resolve_hudi_options(&self.storage_options, &mut self.hudi_options).await + Self::extend_if_absent(&mut self.storage_options, &generic_other_opts) } - fn imbue_cloud_env_vars(options: &mut HashMap<String, String>) { + fn resolve_cloud_env_vars(&mut self) { for (key, value) in env::vars() { if Storage::CLOUD_STORAGE_PREFIXES .iter() .any(|prefix| key.starts_with(prefix)) - && !options.contains_key(&key.to_ascii_lowercase()) + && !self.storage_options.contains_key(&key.to_ascii_lowercase()) { - options.insert(key.to_ascii_lowercase(), value); + self.storage_options.insert(key.to_ascii_lowercase(), value); } } } - async fn resolve_hudi_options( - storage_options: &HashMap<String, String>, - hudi_options: &mut HashMap<String, String>, - ) -> Result<()> { + async fn resolve_hudi_options(&mut self) -> Result<()> { // create a [Storage] instance to load properties from storage layer. let storage = Storage::new( - Arc::new(storage_options.clone()), - Arc::new(HudiConfigs::new(hudi_options.iter())), + Arc::new(self.storage_options.clone()), + Arc::new(HudiConfigs::new(self.hudi_options.iter())), )?; + let hudi_options = &mut self.hudi_options; Self::imbue_table_properties(hudi_options, storage.clone()).await?; // TODO load Hudi configs from env vars here before loading global configs @@ -267,47 +289,101 @@ impl TableBuilder { mod tests { use super::*; + fn create_table_builder() -> TableBuilder { + TableBuilder { + base_uri: "test_uri".to_string(), + hudi_options: HashMap::new(), + storage_options: HashMap::new(), + options: HashMap::new(), + } + } + #[test] - fn test_build_from_explicit_options() { - let hudi_options = [("hoodie.option1", "value1"), ("hoodie.option3", "value3")]; - let storage_options = [ - ("AWS_REGION", "us-east-1"), - ("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"), - ]; - let builder = TableBuilder::from_base_uri("/tmp/hudi_data") - .with_hudi_options(hudi_options) - .with_storage_options(storage_options); - let hudi_options = &builder.hudi_options; -
let storage_options = &builder.storage_options; - assert_eq!(hudi_options.len(), 2); - assert_eq!(hudi_options["hoodie.option1"], "value1"); - assert_eq!(hudi_options["hoodie.option3"], "value3"); - assert_eq!(storage_options.len(), 2); - assert_eq!(storage_options["AWS_REGION"], "us-east-1"); - assert_eq!( - storage_options["AWS_ENDPOINT"], - "s3.us-east-1.amazonaws.com" - ); + fn test_with_hudi_option() { + let builder = create_table_builder(); + + let updated = builder.with_hudi_option("key", "value"); + assert_eq!(updated.hudi_options["key"], "value") + } + + #[test] + fn test_with_hudi_options() { + let builder = create_table_builder(); + + let options = [("key1", "value1"), ("key2", "value2")]; + let updated = builder.with_hudi_options(options); + assert_eq!(updated.hudi_options["key1"], "value1"); + assert_eq!(updated.hudi_options["key2"], "value2") + } + + #[test] + fn test_with_storage_option() { + let builder = create_table_builder(); + + let updated = builder.with_storage_option("key", "value"); + assert_eq!(updated.storage_options["key"], "value") + } + + #[test] + fn test_with_storage_options() { + let builder = create_table_builder(); + + let options = [("key1", "value1"), ("key2", "value2")]; + let updated = builder.with_storage_options(options); + assert_eq!(updated.storage_options["key1"], "value1"); + assert_eq!(updated.storage_options["key2"], "value2"); + } + + #[test] + fn test_with_option() { + let builder = create_table_builder(); + + let updated = builder.with_option("key", "value"); + assert_eq!(updated.options["key"], "value") + } + + #[test] + fn test_with_options() { + let builder = create_table_builder(); + + let options = [("key1", "value1"), ("key2", "value2")]; + let updated = builder.with_options(options); + assert_eq!(updated.options["key1"], "value1"); + assert_eq!(updated.options["key2"], "value2") } #[test] - fn test_build_from_explicit_options_chained() { - let builder = TableBuilder::from_base_uri("/tmp/hudi_data") - 
.with_hudi_options([("hoodie.option1", "value1")]) - .with_hudi_options([("hoodie.option1", "value1-1")]) - .with_hudi_options([("hoodie.option3", "value3")]) - .with_storage_options([("AWS_REGION", "us-east-2")]) - .with_storage_options([("AWS_REGION", "us-east-1")]) - .with_storage_options([("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com")]); - let hudi_options = &builder.hudi_options.clone(); - let storage_options = &builder.storage_options.clone(); - assert_eq!(hudi_options.len(), 2); - assert_eq!(hudi_options["hoodie.option1"], "value1-1"); - assert_eq!(hudi_options["hoodie.option3"], "value3"); - assert_eq!(storage_options.len(), 2); - assert_eq!(storage_options["AWS_REGION"], "us-east-1"); + fn test_builder_resolve_user_provided_options_should_apply_precedence_order() { + let mut builder = TableBuilder::from_base_uri("/tmp/hudi_data") + .with_hudi_option("hoodie.option1", "value1") + .with_option("hoodie.option2", "'value2") + .with_hudi_options([ + ("hoodie.option1", "value1-1"), + ("hoodie.option3", "value3"), + ("hoodie.option1", "value1-2"), + ]) + .with_storage_option("AWS_REGION", "us-east-2") + .with_storage_options([ + ("AWS_REGION", "us-east-1"), + ("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"), + ]) + .with_option("AWS_REGION", "us-west-1") + .with_options([ + ("hoodie.option3", "value3-1"), + ("hoodie.option2", "value2-1"), + ]); + + builder.resolve_user_provided_options(); + + assert_eq!(builder.hudi_options.len(), 4); + assert_eq!(builder.hudi_options["hoodie.base.path"], "/tmp/hudi_data"); + assert_eq!(builder.hudi_options["hoodie.option1"], "value1-2"); + assert_eq!(builder.hudi_options["hoodie.option2"], "value2-1"); + assert_eq!(builder.hudi_options["hoodie.option3"], "value3"); + assert_eq!(builder.storage_options.len(), 2); + assert_eq!(builder.storage_options["AWS_REGION"], "us-east-1"); assert_eq!( - storage_options["AWS_ENDPOINT"], + builder.storage_options["AWS_ENDPOINT"], "s3.us-east-1.amazonaws.com" ); } diff --git 
a/python/hudi/__init__.py b/python/hudi/__init__.py index b44835b2..fae5cab8 100644 --- a/python/hudi/__init__.py +++ b/python/hudi/__init__.py @@ -15,9 +15,9 @@ # specific language governing permissions and limitations # under the License. -from hudi.table.builder import HudiTableBuilder as HudiTableBuilder -from ._internal import HudiFileGroupReader as HudiFileGroupReader -from ._internal import HudiFileSlice as HudiFileSlice -from ._internal import HudiTable as HudiTable -from ._internal import __version__ as __version__ +from hudi._internal import HudiFileGroupReader as HudiFileGroupReader +from hudi._internal import HudiFileSlice as HudiFileSlice +from hudi._internal import HudiTable as HudiTable +from hudi._internal import __version__ as __version__ +from hudi.table.builder import HudiTableBuilder as HudiTableBuilder diff --git a/python/hudi/table/builder.py b/python/hudi/table/builder.py index 81790bfe..4ed9b943 100644 --- a/python/hudi/table/builder.py +++ b/python/hudi/table/builder.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. 
from dataclasses import dataclass, field -from typing import Dict +from typing import Dict, Optional from hudi._internal import HudiTable, build_hudi_table @@ -33,9 +33,9 @@ class HudiTableBuilder: """ base_uri: str - options: Dict[str, str] = field(default_factory=dict) hudi_options: Dict[str, str] = field(default_factory=dict) storage_options: Dict[str, str] = field(default_factory=dict) + options: Dict[str, str] = field(default_factory=dict) @classmethod def from_base_uri(cls, base_uri: str) -> "HudiTableBuilder": @@ -51,6 +51,26 @@ def from_base_uri(cls, base_uri: str) -> "HudiTableBuilder": builder = cls(base_uri) return builder + def _add_options( + self, options: Dict[str, str], category: Optional[str] = None + ) -> None: + target_attr = getattr(self, f"{category}_options") if category else self.options + target_attr.update(options) + + def with_hudi_option(self, k: str, v: str) -> "HudiTableBuilder": + """ + Adds a Hudi option to the builder. + + Parameters: + k (str): The key of the option. + v (str): The value of the option. + + Returns: + HudiTableBuilder: The builder instance. + """ + self._add_options({k: v}, "hudi") + return self + def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder": """ Adds Hudi options to the builder. @@ -61,7 +81,21 @@ def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder": Returns: HudiTableBuilder: The builder instance. """ - self.hudi_options.update(hudi_options) + self._add_options(hudi_options, "hudi") + return self + + def with_storage_option(self, k: str, v: str) -> "HudiTableBuilder": + """ + Adds a storage option to the builder. + + Parameters: + k (str): The key of the option. + v (str): The value of the option. + + Returns: + HudiTableBuilder: The builder instance. + """ + self._add_options({k: v}, "storage") return self def with_storage_options( @@ -76,7 +110,21 @@ def with_storage_options( Returns: HudiTableBuilder: The builder instance. 
""" - self.storage_options.update(storage_options) + self._add_options(storage_options, "storage") + return self + + def with_option(self, k: str, v: str) -> "HudiTableBuilder": + """ + Adds a generic option to the builder. + + Parameters: + k (str): The key of the option. + v (str): The value of the option. + + Returns: + HudiTableBuilder: The builder instance. + """ + self._add_options({k: v}) return self def with_options(self, options: Dict[str, str]) -> "HudiTableBuilder": @@ -89,7 +137,7 @@ def with_options(self, options: Dict[str, str]) -> "HudiTableBuilder": Returns: HudiTableBuilder: The builder instance. """ - self.options.update(options) + self._add_options(options) return self def build(self) -> "HudiTable": diff --git a/python/tests/test_table_builder.py b/python/tests/test_table_builder.py index 1c16f62b..1ae5099f 100644 --- a/python/tests/test_table_builder.py +++ b/python/tests/test_table_builder.py @@ -20,14 +20,54 @@ from hudi import HudiTableBuilder -PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < ( - 8, - 0, - 0, + +@pytest.fixture +def base_uri(): + return "test://base/uri" + + +@pytest.fixture +def builder(base_uri): + return HudiTableBuilder(base_uri) + + +def test_initialization(builder, base_uri): + assert builder.base_uri == base_uri + assert builder.hudi_options == {} + assert builder.storage_options == {} + assert builder.options == {} + + +def test_from_base_uri(base_uri): + builder = HudiTableBuilder.from_base_uri(base_uri) + assert builder.base_uri == base_uri + + +@pytest.mark.parametrize( + "method,attr", + [ + ("with_hudi_option", "hudi_options"), + ("with_storage_option", "storage_options"), + ("with_option", "options"), + ], ) -pytestmark = pytest.mark.skipif( - PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0" +def test_with_single_option(builder, method, attr): + getattr(builder, method)("key1", "value1") + assert getattr(builder, attr) == {"key1": "value1"} + + 
+@pytest.mark.parametrize( + "method,attr", + [ + ("with_hudi_options", "hudi_options"), + ("with_storage_options", "storage_options"), + ("with_options", "options"), + ], ) +def test_with_multiple_options(builder, method, attr): + options = {"key1": "value1", "key2": "value2"} + getattr(builder, method)(options) + assert getattr(builder, attr) == options def test_read_table_returns_correct_data(get_sample_table): diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 6ee5ae58..cbcfaab1 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -16,19 +16,9 @@ # under the License. import pyarrow as pa -import pytest from hudi import HudiTable -PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < ( - 8, - 0, - 0, -) -pytestmark = pytest.mark.skipif( - PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0" -) - def test_read_table_has_correct_schema(get_sample_table): table_path = get_sample_table