From 32019e196628841462cadc7b6d8f018324dfc49a Mon Sep 17 00:00:00 2001 From: zzhpro Date: Sun, 18 Aug 2024 12:05:10 +0800 Subject: [PATCH 1/3] feat: support loading hudi global configs --- crates/core/src/config/mod.rs | 2 + crates/core/src/storage/mod.rs | 14 +- crates/core/src/table/mod.rs | 127 +++++++++++++++--- .../data/hudi_conf_dir/hudi-defaults.conf | 22 +++ .../.hoodie/hoodie.properties | 37 +++++ 5 files changed, 183 insertions(+), 19 deletions(-) create mode 100644 crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf create mode 100644 crates/core/tests/data/table_props_partial/.hoodie/hoodie.properties diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs index 2b37dc71..3ba57507 100644 --- a/crates/core/src/config/mod.rs +++ b/crates/core/src/config/mod.rs @@ -26,6 +26,8 @@ pub mod internal; pub mod read; pub mod table; +pub const HUDI_CONF_DIR: &str = "HUDI_CONF_DIR"; + pub trait ConfigParser: AsRef { type Output; diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index c7eb1ec5..ffcf3048 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -28,7 +28,7 @@ use async_recursion::async_recursion; use bytes::Bytes; use futures::StreamExt; use object_store::path::Path as ObjPath; -use object_store::{parse_url_opts, ObjectStore}; +use object_store::{parse_url, parse_url_opts, ObjectStore}; use parquet::arrow::async_reader::ParquetObjectReader; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::file::metadata::ParquetMetaData; @@ -201,6 +201,18 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result Result { + let url = Url::from_file_path(location).unwrap(); + match parse_url(&url) { + Ok((object_store, _)) => { + let obj_path = ObjPath::from_url_path(url.path()).unwrap(); + let result = object_store.get(&obj_path).await?; + Ok(result.bytes().await?) + } + Err(e) => Err(anyhow!("Failed to create storage: {}", e)), + } +} + #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index e4e1a936..197e9999 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -20,12 +20,14 @@ use std::collections::HashMap; use std::env; use std::io::{BufRead, BufReader}; +use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Context, Result}; use arrow::record_batch::RecordBatch; use arrow_schema::Schema; +use bytes::Bytes; use strum::IntoEnumIterator; use url::Url; @@ -38,9 +40,10 @@ use crate::config::read::HudiReadConfig; use crate::config::read::HudiReadConfig::AsOfTimestamp; use crate::config::table::{HudiTableConfig, TableTypeValue}; use crate::config::HudiConfigs; +use crate::config::HUDI_CONF_DIR; use crate::file_group::FileSlice; use crate::storage::utils::{empty_options, parse_uri}; -use crate::storage::Storage; +use crate::storage::{get_file_data, Storage}; use crate::table::fs_view::FileSystemView; use crate::table::timeline::Timeline; @@ -93,6 +96,42 @@ impl Table { }) } + async fn parse_config_file( + data: &Bytes, + split_chars: &str, + hudi_options: &mut HashMap, + ) { + let cursor = std::io::Cursor::new(data); + let lines = BufReader::new(cursor).lines(); + for line in lines { + let line = line.unwrap(); + let trimmed_line = line.trim(); + if trimmed_line.is_empty() || trimmed_line.starts_with('#') { + continue; + } + let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c)); + let key = parts.next().unwrap().to_owned(); + let value = parts.next().unwrap_or("").trim().to_owned(); + hudi_options.insert(key, value); + } + } + + async fn get_global_config_data() -> Option { + match env::var(HUDI_CONF_DIR) { + Ok(hudi_conf_dir) => { + let path_buf = PathBuf::new() + .join(hudi_conf_dir) + .join("hudi-defaults.conf"); + let file_path = path_buf.to_str().unwrap(); + match get_file_data(file_path).await { + Ok(bytes) => Some(bytes), + Err(_) => None, + } + } + Err(_) => None, + } + } + #[cfg(feature = "datafusion")] pub fn register_storage( &self, @@ -115,7 +154,6 @@ impl Table { K: AsRef, V: Into, { - // TODO: load hudi global config let mut hudi_options = HashMap::new(); let mut extra_options = HashMap::new(); @@ -128,22 +166,16 @@ impl Table { extra_options.insert(k.as_ref().to_string(), v.into()); } } - let storage = Storage::new(base_url, &extra_options)?; - let data = storage.get_file_data(".hoodie/hoodie.properties").await?; - let cursor = std::io::Cursor::new(data); - let lines = BufReader::new(cursor).lines(); - for line in lines { - let line = line?; - let trimmed_line = line.trim(); - if trimmed_line.is_empty() || trimmed_line.starts_with('#') { - continue; - } - let mut parts = trimmed_line.splitn(2, '='); - let key = parts.next().unwrap().to_owned(); - let value = parts.next().unwrap_or("").to_owned(); - // `hoodie.properties` takes precedence TODO handle conflicts where applicable - hudi_options.insert(key, value); + + let global_config_data = Self::get_global_config_data().await; + if let Some(global_config_data) = global_config_data { + Self::parse_config_file(&global_config_data, " \t=", &mut hudi_options).await; } + + let storage = Storage::new(base_url, &extra_options)?; + let hoodie_properties_data = storage.get_file_data(".hoodie/hoodie.properties").await?; + Self::parse_config_file(&hoodie_properties_data, "=", &mut hudi_options).await; + let hudi_configs = HudiConfigs::new(hudi_options); Self::validate_configs(&hudi_configs).map(|_| (hudi_configs, extra_options)) @@ -278,8 +310,8 @@ impl Table { mod tests { use std::collections::HashSet; use std::fs::canonicalize; - use std::panic; use std::path::Path; + use std::{env, panic}; use url::Url; @@ -292,6 +324,7 @@ mod tests { PrecombineField, RecordKeyFields, TableName, TableType, TableVersion, TimelineLayoutVersion, }; + use crate::config::HUDI_CONF_DIR; use crate::storage::utils::join_url_segments; use crate::table::Table; @@ -599,4 +632,62 @@ mod tests { assert_eq!(configs.get(TableVersion).unwrap().to::(), 6); assert_eq!(configs.get(TimelineLayoutVersion).unwrap().to::(), 1); } + + #[tokio::test] + async fn get_global_table_props() { + // Without the environment variable HUDI_CONF_DIR + let base_url = + Url::from_file_path(canonicalize(Path::new("tests/data/table_props_partial")).unwrap()) + .unwrap(); + let table = Table::new_with_options( + base_url.as_str(), + [("hoodie.internal.skip.config.validation", "true")], + ) + .await + .unwrap(); + let configs = table.configs; + assert!(configs.get(DatabaseName).is_err()); + assert!(configs.get(TableType).is_err()); + assert_eq!(configs.get(TableName).unwrap().to::(), "trips"); + + // Environment variable HUDI_CONF_DIR points to nothing + let base_path = env::current_dir().unwrap(); + let hudi_conf_dir = base_path.join("random/wrong/dir"); + env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str()); + let base_url = + Url::from_file_path(canonicalize(Path::new("tests/data/table_props_partial")).unwrap()) + .unwrap(); + let table = Table::new_with_options( + base_url.as_str(), + [("hoodie.internal.skip.config.validation", "true")], + ) + .await + .unwrap(); + let configs = table.configs; + assert!(configs.get(DatabaseName).is_err()); + assert!(configs.get(TableType).is_err()); + assert_eq!(configs.get(TableName).unwrap().to::(), "trips"); + + // With global config + let base_path = env::current_dir().unwrap(); + let hudi_conf_dir = base_path.join("tests/data/hudi_conf_dir"); + env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str()); + + let base_url = + Url::from_file_path(canonicalize(Path::new("tests/data/table_props_partial")).unwrap()) + .unwrap(); + let table = Table::new_with_options( + base_url.as_str(), + [("hoodie.internal.skip.config.validation", "true")], + ) + .await + .unwrap(); + let configs = table.configs; + assert_eq!(configs.get(DatabaseName).unwrap().to::(), "tmpdb"); + assert_eq!( + configs.get(TableType).unwrap().to::(), + "MERGE_ON_READ" + ); + assert_eq!(configs.get(TableName).unwrap().to::(), "trips"); + } } diff --git a/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf b/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf new file mode 100644 index 00000000..69796b3b --- /dev/null +++ b/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Default system properties included when running Hudi jobs. +# This is useful for setting default environmental settings. + +hoodie.database.name tmpdb +hoodie.table.type mor \ No newline at end of file diff --git a/crates/core/tests/data/table_props_partial/.hoodie/hoodie.properties b/crates/core/tests/data/table_props_partial/.hoodie/hoodie.properties new file mode 100644 index 00000000..3ceea578 --- /dev/null +++ b/crates/core/tests/data/table_props_partial/.hoodie/hoodie.properties @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +hoodie.table.metadata.partitions=files +hoodie.table.precombine.field=ts +hoodie.table.partition.fields=city +hoodie.archivelog.folder=archived +hoodie.table.cdc.enabled=false +hoodie.timeline.layout.version=1 +hoodie.table.checksum=3761586722 +hoodie.datasource.write.drop.partition.columns=false +hoodie.table.recordkey.fields=uuid +hoodie.table.name=trips +hoodie.partition.metafile.use.base.format=false +hoodie.datasource.write.hive_style_partitioning=false +hoodie.table.metadata.partitions.inflight= +hoodie.populate.meta.fields=true +hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator +hoodie.table.base.file.format=PARQUET +hoodie.datasource.write.partitionpath.urlencode=false +hoodie.table.version=6 From f4a8ffd47d44b6b2a865db375c659b9207a80cd2 Mon Sep 17 00:00:00 2001 From: zzhpro Date: Thu, 22 Aug 2024 09:55:37 +0800 Subject: [PATCH 2/3] fixed broken unit test: table::tests::get_invalid_table_props --- crates/core/src/table/mod.rs | 1 + crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 197e9999..17e3dca1 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -689,5 +689,6 @@ mod tests { "MERGE_ON_READ" ); assert_eq!(configs.get(TableName).unwrap().to::(), "trips"); + env::remove_var(HUDI_CONF_DIR) } } diff --git a/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf b/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf index 69796b3b..d264bc0a 100644 --- a/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf +++ b/crates/core/tests/data/hudi_conf_dir/hudi-defaults.conf @@ -19,4 +19,4 @@ # This is useful for setting default environmental settings. hoodie.database.name tmpdb -hoodie.table.type mor \ No newline at end of file +hoodie.table.type= mor \ No newline at end of file From 42297eb8f0b4c8c5dda6d3955207739b6f9454ee Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Tue, 10 Sep 2024 01:51:27 -0500 Subject: [PATCH 3/3] refactor config parsing functions --- crates/core/src/storage/mod.rs | 21 +++--- crates/core/src/storage/utils.rs | 30 ++++++++- crates/core/src/table/mod.rs | 108 ++++++++++++++++--------------- 3 files changed, 91 insertions(+), 68 deletions(-) diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index ffcf3048..5ddfde4a 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -28,7 +28,7 @@ use async_recursion::async_recursion; use bytes::Bytes; use futures::StreamExt; use object_store::path::Path as ObjPath; -use object_store::{parse_url, parse_url_opts, ObjectStore}; +use object_store::{parse_url_opts, ObjectStore}; use parquet::arrow::async_reader::ParquetObjectReader; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::file::metadata::ParquetMetaData; @@ -101,6 +101,13 @@ impl Storage { Ok(bytes) } + pub async fn get_file_data_from_absolute_path(&self, absolute_path: &str) -> Result { + let obj_path = ObjPath::from_absolute_path(PathBuf::from(absolute_path))?; + let result = self.object_store.get(&obj_path).await?; + let bytes = result.bytes().await?; + Ok(bytes) + } + pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result { let obj_url = join_url_segments(&self.base_url, &[relative_path])?; let obj_path = ObjPath::from_url_path(obj_url.path())?; @@ -201,18 +208,6 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result Result { - let url = Url::from_file_path(location).unwrap(); - match parse_url(&url) { - Ok((object_store, _)) => { - let obj_path = ObjPath::from_url_path(url.path()).unwrap(); - let result = object_store.get(&obj_path).await?; - Ok(result.bytes().await?) - } - Err(e) => Err(anyhow!("Failed to create storage: {}", e)), - } -} - #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs index a38f8134..80c86c67 100644 --- a/crates/core/src/storage/utils.rs +++ b/crates/core/src/storage/utils.rs @@ -16,11 +16,13 @@ * specific language governing permissions and limitations * under the License. */ - +use std::collections::HashMap; +use std::io::{BufRead, BufReader, Cursor}; use std::path::{Path, PathBuf}; use std::str::FromStr; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; +use bytes::Bytes; use url::{ParseError, Url}; pub fn split_filename(filename: &str) -> Result<(String, String)> { @@ -80,6 +82,30 @@ pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> { std::iter::empty::<(&str, &str)>() } +pub async fn parse_config_data(data: &Bytes, split_chars: &str) -> Result> { + let cursor = Cursor::new(data); + let lines = BufReader::new(cursor).lines(); + let mut configs = HashMap::new(); + + for line in lines { + let line = line.context("Failed to read line")?; + let trimmed_line = line.trim(); + if trimmed_line.is_empty() || trimmed_line.starts_with('#') { + continue; + } + let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c)); + let key = parts + .next() + .context("Missing key in config line")? + .trim() + .to_owned(); + let value = parts.next().unwrap_or("").trim().to_owned(); + configs.insert(key, value); + } + + Ok(configs) +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 17e3dca1..82df7ed4 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -19,7 +19,6 @@ use std::collections::HashMap; use std::env; -use std::io::{BufRead, BufReader}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; @@ -27,7 +26,6 @@ use std::sync::Arc; use anyhow::{anyhow, Context, Result}; use arrow::record_batch::RecordBatch; use arrow_schema::Schema; -use bytes::Bytes; use strum::IntoEnumIterator; use url::Url; @@ -42,8 +40,8 @@ use crate::config::table::{HudiTableConfig, TableTypeValue}; use crate::config::HudiConfigs; use crate::config::HUDI_CONF_DIR; use crate::file_group::FileSlice; -use crate::storage::utils::{empty_options, parse_uri}; -use crate::storage::{get_file_data, Storage}; +use crate::storage::utils::{empty_options, parse_config_data, parse_uri}; +use crate::storage::Storage; use crate::table::fs_view::FileSystemView; use crate::table::timeline::Timeline; @@ -96,42 +94,6 @@ impl Table { }) } - async fn parse_config_file( - data: &Bytes, - split_chars: &str, - hudi_options: &mut HashMap, - ) { - let cursor = std::io::Cursor::new(data); - let lines = BufReader::new(cursor).lines(); - for line in lines { - let line = line.unwrap(); - let trimmed_line = line.trim(); - if trimmed_line.is_empty() || trimmed_line.starts_with('#') { - continue; - } - let mut parts = trimmed_line.splitn(2, |c| split_chars.contains(c)); - let key = parts.next().unwrap().to_owned(); - let value = parts.next().unwrap_or("").trim().to_owned(); - hudi_options.insert(key, value); - } - } - - async fn get_global_config_data() -> Option { - match env::var(HUDI_CONF_DIR) { - Ok(hudi_conf_dir) => { - let path_buf = PathBuf::new() - .join(hudi_conf_dir) - .join("hudi-defaults.conf"); - let file_path = path_buf.to_str().unwrap(); - match get_file_data(file_path).await { - Ok(bytes) => Some(bytes), - Err(_) => None, - } - } - Err(_) => None, - } - } - #[cfg(feature = "datafusion")] pub fn register_storage( &self, @@ -167,14 +129,11 @@ impl Table { } } - let global_config_data = Self::get_global_config_data().await; - if let Some(global_config_data) = global_config_data { - Self::parse_config_file(&global_config_data, " \t=", &mut hudi_options).await; - } - let storage = Storage::new(base_url, &extra_options)?; - let hoodie_properties_data = storage.get_file_data(".hoodie/hoodie.properties").await?; - Self::parse_config_file(&hoodie_properties_data, "=", &mut hudi_options).await; + + Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?; + + Self::imbue_global_hudi_configs(&mut hudi_options, storage.clone()).await?; let hudi_configs = HudiConfigs::new(hudi_options); @@ -182,12 +141,55 @@ impl Table { } fn imbue_cloud_env_vars(options: &mut HashMap) { - let prefixes = ["AWS_", "AZURE_", "GOOGLE_"]; - options.extend( - env::vars() - .filter(|(key, _)| prefixes.iter().any(|prefix| key.starts_with(prefix))) - .map(|(k, v)| (k.to_ascii_lowercase(), v)), - ); + const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"]; + + for (key, value) in env::vars() { + if PREFIXES.iter().any(|prefix| key.starts_with(prefix)) + && !options.contains_key(&key.to_ascii_lowercase()) + { + options.insert(key.to_ascii_lowercase(), value); + } + } + } + + async fn imbue_table_properties( + options: &mut HashMap, + storage: Arc, + ) -> Result<()> { + let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?; + let table_properties = parse_config_data(&bytes, "=").await?; + + // TODO: handle the case where the same key is present in both table properties and options + for (k, v) in table_properties { + options.insert(k.to_string(), v.to_string()); + } + + Ok(()) + } + + async fn imbue_global_hudi_configs( + options: &mut HashMap, + storage: Arc, + ) -> Result<()> { + let global_config_path = env::var(HUDI_CONF_DIR) + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf")) + .join("hudi-defaults.conf"); + + if let Ok(bytes) = storage + .get_file_data_from_absolute_path(global_config_path.to_str().unwrap()) + .await + { + if let Ok(global_configs) = parse_config_data(&bytes, " \t=").await { + for (key, value) in global_configs { + if key.starts_with("hoodie.") && !options.contains_key(&key) { + options.insert(key.to_string(), value.to_string()); + } + } + } + } + + Ok(()) } fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> {