Skip to content

Commit

Permalink
feat: add TimelineSelector to support timeline loading (apache#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan authored Jan 4, 2025
1 parent 0766f51 commit f02cabf
Show file tree
Hide file tree
Showing 10 changed files with 802 additions and 124 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ serde_json = { version = "1" }
# "stdlib"
thiserror = { version = "2.0.3" }
bytes = { version = "1" }
chrono = { version = "0.4" }
paste = { version = "1.0.15" }
once_cell = { version = "1.19.0" }
strum = { version = "0.26.3", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ serde_json = { workspace = true }
# "stdlib"
thiserror = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
paste = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
Expand Down
59 changes: 59 additions & 0 deletions crates/core/src/config/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ pub enum HudiTableConfig {

/// Version of the timeline layout used by the table.
TimelineLayoutVersion,

/// Timezone of the timeline timestamps. Defaults to UTC.
TimelineTimezone,
}

impl AsRef<str> for HudiTableConfig {
Expand All @@ -124,6 +127,7 @@ impl AsRef<str> for HudiTableConfig {
Self::TableType => "hoodie.table.type",
Self::TableVersion => "hoodie.table.version",
Self::TimelineLayoutVersion => "hoodie.timeline.layout.version",
Self::TimelineTimezone => "hoodie.table.timeline.timezone",
}
}
}
Expand All @@ -137,6 +141,9 @@ impl ConfigParser for HudiTableConfig {
Self::DropsPartitionFields => Some(HudiConfigValue::Boolean(false)),
Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
Self::TimelineTimezone => Some(HudiConfigValue::String(
TimelineTimezoneValue::UTC.as_ref().to_string(),
)),
_ => None,
}
}
Expand Down Expand Up @@ -202,6 +209,9 @@ impl ConfigParser for HudiTableConfig {
isize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::Integer),
Self::TimelineTimezone => get_result
.and_then(TimelineTimezoneValue::from_str)
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
}
}
}
Expand Down Expand Up @@ -246,6 +256,27 @@ impl FromStr for BaseFileFormatValue {
}
}

/// Config value for [HudiTableConfig::TimelineTimezone].
///
/// The `strum` attributes below associate each variant with its lowercase
/// string name (`"utc"` / `"local"`), which is what the derived `AsRefStr`
/// is expected to return.
#[derive(Clone, Debug, PartialEq, AsRefStr)]
pub enum TimelineTimezoneValue {
/// Selects UTC for the timeline timestamps.
#[strum(serialize = "utc")]
UTC,
/// Selects the `local` timezone setting for the timeline timestamps.
// NOTE(review): which machine's local timezone applies is not shown here — confirm.
#[strum(serialize = "local")]
Local,
}

impl FromStr for TimelineTimezoneValue {
    type Err = ConfigError;

    /// Parses a timezone name, ignoring ASCII case.
    ///
    /// Accepts `"utc"` and `"local"` in any ASCII casing. Any other input
    /// yields `InvalidValue` carrying the ASCII-lowercased form of the input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Normalize once so both comparisons and the error share the same text.
        let normalized = s.to_ascii_lowercase();

        if normalized == "utc" {
            return Ok(Self::UTC);
        }
        if normalized == "local" {
            return Ok(Self::Local);
        }

        Err(InvalidValue(normalized))
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -313,4 +344,32 @@ mod tests {
UnsupportedValue(_)
));
}

#[test]
fn create_timeline_timezone() {
    // Recognized names parse to their variant regardless of ASCII case.
    let accepted = [
        ("utc", TimelineTimezoneValue::UTC),
        ("uTc", TimelineTimezoneValue::UTC),
        ("local", TimelineTimezoneValue::Local),
        ("LOCAL", TimelineTimezoneValue::Local),
    ];
    for (input, expected) in accepted {
        assert_eq!(TimelineTimezoneValue::from_str(input).unwrap(), expected);
    }

    // Empty and unknown names are rejected with `InvalidValue`.
    for input in ["", "foo"] {
        assert!(matches!(
            TimelineTimezoneValue::from_str(input).unwrap_err(),
            InvalidValue(_)
        ));
    }
}
}
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub mod expr;
pub mod file_group;
pub mod storage;
pub mod table;
pub mod timeline;
pub mod util;

use error::Result;
20 changes: 0 additions & 20 deletions crates/core/src/storage/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,11 @@
*/

//! Utility functions for storage.
use std::path::Path;
use url::Url;

use crate::storage::error::StorageError::{InvalidPath, UrlParseError};
use crate::storage::Result;

/// Splits a filename into a stem and an extension.
///
/// The stem and extension are obtained from [`Path::file_stem`] and
/// [`Path::extension`]. When the filename has no extension, the returned
/// extension is an empty string.
///
/// # Errors
///
/// Returns `InvalidPath` when no file stem can be extracted from `filename`.
pub fn split_filename(filename: &str) -> Result<(String, String)> {
    let path = Path::new(filename);

    let stem = path
        .file_stem()
        .and_then(|s| s.to_str())
        // Build the error lazily so the success path does not allocate the message.
        .ok_or_else(|| InvalidPath(format!("No file stem found in {}", filename)))?
        .to_string();

    let extension = path
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or_default()
        .to_string();

    Ok((stem, extension))
}

/// Parses a URI string into a URL.
pub fn parse_uri(uri: &str) -> Result<Url> {
let mut url = match Url::parse(uri) {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use crate::config::{HudiConfigs, HUDI_CONF_DIR};
use crate::error::CoreError;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::timeline::Timeline;
use crate::table::Table;
use crate::timeline::Timeline;
use crate::Result;

/// Builder for creating a [Table] instance.
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
pub mod builder;
mod fs_view;
pub mod partition;
mod timeline;

use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::HudiTableConfig;
Expand All @@ -100,7 +99,7 @@ use crate::file_group::FileSlice;
use crate::table::builder::TableBuilder;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::table::timeline::Timeline;
use crate::timeline::Timeline;
use crate::Result;

use arrow::record_batch::RecordBatch;
Expand Down
Loading

0 comments on commit f02cabf

Please sign in to comment.