test: add tests crate and adopt testing tables (apache#30)
xushiyan authored Jun 29, 2024
1 parent e15174d commit 6d35b74
Showing 29 changed files with 739 additions and 99 deletions.
9 changes: 3 additions & 6 deletions crates/core/Cargo.toml
@@ -17,16 +17,17 @@

 [package]
 name = "hudi-core"
-version = "0.1.0"
+version.workspace = true
 edition.workspace = true
 license.workspace = true
 rust-version.workspace = true

 [dependencies]
+hudi-tests = { path = "../tests" }
 # arrow
 arrow = { workspace = true }
 arrow-arith = { workspace = true }
-arrow-array = { workspace = true , features = ["chrono-tz"]}
+arrow-array = { workspace = true, features = ["chrono-tz"] }
 arrow-buffer = { workspace = true }
 arrow-cast = { workspace = true }
 arrow-ipc = { workspace = true }
@@ -68,7 +69,3 @@ async-recursion = { workspace = true }
 async-trait = { workspace = true }
 tokio = { workspace = true }
 futures = { workspace = true }
-
-# test
-tempfile = "3.10.1"
-zip-extract = "0.1.3"
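Note: version.workspace = true and the edition/license/rust-version keys inherit their values from the workspace root manifest, which is not among the files shown. A minimal sketch of what such a root Cargo.toml could look like; the member list and all values here are hypothetical:

    # Hypothetical root manifest for illustration only; not part of this diff.
    [workspace]
    members = ["crates/core", "crates/tests"]

    [workspace.package]
    version = "0.1.0"
    edition = "2021"
    license = "Apache-2.0"

The test-only dependencies tempfile and zip-extract leave hudi-core; presumably they now live in the new hudi-tests crate, which hudi-core pulls in via the path dependency above.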
1 change: 0 additions & 1 deletion crates/core/src/lib.rs
@@ -23,7 +23,6 @@ pub mod file_group;
 pub mod table;
 pub type HudiTable = Table;
 mod storage;
-pub mod test_utils;
 mod timeline;

 pub fn crate_version() -> &'static str {
2 changes: 1 addition & 1 deletion crates/core/src/storage/file_info.rs
@@ -17,7 +17,7 @@
  * under the License.
  */

-#[derive(Clone, Debug, Default)]
+#[derive(Clone, Debug, Default, Eq, PartialEq)]
 pub struct FileInfo {
     pub uri: String,
     pub name: String,
2 changes: 1 addition & 1 deletion crates/core/src/storage/file_stats.rs
@@ -17,7 +17,7 @@
  * under the License.
  */

-#[derive(Clone, Debug, Default)]
+#[derive(Clone, Debug, Default, Eq, PartialEq)]
 pub struct FileStats {
     pub num_records: i64,
 }
54 changes: 38 additions & 16 deletions crates/core/src/storage/mod.rs
@@ -132,8 +132,7 @@ impl Storage {
             .objects
             .into_iter()
             .map(|obj_meta| FileInfo {
-                uri: prefix_url
-                    .join(obj_meta.location.filename().unwrap())
+                uri: join_url_segments(&prefix_url, &[obj_meta.location.filename().unwrap()])
                     .unwrap()
                     .to_string(),
                 name: obj_meta.location.filename().unwrap().to_string(),
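Note: switching from Url::join to join_url_segments likely sidesteps a classic Url::join pitfall: relative resolution against a base URL without a trailing slash replaces the base's last path segment instead of appending to it. A minimal sketch of that behavior, not taken from this commit:

    use url::Url;

    fn main() {
        let base = Url::parse("file:///base/part1").unwrap();
        // RFC 3986 relative resolution: "part1" is replaced, not extended,
        // because the base path has no trailing slash.
        assert_eq!(
            base.join("b.parquet").unwrap().as_str(),
            "file:///base/b.parquet"
        );
    }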
@@ -172,6 +171,7 @@ mod tests {
 use object_store::path::Path as ObjPath;
 use url::Url;

+use crate::storage::file_info::FileInfo;
 use crate::storage::utils::join_url_segments;
 use crate::storage::{get_leaf_dirs, Storage};

@@ -224,28 +224,50 @@
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url, HashMap::new());
-        let file_names_1: Vec<String> = storage
-            .list_files(None)
-            .await
-            .into_iter()
-            .map(|file_info| file_info.name)
-            .collect();
-        assert_eq!(file_names_1, vec!["a.parquet"]);
-        let file_names_2: Vec<String> = storage
+        let storage = Storage::new(base_url.clone(), HashMap::new());
+        let file_info_1: Vec<FileInfo> = storage.list_files(None).await.into_iter().collect();
+        assert_eq!(
+            file_info_1,
+            vec![FileInfo {
+                uri: base_url.clone().join("a.parquet").unwrap().to_string(),
+                name: "a.parquet".to_string(),
+                size: 0,
+            }]
+        );
+        let file_info_2: Vec<FileInfo> = storage
             .list_files(Some("part1"))
             .await
             .into_iter()
-            .map(|file_info| file_info.name)
             .collect();
-        assert_eq!(file_names_2, vec!["b.parquet"]);
-        let file_names_3: Vec<String> = storage
+        assert_eq!(
+            file_info_2,
+            vec![FileInfo {
+                uri: base_url
+                    .clone()
+                    .join("part1/b.parquet")
+                    .unwrap()
+                    .to_string(),
+                name: "b.parquet".to_string(),
+                size: 0,
+            }]
+        );
+        let file_info_3: Vec<FileInfo> = storage
             .list_files(Some("part2/part22"))
             .await
             .into_iter()
-            .map(|file_info| file_info.name)
             .collect();
-        assert_eq!(file_names_3, vec!["c.parquet"]);
+        assert_eq!(
+            file_info_3,
+            vec![FileInfo {
+                uri: base_url
+                    .clone()
+                    .join("part2/part22/c.parquet")
+                    .unwrap()
+                    .to_string(),
+                name: "c.parquet".to_string(),
+                size: 0,
+            }]
+        );
     }

     #[tokio::test]
58 changes: 54 additions & 4 deletions crates/core/src/storage/utils.rs
@@ -17,8 +17,9 @@
  * under the License.
  */

-use anyhow::{anyhow, Result};
 use std::path::Path;

+use anyhow::{anyhow, Result};
 use url::{ParseError, Url};

 pub fn split_filename(filename: &str) -> Result<(String, String)> {
@@ -46,9 +47,58 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url> {
         url.path_segments_mut().unwrap().pop();
     }

-    url.path_segments_mut()
-        .map_err(|_| ParseError::RelativeUrlWithoutBase)?
-        .extend(segments);
+    for &seg in segments {
+        let segs: Vec<_> = seg.split('/').filter(|&s| !s.is_empty()).collect();
+        url.path_segments_mut()
+            .map_err(|_| ParseError::RelativeUrlWithoutBase)?
+            .extend(segs);
+    }

     Ok(url)
 }
+
+#[cfg(test)]
+mod tests {
+    use std::str::FromStr;
+
+    use url::Url;
+
+    use crate::storage::utils::join_url_segments;
+
+    #[test]
+    fn join_base_url_with_segments() {
+        let base_url = Url::from_str("file:///base").unwrap();
+
+        assert_eq!(
+            join_url_segments(&base_url, &["foo"]).unwrap(),
+            Url::from_str("file:///base/foo").unwrap()
+        );
+
+        assert_eq!(
+            join_url_segments(&base_url, &["/foo"]).unwrap(),
+            Url::from_str("file:///base/foo").unwrap()
+        );
+
+        assert_eq!(
+            join_url_segments(&base_url, &["/foo", "bar/", "/baz/"]).unwrap(),
+            Url::from_str("file:///base/foo/bar/baz").unwrap()
+        );
+
+        assert_eq!(
+            join_url_segments(&base_url, &["foo/", "", "bar/baz"]).unwrap(),
+            Url::from_str("file:///base/foo/bar/baz").unwrap()
+        );
+
+        assert_eq!(
+            join_url_segments(&base_url, &["foo1/bar1", "foo2/bar2"]).unwrap(),
+            Url::from_str("file:///base/foo1/bar1/foo2/bar2").unwrap()
+        );
+    }
+
+    #[test]
+    fn join_failed_due_to_invalid_base() {
+        let base_url = Url::from_str("foo:text/plain,bar").unwrap();
+        let result = join_url_segments(&base_url, &["foo"]);
+        assert!(result.is_err());
+    }
+}
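Note: the new loop splits each segment on '/' before calling extend because url's PathSegmentsMut::extend percent-encodes '/' inside a segment to %2F rather than treating it as a separator. A minimal sketch of the behavior the split avoids, not part of the commit:

    use url::Url;

    fn main() {
        let mut url = Url::parse("file:///base").unwrap();
        // The literal '/' is percent-encoded, yielding the single segment
        // "part1%2Fb.parquet" instead of the two segments "part1", "b.parquet".
        url.path_segments_mut().unwrap().extend(["part1/b.parquet"]);
        assert_eq!(url.as_str(), "file:///base/part1%2Fb.parquet");
    }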
45 changes: 21 additions & 24 deletions crates/core/src/table/fs_view.rs
@@ -185,53 +185,50 @@ async fn get_partitions_and_file_groups(
 #[cfg(test)]
 mod tests {
     use std::collections::HashSet;
-    use std::fs::canonicalize;
-    use std::path::Path;

-    use url::Url;
+    use hudi_tests::TestTable;

     use crate::table::fs_view::FileSystemView;
-    use crate::test_utils::extract_test_table;

     #[tokio::test]
-    async fn get_partition_paths() {
-        let fixture_path =
-            canonicalize(Path::new("fixtures/table/0.x_cow_partitioned.zip")).unwrap();
-        let base_url = Url::from_file_path(extract_test_table(&fixture_path)).unwrap();
+    async fn get_partition_paths_for_nonpartitioned_table() {
+        let base_url = TestTable::V6Nonpartitioned.url();
+        let fs_view = FileSystemView::new(base_url);
+        let partition_paths = fs_view.get_partition_paths().await.unwrap();
+        let partition_path_set: HashSet<&str> =
+            HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
+        assert_eq!(partition_path_set, HashSet::new(),)
+    }
+
+    #[tokio::test]
+    async fn get_partition_paths_for_complexkeygen_table() {
+        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let fs_view = FileSystemView::new(base_url);
         let partition_paths = fs_view.get_partition_paths().await.unwrap();
         let partition_path_set: HashSet<&str> =
             HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
         assert_eq!(
             partition_path_set,
-            HashSet::from_iter(vec!["chennai", "sao_paulo", "san_francisco"])
+            HashSet::from_iter(vec![
+                "byteField=10/shortField=300",
+                "byteField=20/shortField=100",
+                "byteField=30/shortField=100"
+            ])
         )
     }

     #[test]
     fn get_latest_file_slices() {
-        let fixture_path =
-            canonicalize(Path::new("fixtures/table/0.x_cow_partitioned.zip")).unwrap();
-        let base_url = Url::from_file_path(extract_test_table(&fixture_path)).unwrap();
+        let base_url = TestTable::V6Nonpartitioned.url();
         let mut fs_view = FileSystemView::new(base_url);
         fs_view.load_file_groups();
         let file_slices = fs_view.get_latest_file_slices();
-        assert_eq!(file_slices.len(), 5);
+        assert_eq!(file_slices.len(), 1);
         let mut fg_ids = Vec::new();
         for f in file_slices {
             let fp = f.file_group_id();
             fg_ids.push(fp);
         }
-        let actual: HashSet<&str> = fg_ids.into_iter().collect();
-        assert_eq!(
-            actual,
-            HashSet::from_iter(vec![
-                "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
-                "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
-                "ee915c68-d7f8-44f6-9759-e691add290d8-0",
-                "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
-                "5a226868-2934-4f84-a16f-55124630c68d-0"
-            ])
-        );
+        assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"])
     }
 }
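Note: the new hudi-tests crate itself falls in the truncated part of this diff. Judging only from its usage in these tests, and from the tempfile and zip-extract dependencies that left hudi-core, a plausible sketch of its API follows; everything below except TestTable, the two variant names, and url() is a guess:

    // Hypothetical sketch; the real implementation is not shown in this excerpt.
    use std::path::PathBuf;

    use url::Url;

    pub enum TestTable {
        V6ComplexkeygenHivestyle,
        V6Nonpartitioned,
    }

    impl TestTable {
        pub fn url(&self) -> Url {
            // Assumed layout: zipped table fixtures shipped inside the crate.
            let name = match self {
                Self::V6ComplexkeygenHivestyle => "v6_complexkeygen_hivestyle",
                Self::V6Nonpartitioned => "v6_nonpartitioned",
            };
            let zip = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
                .join("data")
                .join(format!("{name}.zip"));
            // Extract the fixture into a temp dir and hand back a file:// URL.
            let target = tempfile::tempdir().unwrap().into_path();
            zip_extract::extract(std::fs::File::open(zip).unwrap(), &target, true).unwrap();
            Url::from_directory_path(target).unwrap()
        }
    }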
68 changes: 49 additions & 19 deletions crates/core/src/table/mod.rs
@@ -255,21 +255,21 @@ impl ProvidesTableMetadata for Table {

 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
+    use std::collections::{HashMap, HashSet};
     use std::fs::canonicalize;
     use std::path::Path;
-    use url::Url;

+    use hudi_tests::TestTable;
+
+    use crate::storage::utils::join_url_segments;
     use crate::table::config::BaseFileFormat::Parquet;
     use crate::table::config::TableType::CopyOnWrite;
     use crate::table::metadata::ProvidesTableMetadata;
     use crate::table::Table;
-    use crate::test_utils::extract_test_table;

     #[test]
     fn hudi_table_get_latest_schema() {
-        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-        let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap();
+        let base_url = TestTable::V6Nonpartitioned.url();
         let hudi_table = Table::new(base_url.path(), HashMap::new());
         let fields: Vec<String> = hudi_table
             .get_latest_schema()
@@ -285,36 +285,66 @@ mod tests {
             "_hoodie_record_key",
             "_hoodie_partition_path",
             "_hoodie_file_name",
-            "ts",
-            "uuid",
-            "rider",
-            "driver",
-            "fare",
-            "city"
+            "id",
+            "name",
+            "isActive",
+            "byteField",
+            "shortField",
+            "intField",
+            "longField",
+            "floatField",
+            "doubleField",
+            "decimalField",
+            "dateField",
+            "timestampField",
+            "binaryField",
+            "arrayField",
+            "array",
+            "arr_struct_f1",
+            "arr_struct_f2",
+            "mapField",
+            "key_value",
+            "key",
+            "value",
+            "map_field_value_struct_f1",
+            "map_field_value_struct_f2",
+            "structField",
+            "field1",
+            "field2",
+            "child_struct",
+            "child_field1",
+            "child_field2"
             ])
         );
     }

     #[test]
     fn hudi_table_read_file_slice() {
-        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-        let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap();
+        let base_url = TestTable::V6Nonpartitioned.url();
         let mut hudi_table = Table::new(base_url.path(), HashMap::new());
         let batches = hudi_table.read_file_slice(
-            "san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet",
+            "a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
         );
         assert_eq!(batches.len(), 1);
-        assert_eq!(batches.first().unwrap().num_rows(), 1);
-        assert_eq!(batches.first().unwrap().num_columns(), 11);
+        assert_eq!(batches.first().unwrap().num_rows(), 4);
+        assert_eq!(batches.first().unwrap().num_columns(), 21);
     }

     #[test]
     fn hudi_table_get_latest_file_paths() {
-        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-        let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap();
+        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let mut hudi_table = Table::new(base_url.path(), HashMap::new());
         assert_eq!(hudi_table.get_timeline().unwrap().instants.len(), 2);
-        assert_eq!(hudi_table.get_latest_file_paths().unwrap().len(), 5);
+        let actual: HashSet<String> =
+            HashSet::from_iter(hudi_table.get_latest_file_paths().unwrap());
+        let expected: HashSet<String> = HashSet::from_iter(
+            vec![
+                "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
+                "byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
+                "byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
+            ]
+            .into_iter()
+            .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+            .collect::<Vec<_>>(),
+        );
+        assert_eq!(actual, expected);
     }

     #[test]
(Diff truncated: 21 more changed files not shown.)
