fix: make partition loading more efficient (apache#152)
Fix the partition loading logic such that it

- does not skip relevant partitions
- parallelizes file group loading (see the sketch below)
xushiyan authored Oct 7, 2024
1 parent 65d8ed4 commit 4836a37
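
The core of the parallelization is the switch from a sequential loop over partitions to a bounded concurrent stream built with `futures`. Below is a minimal, self-contained sketch of that pattern, run under Tokio (which the crate's tests already use); `load_one`, `load_all`, and the plain `HashMap` cache are illustrative stand-ins for `load_file_groups_for_partition`, `load_file_slices_as_of`, and the `DashMap`, and the limit of 10 mirrors the hard-coded `buffer_unordered(10)` in this commit.

```rust
use std::collections::HashMap;

use futures::stream::{self, StreamExt, TryStreamExt};

// Hypothetical stand-in for Self::load_file_groups_for_partition(storage, &path).
async fn load_one(path: String) -> anyhow::Result<(String, Vec<String>)> {
    Ok((path.clone(), vec![format!("{path}/file-group-0")]))
}

// Load many partitions with bounded concurrency instead of one at a time.
async fn load_all(paths: Vec<String>) -> anyhow::Result<HashMap<String, Vec<String>>> {
    let mut loaded = HashMap::new();
    stream::iter(paths)
        .map(|p| async move { load_one(p).await })
        // At most 10 partition loads are in flight at once, as in the commit.
        .buffer_unordered(10)
        .try_for_each(|(path, file_groups)| {
            loaded.insert(path, file_groups);
            futures::future::ready(Ok::<(), anyhow::Error>(()))
        })
        .await?;
    Ok(loaded)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let loaded = load_all(vec!["10".into(), "20".into(), "30".into()]).await?;
    println!("loaded {} partitions", loaded.len());
    Ok(())
}
```

Failures still short-circuit: `try_for_each` stops at the first `Err`, so the behavior of the old sequential loop's early return is preserved while the happy path runs concurrently.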
Showing 2 changed files with 113 additions and 86 deletions.
81 changes: 45 additions & 36 deletions crates/core/src/table/fs_view.rs
@@ -25,9 +25,10 @@ use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
use crate::table::partition::PartitionPruner;
use anyhow::{anyhow, Result};
use anyhow::Result;
use arrow::record_batch::RecordBatch;
use dashmap::DashMap;
use futures::stream::{self, StreamExt, TryStreamExt};
use url::Url;

#[derive(Clone, Debug)]
@@ -53,6 +54,10 @@ impl FileSystemView {
})
}

async fn load_all_partition_paths(storage: &Storage) -> Result<Vec<String>> {
Self::load_partition_paths(storage, &PartitionPruner::empty()).await
}

async fn load_partition_paths(
storage: &Storage,
partition_pruner: &PartitionPruner,
@@ -80,22 +85,6 @@ impl FileSystemView {
.collect())
}

async fn load_file_groups_for_partitions(
storage: &Storage,
partition_paths: Vec<String>,
) -> Result<HashMap<String, Vec<FileGroup>>> {
let mut partition_to_file_groups = HashMap::new();
for p in partition_paths {
match Self::load_file_groups_for_partition(storage, p.as_str()).await {
Ok(file_groups) => {
partition_to_file_groups.insert(p, file_groups);
}
Err(e) => return Err(anyhow!("Failed to load partitions: {}", e)),
}
}
Ok(partition_to_file_groups)
}

async fn load_file_groups_for_partition(
storage: &Storage,
partition_path: &str,
@@ -133,32 +122,52 @@ impl FileSystemView {
timestamp: &str,
partition_pruner: &PartitionPruner,
excluding_file_groups: &HashSet<FileGroup>,
) -> Result<Vec<FileSlice>> {
let all_partition_paths = Self::load_all_partition_paths(&self.storage).await?;

let partition_paths_to_load = all_partition_paths
.into_iter()
.filter(|p| !self.partition_to_file_groups.contains_key(p))
.filter(|p| partition_pruner.should_include(p))
.collect::<HashSet<_>>();

stream::iter(partition_paths_to_load)
.map(|path| async move {
let file_groups =
Self::load_file_groups_for_partition(&self.storage, &path).await?;
Ok::<_, anyhow::Error>((path, file_groups))
})
// TODO parameterize the parallelism for partition loading
.buffer_unordered(10)
.try_for_each(|(path, file_groups)| async move {
self.partition_to_file_groups.insert(path, file_groups);
Ok(())
})
.await?;

self.collect_file_slices_as_of(timestamp, partition_pruner, excluding_file_groups)
.await
}

async fn collect_file_slices_as_of(
&self,
timestamp: &str,
partition_pruner: &PartitionPruner,
excluding_file_groups: &HashSet<FileGroup>,
) -> Result<Vec<FileSlice>> {
let mut file_slices = Vec::new();
if self.partition_to_file_groups.is_empty() {
let partition_paths =
Self::load_partition_paths(&self.storage, partition_pruner).await?;
let partition_to_file_groups =
Self::load_file_groups_for_partitions(&self.storage, partition_paths).await?;
partition_to_file_groups.into_iter().for_each(|pair| {
self.partition_to_file_groups.insert(pair.0, pair.1);
});
}
for mut fgs in self
.partition_to_file_groups
.iter_mut()
.filter(|item| partition_pruner.should_include(item.key()))
{
let fgs_ref = fgs.value_mut();
for fg in fgs_ref {
for mut partition_entry in self.partition_to_file_groups.iter_mut() {
if !partition_pruner.should_include(partition_entry.key()) {
continue;
}
let file_groups = partition_entry.value_mut();
for fg in file_groups.iter_mut() {
if excluding_file_groups.contains(fg) {
continue;
}
if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
// TODO: pass ref instead of copying
fsl.load_stats(&self.storage).await?;
let immut_fsl: &FileSlice = fsl;
file_slices.push(immut_fsl.clone());
file_slices.push(fsl.clone());
}
}
}
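
The "does not skip relevant partitions" part of the fix lies in how `load_file_slices_as_of` now selects partitions to load: it lists all partition paths, drops only those whose file groups are already cached, and then applies the pruner, instead of skipping loading entirely whenever the cache is non-empty. A minimal sketch of that selection step, using a hypothetical `Pruner` and a plain `HashMap` cache in place of `PartitionPruner` and the `DashMap`:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-in for PartitionPruner; only the method name matches.
struct Pruner {
    allowed_prefixes: Vec<String>,
}

impl Pruner {
    fn should_include(&self, partition_path: &str) -> bool {
        self.allowed_prefixes.is_empty()
            || self
                .allowed_prefixes
                .iter()
                .any(|prefix| partition_path.starts_with(prefix))
    }
}

/// Pick the partitions that still need loading: skip only what is already
/// cached, then keep what the pruner accepts.
fn partitions_to_load(
    all_partition_paths: Vec<String>,
    cache: &HashMap<String, Vec<String>>, // partition path -> loaded file groups
    pruner: &Pruner,
) -> HashSet<String> {
    all_partition_paths
        .into_iter()
        .filter(|p| !cache.contains_key(p))
        .filter(|p| pruner.should_include(p))
        .collect()
}

fn main() {
    // Partition "10" is already cached; the pruner accepts prefixes "1" and "2".
    let cache = HashMap::from([("10".to_string(), vec![])]);
    let pruner = Pruner {
        allowed_prefixes: vec!["1".into(), "2".into()],
    };
    let to_load = partitions_to_load(
        vec!["10".into(), "20".into(), "30".into()],
        &cache,
        &pruner,
    );
    assert_eq!(to_load, HashSet::from(["20".to_string()]));
}
```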
118 changes: 68 additions & 50 deletions crates/core/src/table/mod.rs
@@ -793,88 +793,106 @@ mod tests {
}

#[tokio::test]
async fn hudi_table_get_file_paths_for_simple_key_and_non_hive_style() {
async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.instants.len(), 2);

let partition_filters = &[];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(
vec![
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
"30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
]
let actual = hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap()
.into_iter()
.map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
.collect::<Vec<_>>(),
);
.collect::<HashSet<_>>();
let expected = [
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
"30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
]
.map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
.into_iter()
.collect::<HashSet<_>>();
assert_eq!(actual, expected);

let partition_filters = &["byteField >= 10", "byteField < 30"];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(
vec![
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
]
let actual = hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap()
.into_iter()
.map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
.collect::<Vec<_>>(),
);
.collect::<HashSet<_>>();
let expected = [
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
]
.map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
.into_iter()
.collect::<HashSet<_>>();
assert_eq!(actual, expected);

let partition_filters = &["byteField > 30"];
let actual = hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
let expected = HashSet::new();
assert_eq!(actual, expected);
}

#[tokio::test]
async fn hudi_table_get_file_paths_for_complex_keygen_and_hive_style() {
async fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.instants.len(), 2);

let partition_filters = &[];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(vec![
let actual = hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
let expected = [
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
"byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
]
.into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
.collect::<Vec<_>>());
.map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
.into_iter()
.collect::<HashSet<_>>();
assert_eq!(actual, expected);

let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(vec![
let actual = hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
let expected = [
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
]
.into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
.collect::<Vec<_>>());
.map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
.into_iter()
.collect::<HashSet<_>>();
assert_eq!(actual, expected);

let partition_filters = &["byteField > 20", "shortField = 300"];
let actual = hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
let expected = HashSet::new();
assert_eq!(actual, expected);
}

#[tokio::test]
async fn hudi_table_read_snapshot_for_complex_keygen_and_hive_style() {
async fn hudi_table_read_snapshot_for_complex_keygen_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];
