Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: improve TableBuilder API for taking single option #171

Merged
merged 1 commit into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 133 additions & 57 deletions crates/core/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,23 @@
}

macro_rules! impl_with_options {
($struct_name:ident, $($field:ident),+) => {
($struct_name:ident, $($field:ident, $singular:ident),+) => {
impl $struct_name {
$(
paste! {
#[doc = "Add " $field " to the builder. Subsequent calls overwrite the previous values if the key already exists."]
#[doc = "Add " $singular " to the builder."]
#[doc = "Subsequent calls overwrite the previous values if the key already exists."]
pub fn [<with_ $singular>]<K, V>(mut self, k: K, v: V) -> Self
where
K: AsRef<str>,
V: Into<String>,
{
self.$field.insert(k.as_ref().to_string(), v.into());
self
}

#[doc = "Add " $field " to the builder."]
#[doc = "Subsequent calls overwrite the previous values if the key already exists."]
pub fn [<with_ $field>]<I, K, V>(mut self, options: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
Expand All @@ -69,7 +81,15 @@
};
}

impl_with_options!(TableBuilder, hudi_options, storage_options, options);
impl_with_options!(
TableBuilder,
hudi_options,
hudi_option,
storage_options,
storage_option,
options,
option
);

impl TableBuilder {
/// Create Hudi table builder from base table uri
Expand Down Expand Up @@ -119,6 +139,18 @@
///
/// [note] Error may occur when 1 and 2 have conflicts.
async fn resolve_options(&mut self) -> Result<()> {
    // Step 1: fold everything the caller handed to the builder (base path plus the
    // generic options) into the dedicated hudi/storage option maps.
    self.resolve_user_provided_options();

    // Step 2: copy cloud-storage environment variables into the storage options under
    // lowercase keys, unless the key is already present, so that `object_store` can
    // pick them up.
    self.resolve_cloud_env_vars();

    // Step 3: the storage options are settled by now, so the storage layer can be
    // reached and the remaining hudi options resolved from it.
    self.resolve_hudi_options().await?;
    Ok(())
}

fn resolve_user_provided_options(&mut self) {
// Insert the base path into hudi options since it is explicitly provided
self.hudi_options.insert(
HudiTableConfig::BasePath.as_ref().to_string(),
Expand All @@ -131,39 +163,29 @@
// Combine generic options (lower precedence) with explicit options.
// Note that we treat all non-Hudi options as storage options
Self::extend_if_absent(&mut self.hudi_options, &generic_hudi_opts);
Self::extend_if_absent(&mut self.storage_options, &generic_other_opts);

// if any user-provided options are intended for cloud storage and in uppercase,
// convert them to lowercase. This is to allow `object_store` to pick them up.
Self::imbue_cloud_env_vars(&mut self.storage_options);

// At this point, we have resolved the storage options needed for accessing the storage layer.
// We can now resolve the hudi options
Self::resolve_hudi_options(&self.storage_options, &mut self.hudi_options).await
Self::extend_if_absent(&mut self.storage_options, &generic_other_opts)
}

fn imbue_cloud_env_vars(options: &mut HashMap<String, String>) {
fn resolve_cloud_env_vars(&mut self) {
for (key, value) in env::vars() {
if Storage::CLOUD_STORAGE_PREFIXES
.iter()
.any(|prefix| key.starts_with(prefix))
&& !options.contains_key(&key.to_ascii_lowercase())
&& !self.storage_options.contains_key(&key.to_ascii_lowercase())

Check warning on line 174 in crates/core/src/table/builder.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/builder.rs#L174

Added line #L174 was not covered by tests
{
options.insert(key.to_ascii_lowercase(), value);
self.storage_options.insert(key.to_ascii_lowercase(), value);

Check warning on line 176 in crates/core/src/table/builder.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/builder.rs#L176

Added line #L176 was not covered by tests
}
}
}

async fn resolve_hudi_options(
storage_options: &HashMap<String, String>,
hudi_options: &mut HashMap<String, String>,
) -> Result<()> {
async fn resolve_hudi_options(&mut self) -> Result<()> {
// create a [Storage] instance to load properties from storage layer.
let storage = Storage::new(
Arc::new(storage_options.clone()),
Arc::new(HudiConfigs::new(hudi_options.iter())),
Arc::new(self.storage_options.clone()),
Arc::new(HudiConfigs::new(self.hudi_options.iter())),
)?;

let hudi_options = &mut self.hudi_options;

Check warning on line 188 in crates/core/src/table/builder.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/builder.rs#L188

Added line #L188 was not covered by tests
Self::imbue_table_properties(hudi_options, storage.clone()).await?;

// TODO load Hudi configs from env vars here before loading global configs
Expand Down Expand Up @@ -267,47 +289,101 @@
mod tests {
use super::*;

/// Test fixture: a [`TableBuilder`] pointing at `"test_uri"` with all three
/// option maps empty.
fn create_table_builder() -> TableBuilder {
    let base_uri = String::from("test_uri");
    TableBuilder {
        base_uri,
        hudi_options: HashMap::default(),
        storage_options: HashMap::default(),
        options: HashMap::default(),
    }
}

#[test]
fn test_build_from_explicit_options() {
let hudi_options = [("hoodie.option1", "value1"), ("hoodie.option3", "value3")];
let storage_options = [
("AWS_REGION", "us-east-1"),
("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"),
];
let builder = TableBuilder::from_base_uri("/tmp/hudi_data")
.with_hudi_options(hudi_options)
.with_storage_options(storage_options);
let hudi_options = &builder.hudi_options;
let storage_options = &builder.storage_options;
assert_eq!(hudi_options.len(), 2);
assert_eq!(hudi_options["hoodie.option1"], "value1");
assert_eq!(hudi_options["hoodie.option3"], "value3");
assert_eq!(storage_options.len(), 2);
assert_eq!(storage_options["AWS_REGION"], "us-east-1");
assert_eq!(
storage_options["AWS_ENDPOINT"],
"s3.us-east-1.amazonaws.com"
);
fn test_with_hudi_option() {
let builder = create_table_builder();

let updated = builder.with_hudi_option("key", "value");
assert_eq!(updated.hudi_options["key"], "value")
}

#[test]
fn test_with_hudi_options() {
    // The bulk setter must store every supplied pair in `hudi_options`.
    let pairs = [("key1", "value1"), ("key2", "value2")];
    let built = create_table_builder().with_hudi_options(pairs);

    for (name, expected) in pairs {
        assert_eq!(built.hudi_options[name], expected);
    }
}

#[test]
fn test_with_storage_option() {
    // The single-entry setter must store the pair in `storage_options`.
    let built = create_table_builder().with_storage_option("key", "value");

    assert_eq!(built.storage_options["key"], "value");
}

#[test]
fn test_with_storage_options() {
    // The bulk setter must store every supplied pair in `storage_options`.
    let pairs = [("key1", "value1"), ("key2", "value2")];
    let built = create_table_builder().with_storage_options(pairs);

    for (name, expected) in pairs {
        assert_eq!(built.storage_options[name], expected);
    }
}

#[test]
fn test_with_option() {
    // The single-entry setter must store the pair in the generic `options` map.
    let built = create_table_builder().with_option("key", "value");

    assert_eq!(built.options["key"], "value");
}

#[test]
fn test_with_options() {
    // The bulk setter must store every supplied pair in the generic `options` map.
    let pairs = [("key1", "value1"), ("key2", "value2")];
    let built = create_table_builder().with_options(pairs);

    for (name, expected) in pairs {
        assert_eq!(built.options[name], expected);
    }
}

#[test]
fn test_build_from_explicit_options_chained() {
let builder = TableBuilder::from_base_uri("/tmp/hudi_data")
.with_hudi_options([("hoodie.option1", "value1")])
.with_hudi_options([("hoodie.option1", "value1-1")])
.with_hudi_options([("hoodie.option3", "value3")])
.with_storage_options([("AWS_REGION", "us-east-2")])
.with_storage_options([("AWS_REGION", "us-east-1")])
.with_storage_options([("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com")]);
let hudi_options = &builder.hudi_options.clone();
let storage_options = &builder.storage_options.clone();
assert_eq!(hudi_options.len(), 2);
assert_eq!(hudi_options["hoodie.option1"], "value1-1");
assert_eq!(hudi_options["hoodie.option3"], "value3");
assert_eq!(storage_options.len(), 2);
assert_eq!(storage_options["AWS_REGION"], "us-east-1");
fn test_builder_resolve_user_provided_options_should_apply_precedence_order() {
let mut builder = TableBuilder::from_base_uri("/tmp/hudi_data")
.with_hudi_option("hoodie.option1", "value1")
.with_option("hoodie.option2", "'value2")
.with_hudi_options([
("hoodie.option1", "value1-1"),
("hoodie.option3", "value3"),
("hoodie.option1", "value1-2"),
])
.with_storage_option("AWS_REGION", "us-east-2")
.with_storage_options([
("AWS_REGION", "us-east-1"),
("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"),
])
.with_option("AWS_REGION", "us-west-1")
.with_options([
("hoodie.option3", "value3-1"),
("hoodie.option2", "value2-1"),
]);

builder.resolve_user_provided_options();

assert_eq!(builder.hudi_options.len(), 4);
assert_eq!(builder.hudi_options["hoodie.base.path"], "/tmp/hudi_data");
assert_eq!(builder.hudi_options["hoodie.option1"], "value1-2");
assert_eq!(builder.hudi_options["hoodie.option2"], "value2-1");
assert_eq!(builder.hudi_options["hoodie.option3"], "value3");
assert_eq!(builder.storage_options.len(), 2);
assert_eq!(builder.storage_options["AWS_REGION"], "us-east-1");
assert_eq!(
storage_options["AWS_ENDPOINT"],
builder.storage_options["AWS_ENDPOINT"],
"s3.us-east-1.amazonaws.com"
);
}
Expand Down
10 changes: 5 additions & 5 deletions python/hudi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# specific language governing permissions and limitations
# under the License.

from hudi.table.builder import HudiTableBuilder as HudiTableBuilder

from ._internal import HudiFileGroupReader as HudiFileGroupReader
from ._internal import HudiFileSlice as HudiFileSlice
from ._internal import HudiTable as HudiTable
from ._internal import __version__ as __version__
from hudi._internal import HudiFileGroupReader as HudiFileGroupReader
from hudi._internal import HudiFileSlice as HudiFileSlice
from hudi._internal import HudiTable as HudiTable
from hudi._internal import __version__ as __version__
from hudi.table.builder import HudiTableBuilder as HudiTableBuilder
58 changes: 53 additions & 5 deletions python/hudi/table/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
from dataclasses import dataclass, field
from typing import Dict
from typing import Dict, Optional

from hudi._internal import HudiTable, build_hudi_table

Expand All @@ -33,9 +33,9 @@ class HudiTableBuilder:
"""

base_uri: str
options: Dict[str, str] = field(default_factory=dict)
hudi_options: Dict[str, str] = field(default_factory=dict)
storage_options: Dict[str, str] = field(default_factory=dict)
options: Dict[str, str] = field(default_factory=dict)

@classmethod
def from_base_uri(cls, base_uri: str) -> "HudiTableBuilder":
Expand All @@ -51,6 +51,26 @@ def from_base_uri(cls, base_uri: str) -> "HudiTableBuilder":
builder = cls(base_uri)
return builder

def _add_options(
self, options: Dict[str, str], category: Optional[str] = None
) -> None:
target_attr = getattr(self, f"{category}_options") if category else self.options
target_attr.update(options)

def with_hudi_option(self, k: str, v: str) -> "HudiTableBuilder":
    """
    Registers a single Hudi option on the builder.

    Parameters:
        k (str): The option key.
        v (str): The option value.

    Returns:
        HudiTableBuilder: The builder instance.
    """
    single_entry = {k: v}
    self._add_options(single_entry, "hudi")
    return self

def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder":
"""
Adds Hudi options to the builder.
Expand All @@ -61,7 +81,21 @@ def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder":
Returns:
HudiTableBuilder: The builder instance.
"""
self.hudi_options.update(hudi_options)
self._add_options(hudi_options, "hudi")
return self

def with_storage_option(self, k: str, v: str) -> "HudiTableBuilder":
    """
    Registers a single storage option on the builder.

    Parameters:
        k (str): The option key.
        v (str): The option value.

    Returns:
        HudiTableBuilder: The builder instance.
    """
    single_entry = {k: v}
    self._add_options(single_entry, "storage")
    return self

def with_storage_options(
Expand All @@ -76,7 +110,21 @@ def with_storage_options(
Returns:
HudiTableBuilder: The builder instance.
"""
self.storage_options.update(storage_options)
self._add_options(storage_options, "storage")
return self

def with_option(self, k: str, v: str) -> "HudiTableBuilder":
    """
    Registers a single generic option on the builder.

    Parameters:
        k (str): The option key.
        v (str): The option value.

    Returns:
        HudiTableBuilder: The builder instance.
    """
    single_entry = {k: v}
    self._add_options(single_entry)
    return self

def with_options(self, options: Dict[str, str]) -> "HudiTableBuilder":
Expand All @@ -89,7 +137,7 @@ def with_options(self, options: Dict[str, str]) -> "HudiTableBuilder":
Returns:
HudiTableBuilder: The builder instance.
"""
self.options.update(options)
self._add_options(options)
return self

def build(self) -> "HudiTable":
Expand Down
Loading
Loading