Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: Update file_group crates #214

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub struct HudiConfigs {
}

impl HudiConfigs {
/// Create [HudiConfigs] using opitons in the form of key-value pairs.
/// Create [HudiConfigs] using options in the form of key-value pairs.
pub fn new<I, K, V>(options: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
Expand Down
41 changes: 35 additions & 6 deletions crates/core/src/file_group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,27 @@ use crate::storage::file_stats::FileStats;
use crate::storage::Storage;

/// Represents common metadata about a Hudi Base File.
///
/// A **Base File** is the primary data file in a Hudi table, containing the actual records.
/// This struct captures all essential information about a base file, including its unique
/// identifier, commit timestamp, detailed file information, and statistics(optional).
#[derive(Clone, Debug)]
pub struct BaseFile {
/// The file group id that is unique across the table.
/// The file group ID that is unique across the table.
pub file_group_id: String,

/// Timestamp indicating when the file was committed.
pub commit_time: String,

/// Detailed information about the file; contains name, uri, and size.
pub info: FileInfo,

/// Optional statistics about the file; contains the number of records and size in bytes.
pub stats: Option<FileStats>,
}

impl BaseFile {
/// Parse file name and extract file_group_id and commit_time.
/// Parses the file name and extract `file_group_id` and `commit_time`.
fn parse_file_name(file_name: &str) -> Result<(String, String)> {
let err_msg = format!("Failed to parse file name '{}' for base file.", file_name);
let (name, _) = file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?;
Expand All @@ -58,7 +65,7 @@ impl BaseFile {
Ok((file_group_id, commit_time))
}

/// Construct [BaseFile] with the base file name.
/// Construct `BaseFile` with the base file name.
pub fn from_file_name(file_name: &str) -> Result<Self> {
let (file_group_id, commit_time) = Self::parse_file_name(file_name)?;
Ok(Self {
Expand All @@ -69,7 +76,7 @@ impl BaseFile {
})
}

/// Construct [BaseFile] with the [FileInfo].
/// Construct `BaseFile` with `FileInfo`
pub fn from_file_info(info: FileInfo) -> Result<Self> {
let (file_group_id, commit_time) = Self::parse_file_name(&info.name)?;
Ok(Self {
Expand All @@ -84,18 +91,25 @@ impl BaseFile {
/// Within a file group, a slice is a combination of data file written at a commit time and list of log files,
/// containing changes to the data file from that commit time.
///
/// [note] The log files are not yet supported.
/// A **File Slice** consists of a `BaseFile` (the main data file) and its associated `partition_path`.
/// Log files, which contain incremental changes to the data file.
///
/// [Note] The log files are not yet supported.
#[derive(Clone, Debug)]
pub struct FileSlice {
pub base_file: BaseFile,
pub partition_path: Option<String>,
}

impl FileSlice {
/// Retrieves the URI of the base file.
pub fn base_file_path(&self) -> &str {
self.base_file.info.uri.as_str()
}

/// Constructs the relative path of the base file by combining the partition path and file name.
///
/// If `partition_path` is `None`, only the file name is used.
pub fn base_file_relative_path(&self) -> String {
let ptn = self.partition_path.as_deref().unwrap_or_default();
let file_name = &self.base_file.info.name;
Expand All @@ -106,10 +120,12 @@ impl FileSlice {
.to_string()
}

/// Retrieves the file group ID associated with the base file.
pub fn file_group_id(&self) -> &str {
&self.base_file.file_group_id
}

/// Updates the `base_file` within the `FileSlice`.
pub fn set_base_file(&mut self, base_file: BaseFile) {
self.base_file = base_file
}
Expand All @@ -136,15 +152,25 @@ impl FileSlice {
}
}

/// Hudi File Group.
/// A group of file slices within a Hudi table, uniquely identified by an ID and may be
/// associated with a specific partition path.
///
/// A **File Group** aggregates multiple `FileSlice` instances, each corresponding to a commit
/// time.
#[derive(Clone, Debug)]
pub struct FileGroup {
/// The unique identifier for the file group.
pub id: String,

/// The path of the partition to which this file group belongs (optional).
pub partition_path: Option<String>,

/// `FileSlice`s for each `FileGroup`. Collection of `FileSlices` based on commit time.
pub file_slices: BTreeMap<String, FileSlice>,
}

impl PartialEq for FileGroup {
/// Checks if equal based on `id` and `partition_path`.
fn eq(&self, other: &Self) -> bool {
self.id == other.id && self.partition_path == other.partition_path
}
Expand Down Expand Up @@ -172,6 +198,7 @@ impl fmt::Display for FileGroup {
}

impl FileGroup {
/// Creates a new `FileGroup` instance with the specified ID and optional partition path.
pub fn new(id: String, partition_path: Option<String>) -> Self {
Self {
id,
Expand Down Expand Up @@ -206,6 +233,7 @@ impl FileGroup {
}
}

/// Retrieves the latest `FileSlice` based on timestamp.
pub fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&FileSlice> {
let as_of = timestamp.to_string();
if let Some((_, file_slice)) = self.file_slices.range(..=as_of).next_back() {
Expand All @@ -215,6 +243,7 @@ impl FileGroup {
}
}

/// Retrieves a mutable reference to the latest `FileSlice` based on timestamp.
pub fn get_file_slice_mut_as_of(&mut self, timestamp: &str) -> Option<&mut FileSlice> {
let as_of = timestamp.to_string();
if let Some((_, file_slice)) = self.file_slices.range_mut(..=as_of).next_back() {
Expand Down