From d59867e68c7c9af0e4653d27f55eb6ffa2912b59 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Tue, 21 Jan 2025 17:28:00 +0100 Subject: [PATCH] test: more snapshot tests Signed-off-by: Robert Pack --- Cargo.toml | 10 +-- crates/core/src/kernel/snapshot_next/eager.rs | 37 +++++++++-- .../src/kernel/snapshot_next/iterators.rs | 66 ++++++++++++++++++- crates/core/src/kernel/snapshot_next/lazy.rs | 2 +- crates/core/src/kernel/snapshot_next/mod.rs | 55 ++++++++++++---- crates/core/src/operations/transaction/mod.rs | 30 +++++++-- 6 files changed, 171 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0bbe1e07ab..00c970fe8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,14 +27,14 @@ debug = "line-tables-only" [workspace.dependencies] # delta_kernel = { version = "=0.6.0", features = ["default-engine"] } -delta_kernel = { path = "../delta-kernel-rs/kernel", features = [ - "default-engine", - "developer-visibility", -] } -# delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "2e09bfcc0447283a3acc320ad2350f4075dba83e", features = [ +# delta_kernel = { path = "../delta-kernel-rs/kernel", features = [ # "default-engine", # "developer-visibility", # ] } +delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "50c1c023b7e9d60df69f6e592b91e4cc06a5a0b1", features = [ + "default-engine", + "developer-visibility", +] } # arrow arrow = { version = "53" } diff --git a/crates/core/src/kernel/snapshot_next/eager.rs b/crates/core/src/kernel/snapshot_next/eager.rs index 83e0a00863..2e8ff24059 100644 --- a/crates/core/src/kernel/snapshot_next/eager.rs +++ b/crates/core/src/kernel/snapshot_next/eager.rs @@ -1,14 +1,16 @@ use std::sync::Arc; -use arrow_array::RecordBatch; +use arrow_array::{BooleanArray, RecordBatch}; +use arrow_select::filter::filter_record_batch; use delta_kernel::actions::set_transaction::SetTransactionMap; use delta_kernel::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME}; use delta_kernel::actions::{Add, Metadata, Protocol, SetTransaction}; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::log_segment::LogSegment; +use delta_kernel::scan::log_replay::scan_action_iter; use delta_kernel::schema::Schema; use delta_kernel::table_properties::TableProperties; -use delta_kernel::{ExpressionRef, Table, Version}; +use delta_kernel::{EngineData, ExpressionRef, Table, Version}; use itertools::Itertools; use object_store::ObjectStore; use url::Url; @@ -55,9 +57,36 @@ impl Snapshot for EagerSnapshot { fn logical_files( &self, - _predicate: Option, + predicate: Option, ) -> DeltaResult>>> { - todo!() + let scan = self + .snapshot + .inner + .as_ref() + .clone() + .into_scan_builder() + .with_predicate(predicate) + .build()?; + + let iter = scan_action_iter( + self.snapshot.engine_ref().as_ref(), + vec![Ok(( + Box::new(ArrowEngineData::new(self.file_data()?.clone())) as Box, + false, + ))] + .into_iter(), + scan.physical_predicate() + .map(|p| (p, scan.schema().clone())), + ) + .map(|res| { + res.and_then(|(data, predicate)| { + let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data)?.into(); + Ok(filter_record_batch(&batch, &BooleanArray::from(predicate))?) + }) + }) + .map(|batch| batch.map_err(|e| e.into())); + + Ok(Box::new(iter)) } fn files( diff --git a/crates/core/src/kernel/snapshot_next/iterators.rs b/crates/core/src/kernel/snapshot_next/iterators.rs index 38aa0e1c2f..c353881e2c 100644 --- a/crates/core/src/kernel/snapshot_next/iterators.rs +++ b/crates/core/src/kernel/snapshot_next/iterators.rs @@ -1,5 +1,5 @@ use std::collections::HashSet; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use arrow_array::cast::AsArray; use arrow_array::types::Int64Type; @@ -14,6 +14,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::arrow_expression::ProvidesColumnByName; use delta_kernel::engine_data::{GetData, RowVisitor}; use delta_kernel::expressions::{Scalar, StructData}; +use delta_kernel::scan::scan_row_schema; use crate::kernel::scalars::ScalarExt; use crate::{DeltaResult, DeltaTableError}; @@ -218,6 +219,69 @@ impl Iterator for LogicalFileView { } } +pub struct LogicalFileViewIterator +where + I: IntoIterator>, +{ + inner: I::IntoIter, + batch: Option, + current: usize, +} + +impl LogicalFileViewIterator +where + I: IntoIterator>, +{ + /// Create a new [LogicalFileViewIterator]. + /// + /// If `iter` is an infallible iterator, use `.map(Ok)`. + pub fn new(iter: I) -> Self { + Self { + inner: iter.into_iter(), + batch: None, + current: 0, + } + } +} + +// impl Iterator for LogicalFileViewIterator +// where +// I: IntoIterator>, +// { +// type Item = DeltaResult; +// +// fn next(&mut self) -> Option { +// if let Some(batch) = &self.batch { +// if self.current < batch.num_rows() { +// let item = LogicalFileView { +// files: batch.clone(), +// index: self.current, +// }; +// self.current += 1; +// return Some(Ok(item)); +// } +// } +// match self.inner.next() { +// Some(Ok(batch)) => { +// if validate_logical_file(&batch).is_err() { +// return Some(Err(DeltaTableError::generic( +// "Invalid logical file data encountered.", +// ))); +// } +// self.batch = Some(batch); +// self.current = 0; +// self.next() +// } +// Some(Err(e)) => Some(Err(e)), +// None => None, +// } +// } +// +// fn size_hint(&self) -> (usize, Option) { +// self.inner.size_hint() +// } +// } + pub struct AddViewIterator where I: IntoIterator>, diff --git a/crates/core/src/kernel/snapshot_next/lazy.rs b/crates/core/src/kernel/snapshot_next/lazy.rs index 572b4ead37..20ccfb7031 100644 --- a/crates/core/src/kernel/snapshot_next/lazy.rs +++ b/crates/core/src/kernel/snapshot_next/lazy.rs @@ -3,8 +3,8 @@ use std::io::{BufRead, BufReader, Cursor}; use std::sync::{Arc, LazyLock}; -use arrow::compute::filter_record_batch; use arrow_array::{BooleanArray, RecordBatch}; +use arrow_select::filter::filter_record_batch; use delta_kernel::actions::set_transaction::{SetTransactionMap, SetTransactionScanner}; use delta_kernel::actions::{get_log_schema, REMOVE_NAME}; use delta_kernel::actions::{Metadata, Protocol, SetTransaction}; diff --git a/crates/core/src/kernel/snapshot_next/mod.rs b/crates/core/src/kernel/snapshot_next/mod.rs index 817d94e477..0023f4236d 100644 --- a/crates/core/src/kernel/snapshot_next/mod.rs +++ b/crates/core/src/kernel/snapshot_next/mod.rs @@ -13,6 +13,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::arrow_expression::apply_schema; use delta_kernel::expressions::{Scalar, StructData}; use delta_kernel::scan::log_replay::scan_action_iter; +use delta_kernel::scan::scan_row_schema; use delta_kernel::schema::{DataType, Schema}; use delta_kernel::table_properties::TableProperties; use delta_kernel::{EngineData, ExpressionRef, Version}; @@ -92,19 +93,34 @@ pub trait Snapshot { /// Get the [`TableProperties`] for this [`Snapshot`]. fn table_properties(&self) -> &TableProperties; - /// Get all currently active files in the table. + fn logical_file_schema(&self) -> &'static Schema { + scan_row_schema() + } + + /// Get all logical files present in the current snapshot. /// /// # Parameters /// - `predicate`: An optional predicate to filter the files based on file statistics. /// /// # Returns - /// An iterator of [`RecordBatch`]es, where each batch contains add action data. - fn files( + /// An iterator of [`RecordBatch`]es, where each batch contains logical file data. + fn logical_files( &self, predicate: Option, ) -> DeltaResult>>>; - fn logical_files( + /// Get all currently active files in the table. + /// + /// # Parameters + /// - `predicate`: An optional predicate to filter the files based on file statistics. + /// + /// # Returns + /// An iterator of [`RecordBatch`]es, where each batch contains add action data. + #[deprecated( + since = "0.25.0", + note = "Use `logical_files` instead, which returns a more focussed dataset and avoids computational overhead." + )] + fn files( &self, predicate: Option, ) -> DeltaResult>>>; @@ -113,6 +129,7 @@ pub trait Snapshot { &self, predicate: Option, ) -> DeltaResult>>> { + #[allow(deprecated)] Ok(Box::new(AddViewIterator::new(self.files(predicate)?))) } @@ -216,6 +233,7 @@ impl Snapshot for Box { &self, predicate: Option, ) -> DeltaResult>>> { + #[allow(deprecated)] self.as_ref().files(predicate) } @@ -404,6 +422,7 @@ mod tests { test_files(snapshot.as_ref())?; test_files_view(snapshot.as_ref())?; test_commit_infos(snapshot.as_ref())?; + test_logical_files(snapshot.as_ref())?; } let mut snapshot = get_snapshot(ctx, TestTables::Checkpoints, Some(0))?.await?; @@ -414,22 +433,29 @@ mod tests { test_files(snapshot.as_ref())?; test_files_view(snapshot.as_ref())?; test_commit_infos(snapshot.as_ref())?; + test_logical_files(snapshot.as_ref())?; } Ok(()) } - fn test_files(snapshot: &dyn Snapshot) -> TestResult<()> { - let batches = snapshot.files(None)?.collect::, _>>()?; - let num_files = batches.iter().map(|b| b.num_rows() as i64).sum::(); + fn test_logical_files(snapshot: &dyn Snapshot) -> TestResult<()> { + let logical_files = snapshot + .logical_files(None)? + .collect::, _>>()?; + let num_files = logical_files + .iter() + .map(|b| b.num_rows() as i64) + .sum::(); assert_eq!((num_files as u64), snapshot.version()); Ok(()) } - fn test_commit_infos(snapshot: &dyn Snapshot) -> TestResult<()> { - let commit_infos = snapshot.commit_infos(None, Some(100))?.collect::>(); - assert_eq!((commit_infos.len() as u64), snapshot.version() + 1); - assert_eq!(commit_infos.first().unwrap().0, snapshot.version()); + fn test_files(snapshot: &dyn Snapshot) -> TestResult<()> { + #[allow(deprecated)] + let batches = snapshot.files(None)?.collect::, _>>()?; + let num_files = batches.iter().map(|b| b.num_rows() as i64).sum::(); + assert_eq!((num_files as u64), snapshot.version()); Ok(()) } @@ -441,4 +467,11 @@ mod tests { assert_eq!(num_files_view, snapshot.version()); Ok(()) } + + fn test_commit_infos(snapshot: &dyn Snapshot) -> TestResult<()> { + let commit_infos = snapshot.commit_infos(None, Some(100))?.collect::>(); + assert_eq!((commit_infos.len() as u64), snapshot.version() + 1); + assert_eq!(commit_infos.first().unwrap().0, snapshot.version()); + Ok(()) + } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index bb173c40b7..d7d99f0dbe 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -80,6 +80,7 @@ use bytes::Bytes; use chrono::Utc; use conflict_checker::ConflictChecker; use futures::future::BoxFuture; +use itertools::Itertools; use object_store::path::Path; use object_store::Error as ObjectStoreError; use serde_json::Value; @@ -268,22 +269,37 @@ pub struct CommitData { impl CommitData { /// Create new data to be committed pub fn new( - mut actions: Vec, + actions: Vec, operation: DeltaOperation, mut app_metadata: HashMap, app_transactions: Vec, ) -> Self { - if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) { - let mut commit_info = operation.get_commit_info(); - commit_info.timestamp = Some(Utc::now().timestamp_millis()); + // When in-commit-timestamps are enabled, we need to ensure that the commit info is the first action + // in the commit log. If it is not present, we need to add it. + // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#writer-requirements-for-in-commit-timestamps + let mut commit_info = None::; + let mut actions = actions + .into_iter() + .inspect(|a| { + if matches!(a, Action::CommitInfo(..)) { + commit_info = Some(a.clone()) + } + }) + .filter(|a| matches!(a, Action::CommitInfo(..))) + .collect_vec(); + if !commit_info.is_some() { + let mut cm = operation.get_commit_info(); + cm.timestamp = Some(Utc::now().timestamp_millis()); app_metadata.insert( "clientVersion".to_string(), Value::String(format!("delta-rs.{}", crate_version())), ); - app_metadata.extend(commit_info.info); - commit_info.info = app_metadata.clone(); - actions.push(Action::CommitInfo(commit_info)) + app_metadata.extend(cm.info); + cm.info = app_metadata.clone(); + commit_info = Some(Action::CommitInfo(cm)); } + // safety: we assured commit_info is Some just above. + actions.insert(0, commit_info.unwrap()); for txn in &app_transactions { actions.push(Action::Txn(txn.clone()))