From d35c6d1a700e46776a45facbe855a272c9f64670 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sun, 11 Aug 2024 10:54:13 +0200 Subject: [PATCH] feat: improve test fixtures to generate actions and data --- crates/core/src/kernel/models/actions.rs | 15 +- crates/core/src/kernel/scalars.rs | 53 ++++ crates/core/src/kernel/snapshot/mod.rs | 37 +-- crates/core/src/lib.rs | 3 + .../transaction/conflict_checker.rs | 104 ++++---- crates/core/src/operations/transaction/mod.rs | 2 - .../src/operations/transaction/protocol.rs | 41 +-- .../core/src/operations/transaction/state.rs | 44 +++- .../src/operations/transaction/test_utils.rs | 171 ------------- .../core/src/test_utils/factories/actions.rs | 112 ++++++++ crates/core/src/test_utils/factories/data.rs | 241 ++++++++++++++++++ crates/core/src/test_utils/factories/mod.rs | 66 +++++ crates/core/src/test_utils/mod.rs | 5 + crates/test/src/lib.rs | 6 +- 14 files changed, 604 insertions(+), 296 deletions(-) delete mode 100644 crates/core/src/operations/transaction/test_utils.rs create mode 100644 crates/core/src/test_utils/factories/actions.rs create mode 100644 crates/core/src/test_utils/factories/data.rs create mode 100644 crates/core/src/test_utils/factories/mod.rs create mode 100644 crates/core/src/test_utils/mod.rs diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs index 962b71b21b..f5c1129dfc 100644 --- a/crates/core/src/kernel/models/actions.rs +++ b/crates/core/src/kernel/models/actions.rs @@ -1,11 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt; use std::str::FromStr; -// use std::io::{Cursor, Read}; -// use std::sync::Arc; -// use roaring::RoaringTreemap; -use crate::DeltaConfigKey; use maplit::hashset; use serde::{Deserialize, Serialize}; use tracing::warn; @@ -13,9 +9,10 @@ use url::Url; use super::schema::StructType; use crate::kernel::{error::Error, DeltaResult}; +use crate::DeltaConfigKey; -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] 
/// Defines a file format used in table +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct Format { /// Name of the encoding for files in this table pub provider: String, @@ -1126,15 +1123,9 @@ pub(crate) mod serde_path { #[cfg(test)] mod tests { use std::path::PathBuf; - // use std::sync::Arc; - - // use object_store::local::LocalFileSystem; - - use crate::kernel::PrimitiveType; use super::*; - // use crate::client::filesystem::ObjectStoreFileSystemClient; - // use crate::executor::tokio::TokioBackgroundExecutor; + use crate::kernel::PrimitiveType; fn dv_relateive() -> DeletionVectorDescriptor { DeletionVectorDescriptor { diff --git a/crates/core/src/kernel/scalars.rs b/crates/core/src/kernel/scalars.rs index 92c6838234..a884933308 100644 --- a/crates/core/src/kernel/scalars.rs +++ b/crates/core/src/kernel/scalars.rs @@ -8,6 +8,8 @@ use delta_kernel::{ schema::StructField, }; use object_store::path::Path; +#[cfg(test)] +use serde_json::Value; use std::cmp::Ordering; use urlencoding::encode; @@ -21,6 +23,9 @@ pub trait ScalarExt: Sized { fn serialize_encoded(&self) -> String; /// Create a [`Scalar`] from an arrow array row fn from_array(arr: &dyn Array, index: usize) -> Option; + /// Serialize as serde_json::Value + #[cfg(test)] + fn to_json(&self) -> serde_json::Value; } impl ScalarExt for Scalar { @@ -218,6 +223,54 @@ impl ScalarExt for Scalar { | Null => None, } } + + /// Serializes this scalar as a serde_json::Value. 
+ #[cfg(test)] + fn to_json(&self) -> serde_json::Value { + match self { + Self::String(s) => Value::String(s.to_owned()), + Self::Byte(b) => Value::Number(serde_json::Number::from(*b)), + Self::Short(s) => Value::Number(serde_json::Number::from(*s)), + Self::Integer(i) => Value::Number(serde_json::Number::from(*i)), + Self::Long(l) => Value::Number(serde_json::Number::from(*l)), + Self::Float(f) => Value::Number(serde_json::Number::from_f64(*f as f64).unwrap()), + Self::Double(d) => Value::Number(serde_json::Number::from_f64(*d).unwrap()), + Self::Boolean(b) => Value::Bool(*b), + Self::TimestampNtz(ts) | Self::Timestamp(ts) => { + let ts = Utc.timestamp_micros(*ts).single().unwrap(); + Value::String(ts.format("%Y-%m-%d %H:%M:%S%.6f").to_string()) + } + Self::Date(days) => { + let date = DateTime::from_timestamp(*days as i64 * 24 * 3600, 0).unwrap(); + Value::String(date.format("%Y-%m-%d").to_string()) + } + Self::Decimal(value, _, scale) => match scale.cmp(&0) { + Ordering::Equal => Value::String(value.to_string()), + Ordering::Greater => { + let scalar_multiple = 10_i128.pow(*scale as u32); + let mut s = String::new(); + s.push_str((value / scalar_multiple).to_string().as_str()); + s.push('.'); + s.push_str(&format!( + "{:0>scale$}", + value % scalar_multiple, + scale = *scale as usize + )); + Value::String(s) + } + Ordering::Less => { + let mut s = value.to_string(); + for _ in 0..*scale { + s.push('0'); + } + Value::String(s) + } + }, + Self::Binary(val) => Value::String(create_escaped_binary_string(val.as_slice())), + Self::Null(_) => Value::Null, + Self::Struct(_) => unimplemented!(), + } + } } fn create_escaped_binary_string(data: &[u8]) -> String { diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 4d9b3bd8d1..c0825a5c0b 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -777,6 +777,7 @@ mod tests { use super::*; use crate::kernel::Remove; use 
crate::protocol::{DeltaOperation, SaveMode}; + use crate::test_utils::ActionFactory; #[tokio::test] async fn test_snapshots() -> TestResult { @@ -991,23 +992,9 @@ mod tests { ]); let partition_columns = vec!["date".to_string()]; - let metadata = Metadata { - id: "id".to_string(), - name: None, - description: None, - format: Default::default(), - schema_string: serde_json::to_string(&schema).unwrap(), - partition_columns, - configuration: Default::default(), - created_time: None, - }; + let metadata = ActionFactory::metadata(&schema, Some(&partition_columns), None); + let protocol = ActionFactory::protocol(None, None, None::>, None::>); - let protocol = Protocol { - min_reader_version: 1, - min_writer_version: 2, - reader_features: Default::default(), - writer_features: Default::default(), - }; let commit_data = CommitData::new( vec![ Action::Protocol(protocol.clone()), @@ -1015,7 +1002,7 @@ mod tests { ], DeltaOperation::Write { mode: SaveMode::Append, - partition_by: None, + partition_by: Some(partition_columns), predicate: None, }, HashMap::new(), @@ -1025,27 +1012,17 @@ mod tests { let snapshot = Snapshot { log_segment: log_segment.clone(), - config: Default::default(), - protocol: Default::default(), + protocol: protocol.clone(), metadata, schema: schema.clone(), table_url: "table".to_string(), + config: Default::default(), }; let expected = StructType::new(vec![StructField::new("date", DataType::DATE, true)]); assert_eq!(snapshot.partitions_schema(None).unwrap(), Some(expected)); - let metadata = Metadata { - id: "id".to_string(), - name: None, - description: None, - format: Default::default(), - schema_string: serde_json::to_string(&schema).unwrap(), - partition_columns: vec![], - configuration: Default::default(), - created_time: None, - }; - + let metadata = ActionFactory::metadata(&schema, None::>, None); let commit_data = CommitData::new( vec![ Action::Protocol(protocol.clone()), diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 
4ef9fc06fd..93e3ff29ed 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -79,6 +79,9 @@ pub mod schema; pub mod storage; pub mod table; +#[cfg(test)] +pub mod test_utils; + #[cfg(feature = "datafusion")] pub mod delta_datafusion; pub mod writer; diff --git a/crates/core/src/operations/transaction/conflict_checker.rs b/crates/core/src/operations/transaction/conflict_checker.rs index d44c704b53..f9fbe84f8d 100644 --- a/crates/core/src/operations/transaction/conflict_checker.rs +++ b/crates/core/src/operations/transaction/conflict_checker.rs @@ -645,28 +645,30 @@ pub(super) fn can_downgrade_to_snapshot_isolation<'a>( #[cfg(test)] #[allow(unused)] mod tests { - use super::super::test_utils as tu; - use super::super::test_utils::init_table_actions; - use super::*; - use crate::kernel::Action; + use std::collections::HashMap; + #[cfg(feature = "datafusion")] use datafusion_expr::{col, lit}; use serde_json::json; - fn get_stats(min: i64, max: i64) -> Option { - let data = json!({ - "numRecords": 18, - "minValues": { - "value": min - }, - "maxValues": { - "value": max - }, - "nullCount": { - "value": 0 - } - }); - Some(data.to_string()) + use super::*; + use crate::kernel::Action; + use crate::test_utils::{ActionFactory, TestSchemas}; + + fn simple_add(data_change: bool, min: &str, max: &str) -> Add { + ActionFactory::add( + TestSchemas::simple(), + HashMap::from_iter([("value", (min, max))]), + Default::default(), + true, + ) + } + + fn init_table_actions() -> Vec { + vec![ + ActionFactory::protocol(None, None, None::>, None::>).into(), + ActionFactory::metadata(TestSchemas::simple(), None::>, None).into(), + ] } #[test] @@ -676,7 +678,8 @@ mod tests { predicate: None, target_size: 0, }; - let add = tu::create_add_action("p", false, None); + let add = + ActionFactory::add(TestSchemas::simple(), HashMap::new(), HashMap::new(), true).into(); let res = can_downgrade_to_snapshot_isolation(&[add], &operation, &isolation); assert!(!res) } @@ -697,7 +700,7 @@ 
mod tests { ) -> Result<(), CommitConflictError> { use crate::table::state::DeltaTableState; - let setup_actions = setup.unwrap_or_else(|| init_table_actions(None)); + let setup_actions = setup.unwrap_or_else(|| init_table_actions()); let state = DeltaTableState::from_actions(setup_actions).unwrap(); let snapshot = state.snapshot(); let transaction_info = TransactionInfo::new(snapshot, reads, &actions, read_whole_table); @@ -715,22 +718,23 @@ mod tests { async fn test_allowed_concurrent_actions() { // append - append // append file to table while a concurrent writer also appends a file - let file1 = tu::create_add_action("file1", true, get_stats(1, 10)); - let file2 = tu::create_add_action("file2", true, get_stats(1, 10)); + let file1 = simple_add(true, "1", "10").into(); + let file2 = simple_add(true, "1", "10").into(); + let result = execute_test(None, None, vec![file1], vec![file2], false); assert!(result.is_ok()); // disjoint delete - read // the concurrent transaction deletes a file that the current transaction did NOT read - let file_not_read = tu::create_add_action("file_not_read", true, get_stats(1, 10)); - let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000)); - let mut setup_actions = init_table_actions(None); - setup_actions.push(file_not_read); + let file_not_read = simple_add(true, "1", "10"); + let file_read = simple_add(true, "100", "10000").into(); + let mut setup_actions = init_table_actions(); + setup_actions.push(file_not_read.clone().into()); setup_actions.push(file_read); let result = execute_test( Some(setup_actions), Some(col("value").gt(lit::(10))), - vec![tu::create_remove_action("file_not_read", true)], + vec![ActionFactory::remove(&file_not_read, true).into()], vec![], false, ); @@ -738,9 +742,9 @@ mod tests { // disjoint add - read // concurrently add file, that the current transaction would not have read - let file_added = tu::create_add_action("file_added", true, get_stats(1, 10)); - let file_read = 
tu::create_add_action("file_read", true, get_stats(100, 10000)); - let mut setup_actions = init_table_actions(None); + let file_added = simple_add(true, "1", "10").into(); + let file_read = simple_add(true, "100", "10000").into(); + let mut setup_actions = init_table_actions(); setup_actions.push(file_read); let result = execute_test( Some(setup_actions), @@ -774,7 +778,8 @@ mod tests { async fn test_disallowed_concurrent_actions() { // delete - delete // remove file from table that has previously been removed - let removed_file = tu::create_remove_action("removed_file", true); + let removed_file = simple_add(true, "1", "10"); + let removed_file: Action = ActionFactory::remove(&removed_file, true).into(); let result = execute_test( None, None, @@ -789,9 +794,8 @@ mod tests { // add / read + write // a file is concurrently added that should have been read by the current transaction - let file_added = tu::create_add_action("file_added", true, get_stats(1, 10)); - let file_should_have_read = - tu::create_add_action("file_should_have_read", true, get_stats(1, 10)); + let file_added = simple_add(true, "1", "10").into(); + let file_should_have_read = simple_add(true, "1", "10").into(); let result = execute_test( None, Some(col("value").lt_eq(lit::(10))), @@ -803,13 +807,13 @@ mod tests { // delete / read // transaction reads a file that is removed by concurrent transaction - let file_read = tu::create_add_action("file_read", true, get_stats(1, 10)); - let mut setup_actions = init_table_actions(None); - setup_actions.push(file_read); + let file_read = simple_add(true, "1", "10"); + let mut setup_actions = init_table_actions(); + setup_actions.push(file_read.clone().into()); let result = execute_test( Some(setup_actions), Some(col("value").lt_eq(lit::(10))), - vec![tu::create_remove_action("file_read", true)], + vec![ActionFactory::remove(&file_read, true).into()], vec![], false, ); @@ -823,7 +827,7 @@ mod tests { let result = execute_test( None, None, - 
vec![tu::create_metadata_action(None, None)], + vec![ActionFactory::metadata(TestSchemas::simple(), None::>, None).into()], vec![], false, ); @@ -834,8 +838,8 @@ mod tests { let result = execute_test( None, None, - vec![tu::create_protocol_action(None, None)], - vec![tu::create_protocol_action(None, None)], + vec![ActionFactory::protocol(None, None, None::>, None::>).into()], + vec![ActionFactory::protocol(None, None, None::>, None::>).into()], false, ); assert!(matches!( @@ -846,10 +850,10 @@ mod tests { // taint whole table // `read_whole_table` should disallow any concurrent change, even if the change // is disjoint with the earlier filter - let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10)); - let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100)); - let file_part3 = tu::create_add_action("file_part3", true, get_stats(101, 1000)); - let mut setup_actions = init_table_actions(None); + let file_part1 = simple_add(true, "1", "10").into(); + let file_part2 = simple_add(true, "11", "100").into(); + let file_part3 = simple_add(true, "101", "1000").into(); + let mut setup_actions = init_table_actions(); setup_actions.push(file_part1); let result = execute_test( Some(setup_actions), @@ -863,14 +867,14 @@ mod tests { // taint whole table + concurrent remove // `read_whole_table` should disallow any concurrent remove actions - let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10)); - let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100)); - let mut setup_actions = init_table_actions(None); - setup_actions.push(file_part1); + let file_part1 = simple_add(true, "1", "10"); + let file_part2 = simple_add(true, "11", "100").into(); + let mut setup_actions = init_table_actions(); + setup_actions.push(file_part1.clone().into()); let result = execute_test( Some(setup_actions), None, - vec![tu::create_remove_action("file_part1", true)], + vec![ActionFactory::remove(&file_part1, 
true).into()], vec![file_part2], true, ); diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index babff18439..28af281fa1 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -103,8 +103,6 @@ mod conflict_checker; mod protocol; #[cfg(feature = "datafusion")] mod state; -#[cfg(test)] -pub(crate) mod test_utils; const DELTA_LOG_FOLDER: &str = "_delta_log"; pub(crate) const DEFAULT_RETRIES: usize = 15; diff --git a/crates/core/src/operations/transaction/protocol.rs b/crates/core/src/operations/transaction/protocol.rs index f3bb87098a..6e16b95809 100644 --- a/crates/core/src/operations/transaction/protocol.rs +++ b/crates/core/src/operations/transaction/protocol.rs @@ -264,14 +264,19 @@ pub static INSTANCE: Lazy = Lazy::new(|| { #[cfg(test)] mod tests { - use super::super::test_utils::create_metadata_action; + use std::collections::HashMap; + use super::*; use crate::kernel::DataType as DeltaDataType; - use crate::kernel::{Action, Add, PrimitiveType, Protocol, Remove}; + use crate::kernel::{Action, Add, Metadata, PrimitiveType, Protocol, Remove}; use crate::protocol::SaveMode; use crate::table::state::DeltaTableState; + use crate::test_utils::{ActionFactory, TestSchemas}; use crate::DeltaConfigKey; - use std::collections::HashMap; + + fn metadata_action(configuration: Option>>) -> Metadata { + ActionFactory::metadata(TestSchemas::simple(), None::>, configuration) + } #[test] fn test_can_commit_append_only() { @@ -322,13 +327,11 @@ mod tests { writer_features: Some(feat.into_iter().collect()), ..Default::default() }), - create_metadata_action( - None, - Some(HashMap::from([( - DeltaConfigKey::AppendOnly.as_ref().to_string(), - Some(append.to_string()), - )])), - ), + metadata_action(Some(HashMap::from([( + DeltaConfigKey::AppendOnly.as_ref().to_string(), + Some(append.to_string()), + )]))) + .into(), ] }; @@ -422,7 +425,7 @@ mod tests { 
min_writer_version: 1, ..Default::default() }), - create_metadata_action(None, Some(HashMap::new())), + metadata_action(None).into(), ]; let snapshot_1 = DeltaTableState::from_actions(actions).unwrap(); let eager_1 = snapshot_1.snapshot(); @@ -436,7 +439,7 @@ mod tests { min_writer_version: 1, ..Default::default() }), - create_metadata_action(None, Some(HashMap::new())), + metadata_action(None).into(), ]; let snapshot_2 = DeltaTableState::from_actions(actions).unwrap(); let eager_2 = snapshot_2.snapshot(); @@ -453,7 +456,7 @@ mod tests { min_writer_version: 2, ..Default::default() }), - create_metadata_action(None, Some(HashMap::new())), + metadata_action(None).into(), ]; let snapshot_3 = DeltaTableState::from_actions(actions).unwrap(); let eager_3 = snapshot_3.snapshot(); @@ -473,7 +476,7 @@ mod tests { min_writer_version: 3, ..Default::default() }), - create_metadata_action(None, Some(HashMap::new())), + metadata_action(None).into(), ]; let snapshot_4 = DeltaTableState::from_actions(actions).unwrap(); let eager_4 = snapshot_4.snapshot(); @@ -496,7 +499,7 @@ mod tests { min_writer_version: 4, ..Default::default() }), - create_metadata_action(None, Some(HashMap::new())), + metadata_action(None).into(), ]; let snapshot_5 = DeltaTableState::from_actions(actions).unwrap(); let eager_5 = snapshot_5.snapshot(); @@ -522,7 +525,7 @@ mod tests { min_writer_version: 5, ..Default::default() }), - create_metadata_action(None, Some(HashMap::new())), + metadata_action(None).into(), ]; let snapshot_6 = DeltaTableState::from_actions(actions).unwrap(); let eager_6 = snapshot_6.snapshot(); @@ -551,7 +554,7 @@ mod tests { min_writer_version: 6, ..Default::default() }), - create_metadata_action(None, Some(HashMap::new())), + metadata_action(None).into(), ]; let snapshot_7 = DeltaTableState::from_actions(actions).unwrap(); let eager_7 = snapshot_7.snapshot(); @@ -585,7 +588,7 @@ mod tests { Protocol::new(2, 4) .with_writer_features(vec![crate::kernel::WriterFeatures::ChangeDataFeed]), 
), - create_metadata_action(None, Some(HashMap::new())), + metadata_action(None).into(), ]; let snapshot_5 = DeltaTableState::from_actions(actions).unwrap(); let eager_5 = snapshot_5.snapshot(); @@ -603,7 +606,7 @@ mod tests { Protocol::new(2, 4) .with_writer_features(vec![crate::kernel::WriterFeatures::GeneratedColumns]), ), - create_metadata_action(None, Some(HashMap::new())), + metadata_action(None).into(), ]; let snapshot_5 = DeltaTableState::from_actions(actions).unwrap(); let eager_5 = snapshot_5.snapshot(); diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index d705a616b1..4648883ef8 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -327,15 +327,26 @@ impl PruningStatistics for DeltaTableState { #[cfg(test)] mod tests { - use super::*; - use crate::delta_datafusion::DataFusionFileMixins; - use crate::operations::transaction::test_utils::{create_add_action, init_table_actions}; + use std::collections::HashMap; + use datafusion::prelude::SessionContext; use datafusion_expr::{col, lit}; + use super::*; + use crate::delta_datafusion::DataFusionFileMixins; + use crate::kernel::Action; + use crate::test_utils::{ActionFactory, TestSchemas}; + + fn init_table_actions() -> Vec { + vec![ + ActionFactory::protocol(None, None, None::>, None::>).into(), + ActionFactory::metadata(TestSchemas::simple(), None::>, None).into(), + ] + } + #[test] fn test_parse_predicate_expression() { - let snapshot = DeltaTableState::from_actions(init_table_actions(None)).unwrap(); + let snapshot = DeltaTableState::from_actions(init_table_actions()).unwrap(); let session = SessionContext::new(); let state = session.state(); @@ -362,10 +373,26 @@ mod tests { #[test] fn test_files_matching_predicate() { - let mut actions = init_table_actions(None); - actions.push(create_add_action("excluded", true, 
Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":10},\"nullCount\":{\"value\":0}}".into()))); - actions.push(create_add_action("included-1", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":100},\"nullCount\":{\"value\":0}}".into()))); - actions.push(create_add_action("included-2", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":-10},\"maxValues\":{\"value\":3},\"nullCount\":{\"value\":0}}".into()))); + let mut actions = init_table_actions(); + + actions.push(Action::Add(ActionFactory::add( + TestSchemas::simple(), + HashMap::from_iter([("value", ("1", "10"))]), + Default::default(), + true, + ))); + actions.push(Action::Add(ActionFactory::add( + TestSchemas::simple(), + HashMap::from_iter([("value", ("1", "100"))]), + Default::default(), + true, + ))); + actions.push(Action::Add(ActionFactory::add( + TestSchemas::simple(), + HashMap::from_iter([("value", ("-10", "3"))]), + Default::default(), + true, + ))); let state = DeltaTableState::from_actions(actions).unwrap(); let files = state @@ -385,6 +412,5 @@ mod tests { .unwrap() .collect::>(); assert_eq!(files.len(), 2); - assert!(files.iter().all(|add| add.path.contains("included"))); } } diff --git a/crates/core/src/operations/transaction/test_utils.rs b/crates/core/src/operations/transaction/test_utils.rs deleted file mode 100644 index ada5ded056..0000000000 --- a/crates/core/src/operations/transaction/test_utils.rs +++ /dev/null @@ -1,171 +0,0 @@ -#![allow(unused)] -use std::collections::HashMap; - -use super::CommitBuilder; -use crate::kernel::{ - Action, Add, CommitInfo, DataType, Metadata, PrimitiveType, Protocol, Remove, StructField, - StructType, -}; -use crate::operations::transaction::PROTOCOL; -use crate::protocol::{DeltaOperation, SaveMode}; -use crate::table::state::DeltaTableState; -use crate::{DeltaTable, DeltaTableBuilder}; - -pub fn create_add_action( - path: impl Into, - data_change: bool, - stats: Option, -) -> Action { - 
Action::Add(Add { - path: path.into(), - size: 100, - data_change, - stats, - modification_time: -1, - partition_values: Default::default(), - stats_parsed: None, - base_row_id: None, - default_row_commit_version: None, - tags: None, - deletion_vector: None, - clustering_provider: None, - }) -} - -pub fn create_remove_action(path: impl Into, data_change: bool) -> Action { - Action::Remove(Remove { - path: path.into(), - data_change, - size: None, - deletion_timestamp: None, - deletion_vector: None, - partition_values: Default::default(), - extended_file_metadata: None, - base_row_id: None, - default_row_commit_version: None, - tags: None, - }) -} - -pub fn create_protocol_action(max_reader: Option, max_writer: Option) -> Action { - let protocol = Protocol { - min_reader_version: max_reader.unwrap_or(PROTOCOL.default_reader_version()), - min_writer_version: max_writer.unwrap_or(PROTOCOL.default_writer_version()), - writer_features: None, - reader_features: None, - }; - Action::Protocol(protocol) -} - -pub fn create_metadata_action( - parttiton_columns: Option>, - configuration: Option>>, -) -> Action { - let table_schema = StructType::new(vec![ - StructField::new( - "id".to_string(), - DataType::Primitive(PrimitiveType::String), - true, - ), - StructField::new( - "value".to_string(), - DataType::Primitive(PrimitiveType::Integer), - true, - ), - StructField::new( - "modified".to_string(), - DataType::Primitive(PrimitiveType::String), - true, - ), - ]); - Action::Metadata( - Metadata::try_new( - table_schema, - parttiton_columns.unwrap_or_default(), - configuration.unwrap_or_default(), - ) - .unwrap(), - ) -} - -pub fn init_table_actions(configuration: Option>>) -> Vec { - let raw = r#" - { - "timestamp": 1670892998177, - "operation": "WRITE", - "operationParameters": { - "mode": "Append", - "partitionBy": "[\"c1\",\"c2\"]" - }, - "isolationLevel": "Serializable", - "isBlindAppend": true, - "operationMetrics": { - "numFiles": "3", - "numOutputRows": "3", - 
"numOutputBytes": "1356" - }, - "engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0", - "txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c" - }"#; - - let commit_info = serde_json::from_str::(raw).unwrap(); - vec![ - Action::CommitInfo(commit_info), - create_protocol_action(None, None), - create_metadata_action(None, configuration), - ] -} - -pub async fn create_initialized_table( - partition_cols: &[String], - configuration: Option>>, -) -> DeltaTable { - let log_store = DeltaTableBuilder::from_uri("memory://") - .build_storage() - .unwrap(); - let table_schema = StructType::new(vec![ - StructField::new( - "id".to_string(), - DataType::Primitive(PrimitiveType::String), - true, - ), - StructField::new( - "value".to_string(), - DataType::Primitive(PrimitiveType::Integer), - true, - ), - StructField::new( - "modified".to_string(), - DataType::Primitive(PrimitiveType::String), - true, - ), - ]); - let state = DeltaTableState::from_actions(init_table_actions(None)).unwrap(); - let operation = DeltaOperation::Create { - mode: SaveMode::ErrorIfExists, - location: "location".into(), - protocol: Protocol { - min_reader_version: 1, - min_writer_version: 1, - writer_features: None, - reader_features: None, - }, - metadata: Metadata { - id: uuid::Uuid::new_v4().to_string(), - name: None, - description: None, - format: Default::default(), - schema_string: serde_json::to_string(&table_schema).unwrap(), - partition_columns: partition_cols.to_vec(), - configuration: configuration.unwrap_or_default(), - created_time: Some(chrono::Utc::now().timestamp_millis()), - }, - }; - let actions = init_table_actions(None); - CommitBuilder::default() - .with_actions(actions) - .build(None, log_store.clone(), operation) - .await - .unwrap(); - DeltaTable::new_with_state(log_store, state) -} diff --git a/crates/core/src/test_utils/factories/actions.rs b/crates/core/src/test_utils/factories/actions.rs new file mode 100644 index 0000000000..94722bd877 --- /dev/null +++ 
b/crates/core/src/test_utils/factories/actions.rs @@ -0,0 +1,112 @@ +use std::collections::HashMap; + +use chrono::Utc; +use object_store::path::Path; +use object_store::ObjectMeta; + +use super::{get_parquet_bytes, DataFactory, FileStats}; +use crate::kernel::{Add, Metadata, Protocol, ReaderFeatures, Remove, StructType, WriterFeatures}; +use crate::operations::transaction::PROTOCOL; + +pub struct ActionFactory; + +impl ActionFactory { + pub fn add_raw( + meta: ObjectMeta, + stats: FileStats, + partition_values: HashMap>, + data_change: bool, + ) -> Add { + Add { + path: meta.location.to_string(), + size: meta.size as i64, + partition_values, + data_change, + modification_time: meta.last_modified.timestamp_millis(), + stats: serde_json::to_string(&stats).ok(), + tags: Some(HashMap::new()), + default_row_commit_version: None, + deletion_vector: None, + base_row_id: None, + clustering_provider: None, + stats_parsed: None, + } + } + + pub fn add( + schema: &StructType, + bounds: HashMap<&str, (&str, &str)>, + partition_values: HashMap>, + data_change: bool, + ) -> Add { + let batch = DataFactory::record_batch(schema, 10, bounds).unwrap(); + let stats = DataFactory::file_stats(&batch).unwrap(); + let path = Path::from(generate_file_name()); + let data = get_parquet_bytes(&batch).unwrap(); + let meta = ObjectMeta { + location: path.clone(), + size: data.len(), + last_modified: Utc::now(), + e_tag: None, + version: None, + }; + ActionFactory::add_raw(meta, stats, partition_values, data_change) + } + + pub fn remove(add: &Add, data_change: bool) -> Remove { + add_as_remove(add, data_change) + } + + pub fn protocol( + max_reader: Option, + max_writer: Option, + reader_features: Option>, + writer_features: Option>, + ) -> Protocol { + Protocol { + min_reader_version: max_reader.unwrap_or(PROTOCOL.default_reader_version()), + min_writer_version: max_writer.unwrap_or(PROTOCOL.default_writer_version()), + writer_features: writer_features.map(|i| i.into_iter().collect()), + 
reader_features: reader_features.map(|i| i.into_iter().collect()), + } + } + + pub fn metadata( + schema: &StructType, + partition_columns: Option>, + configuration: Option>>, + ) -> Metadata { + Metadata { + id: uuid::Uuid::new_v4().hyphenated().to_string(), + format: Default::default(), + schema_string: serde_json::to_string(schema).unwrap(), + partition_columns: partition_columns + .map(|i| i.into_iter().map(|c| c.to_string()).collect()) + .unwrap_or_default(), + configuration: configuration.unwrap_or_default(), + name: None, + description: None, + created_time: Some(Utc::now().timestamp_millis()), + } + } +} + +pub fn add_as_remove(add: &Add, data_change: bool) -> Remove { + Remove { + path: add.path.clone(), + data_change, + deletion_timestamp: Some(Utc::now().timestamp_millis()), + size: Some(add.size), + extended_file_metadata: Some(true), + partition_values: Some(add.partition_values.clone()), + tags: add.tags.clone(), + deletion_vector: add.deletion_vector.clone(), + base_row_id: add.base_row_id, + default_row_commit_version: add.default_row_commit_version, + } +} + +fn generate_file_name() -> String { + let file_name = uuid::Uuid::new_v4().hyphenated().to_string(); + format!("part-0001-{}.parquet", file_name) +} diff --git a/crates/core/src/test_utils/factories/data.rs b/crates/core/src/test_utils/factories/data.rs new file mode 100644 index 0000000000..efa31f52ef --- /dev/null +++ b/crates/core/src/test_utils/factories/data.rs @@ -0,0 +1,241 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_arith::aggregate::{max as arrow_max, min as arrow_min}; +use arrow_array::*; +use arrow_schema::DataType as ArrowDataType; +use bytes::Bytes; +use delta_kernel::expressions::Scalar; +use parquet::arrow::arrow_writer::ArrowWriter; +use parquet::file::properties::WriterProperties; +use rand::distributions::{Alphanumeric, DistString, Distribution, Uniform}; + +use super::super::TestResult; +use super::FileStats; +use crate::kernel::scalars::ScalarExt; 
+use crate::kernel::{DataType, PrimitiveType, StructType};
+
+/// Entry point for generating random Arrow data and matching file statistics in tests.
+pub struct DataFactory;
+
+impl DataFactory {
+    /// Generates a [`RecordBatch`] with `length` random rows for every field of `schema`.
+    ///
+    /// `bounds` maps a column name to the `(min, max)` strings limiting the sampled values;
+    /// columns without an entry use the defaults of [`generate_random_array`].
+    pub fn record_batch(
+        schema: &StructType,
+        length: usize,
+        bounds: HashMap<&str, (&str, &str)>,
+    ) -> TestResult<RecordBatch> {
+        generate_random_batch(schema, length, bounds)
+    }
+
+    /// Collects the record count plus per-column null counts and min / max values of `batch`.
+    pub fn file_stats(batch: &RecordBatch) -> TestResult<FileStats> {
+        get_stats(batch)
+    }
+
+    /// Generates a single random array; forwards to [`generate_random_array`].
+    pub fn array(
+        data_type: DataType,
+        length: usize,
+        min_val: Option<String>,
+        max_val: Option<String>,
+    ) -> TestResult<ArrayRef> {
+        generate_random_array(data_type, length, min_val, max_val)
+    }
+}
+
+/// Builds one random column per schema field and assembles the columns into a record batch.
+///
+/// # Errors
+/// Fails when a column cannot be generated, when `schema` cannot be converted into an
+/// Arrow schema, or when the generated columns do not form a valid batch.
+fn generate_random_batch(
+    schema: &StructType,
+    length: usize,
+    bounds: HashMap<&str, (&str, &str)>,
+) -> TestResult<RecordBatch> {
+    let columns = schema
+        .fields()
+        .map(|field| {
+            // Columns without explicit bounds pass `None`, which selects the generator's
+            // default bounds without relying on an unparsable placeholder string.
+            let (min_val, max_val) = match bounds.get(field.name().as_str()) {
+                Some((min_val, max_val)) => {
+                    (Some(min_val.to_string()), Some(max_val.to_string()))
+                }
+                None => (None, None),
+            };
+            generate_random_array(field.data_type().clone(), length, min_val, max_val)
+        })
+        .collect::<TestResult<Vec<_>>>()?;
+    // Propagate conversion / validation failures instead of panicking in a fallible function.
+    Ok(RecordBatch::try_new(
+        Arc::new(schema.try_into()?),
+        columns,
+    )?)
+}
+
+/// Generates an Arrow array holding `length` random values of a primitive `data_type`.
+///
+/// Integer, long, float and double values are sampled uniformly from the inclusive range
+/// `min_val..=max_val`. A bound that is missing, cannot be parsed, or does not parse to a
+/// scalar of the requested type falls back to the default (`-10` / `10` for integral and
+/// `-10.1` / `10.1` for floating point types). String columns ignore the bounds and hold
+/// random alphanumeric values of 16 characters.
+///
+/// # Errors
+/// Returns an error if the lower bound is not less than or equal to the upper bound, or if
+/// `data_type` is not one of the supported primitive types.
+pub fn generate_random_array(
+    data_type: DataType,
+    length: usize,
+    min_val: Option<String>,
+    max_val: Option<String>,
+) -> TestResult<ArrayRef> {
+    use DataType::*;
+    use PrimitiveType::*;
+    let mut rng = rand::thread_rng();
+
+    // All numeric types share the same steps and only differ in the primitive / scalar
+    // variant, the arrow array type and the default bounds.
+    macro_rules! uniform_array {
+        ($kind:ident, $array:ident, $default_min:expr, $default_max:expr) => {{
+            // Anything but a scalar of the expected variant (parse error, null scalar, ...)
+            // resolves to the default bound rather than panicking.
+            let lower = match min_val.and_then(|raw| $kind.parse_scalar(&raw).ok()) {
+                Some(Scalar::$kind(value)) => value,
+                _ => $default_min,
+            };
+            let upper = match max_val.and_then(|raw| $kind.parse_scalar(&raw).ok()) {
+                Some(Scalar::$kind(value)) => value,
+                _ => $default_max,
+            };
+            // `Uniform` panics on an inverted range, so reject unordered (or NaN) bounds
+            // with a regular error first.
+            let ordered = matches!(
+                lower.partial_cmp(&upper),
+                Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
+            );
+            if !ordered {
+                return Err(format!(
+                    "invalid bounds for random array: min {:?} must not exceed max {:?}",
+                    lower, upper
+                )
+                .into());
+            }
+            let between = Uniform::from(lower..=upper);
+            let values = (0..length)
+                .map(|_| between.sample(&mut rng))
+                .collect::<Vec<_>>();
+            Ok(Arc::new($array::from(values)) as ArrayRef)
+        }};
+    }
+
+    match data_type {
+        Primitive(Integer) => uniform_array!(Integer, Int32Array, -10, 10),
+        Primitive(Long) => uniform_array!(Long, Int64Array, -10, 10),
+        Primitive(Float) => uniform_array!(Float, Float32Array, -10.1, 10.1),
+        Primitive(Double) => uniform_array!(Double, Float64Array, -10.1, 10.1),
+        Primitive(String) => {
+            let values = (0..length)
+                .map(|_| Alphanumeric.sample_string(&mut rng, 16))
+                .collect::<Vec<_>>();
+            Ok(Arc::new(StringArray::from(values)) as ArrayRef)
+        }
+        other => Err(format!(
+            "random data generation is not supported for data type {:?}",
+            other
+        )
+        .into()),
+    }
+}
+
+/// Collects file statistics (record count, null counts, min / max values) for `batch`.
+///
+/// Statistics are recorded for integer and floating point columns only; string and struct
+/// columns are skipped. A column without any non-null value (for example in an empty
+/// batch) contributes its null count but no min / max entry.
+///
+/// # Errors
+/// Returns an error for column types that are not handled yet.
+fn get_stats(batch: &RecordBatch) -> TestResult<FileStats> {
+    use ArrowDataType::*;
+
+    // Downcasts a column to its concrete array type and extracts
+    // `(null count, min, max)`; min / max are `None` if the column holds no value.
+    macro_rules! numeric_stats {
+        ($column:expr, $array:ident, $scalar:ident) => {{
+            let typed = $column
+                .as_any()
+                .downcast_ref::<$array>()
+                .expect("arrow data type and array type must match");
+            (
+                typed.null_count(),
+                arrow_min(typed).map(Scalar::$scalar),
+                arrow_max(typed).map(Scalar::$scalar),
+            )
+        }};
+    }
+
+    let mut file_stats = FileStats::new(batch.num_rows() as i64);
+    for (i, field) in batch.schema().fields().iter().enumerate() {
+        let array = batch.column(i);
+        let stats = match array.data_type() {
+            Int8 => Some(numeric_stats!(array, Int8Array, Byte)),
+            Int16 => Some(numeric_stats!(array, Int16Array, Short)),
+            Int32 => Some(numeric_stats!(array, Int32Array, Integer)),
+            Int64 => Some(numeric_stats!(array, Int64Array, Long)),
+            Float32 => Some(numeric_stats!(array, Float32Array, Float)),
+            Float64 => Some(numeric_stats!(array, Float64Array, Double)),
+            Utf8 | Struct(_) => None,
+            other => {
+                return Err(format!(
+                    "file statistics are not supported for arrow type {:?}",
+                    other
+                )
+                .into())
+            }
+        };
+        if let Some((null_count, min, max)) = stats {
+            let name = field.name();
+            file_stats.null_count.insert(
+                name.to_string(),
+                Scalar::Long(null_count as i64).to_json(),
+            );
+            if let Some(min) = min {
+                file_stats
+                    .min_values
+                    .insert(name.to_string(), min.to_json());
+            }
+            if let Some(max) = max {
+                file_stats
+                    .max_values
+                    .insert(name.to_string(), max.to_json());
+            }
+        }
+    }
+    Ok(file_stats)
+}
+
+/// Serializes `batch` into an in-memory parquet file and returns its bytes.
+///
+/// # Errors
+/// Returns an error if the parquet writer cannot be created, written to or closed.
+pub(crate) fn get_parquet_bytes(batch: &RecordBatch) -> TestResult<Bytes> {
+    let mut data: Vec<u8> = Vec::new();
+    let props = WriterProperties::builder().build();
+    let mut writer = ArrowWriter::try_new(&mut data, batch.schema(), Some(props))?;
+    writer.write(batch)?;
+    // writer must be closed to write footer
+    writer.close()?;
+    Ok(data.into())
+}
diff --git a/crates/core/src/test_utils/factories/mod.rs b/crates/core/src/test_utils/factories/mod.rs
new file mode 100644
index 0000000000..551749a89d
--- /dev/null
+++ b/crates/core/src/test_utils/factories/mod.rs
@@ -0,0 +1,76 @@
+use std::collections::HashMap;
+
+use lazy_static::lazy_static;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+use crate::kernel::{DataType, PrimitiveType, StructField, StructType};
+
+mod actions;
+mod data;
+
+pub use actions::*;
+pub use data::*;
+
+/// File level statistics, serialized with camelCase field names.
+///
+/// The per-column maps are keyed by column name and hold JSON values.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct FileStats {
+    /// Number of records in the file.
+    pub num_records: i64,
+    /// Number of null values per column.
+    pub null_count: HashMap<String, Value>,
+    /// Smallest value per column.
+    pub min_values: HashMap<String, Value>,
+    /// Largest value per column.
+    pub max_values: HashMap<String, Value>,
+}
+
+impl FileStats {
+    /// Creates statistics for `num_records` rows without any column level entries.
+    pub fn new(num_records: i64) -> Self {
+        Self {
+            num_records,
+            null_count: HashMap::new(),
+            min_values: HashMap::new(),
+            max_values: HashMap::new(),
+        }
+    }
+}
+
+/// Predefined table schemas for use in tests.
+pub struct TestSchemas;
+
+impl TestSchemas {
+    /// A simple flat schema with string and integer columns.
+    ///
+    /// ### Columns
+    /// - id: string
+    /// - value: integer
+    /// - modified: string
+    pub fn simple() -> &'static StructType {
+        lazy_static! {
+            // Built once on first access and shared by all callers.
+            static ref SIMPLE_SCHEMA: StructType = StructType::new(vec![
+                StructField::new(
+                    "id".to_string(),
+                    DataType::Primitive(PrimitiveType::String),
+                    true
+                ),
+                StructField::new(
+                    "value".to_string(),
+                    DataType::Primitive(PrimitiveType::Integer),
+                    true
+                ),
+                StructField::new(
+                    "modified".to_string(),
+                    DataType::Primitive(PrimitiveType::String),
+                    true
+                ),
+            ]);
+        }
+        &SIMPLE_SCHEMA
+    }
+}
diff --git a/crates/core/src/test_utils/mod.rs b/crates/core/src/test_utils/mod.rs
new file mode 100644
index 0000000000..0d3ff9ed65
--- /dev/null
+++ b/crates/core/src/test_utils/mod.rs
@@ -0,0 +1,6 @@
+mod factories;
+
+pub use factories::*;
+
+/// Result type for test helpers; any error is returned as a boxed [`std::error::Error`].
+pub type TestResult<T = ()> = Result<T, Box<dyn std::error::Error + 'static>>;
diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs
index c53d34b1d3..56781aaa3c 100644
--- a/crates/test/src/lib.rs
+++ b/crates/test/src/lib.rs
@@ -1,4 +1,7 @@
 #![allow(dead_code, unused_variables)]
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
 
 use bytes::Bytes;
 use deltalake_core::kernel::{Action, Add, Remove, StructType};
@@ -9,9 +12,6 @@ use deltalake_core::protocol::{DeltaOperation, SaveMode};
 use deltalake_core::DeltaTable;
 use deltalake_core::DeltaTableBuilder;
 use deltalake_core::{ObjectStore, Path};
-use std::any::Any;
-use std::collections::HashMap;
-use std::sync::Arc;
 use tempfile::TempDir;
 
 pub mod clock;