Commit d59867e

test: more snapshot tests

Signed-off-by: Robert Pack <[email protected]>

roeap committed Jan 21, 2025
1 parent fe62dc6 commit d59867e
Showing 6 changed files with 171 additions and 29 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
@@ -27,14 +27,14 @@ debug = "line-tables-only"
 
 [workspace.dependencies]
 # delta_kernel = { version = "=0.6.0", features = ["default-engine"] }
-delta_kernel = { path = "../delta-kernel-rs/kernel", features = [
-    "default-engine",
-    "developer-visibility",
-] }
-# delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "2e09bfcc0447283a3acc320ad2350f4075dba83e", features = [
+# delta_kernel = { path = "../delta-kernel-rs/kernel", features = [
 #     "default-engine",
 #     "developer-visibility",
 # ] }
+delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "50c1c023b7e9d60df69f6e592b91e4cc06a5a0b1", features = [
+    "default-engine",
+    "developer-visibility",
+] }
 
 # arrow
 arrow = { version = "53" }
37 changes: 33 additions & 4 deletions crates/core/src/kernel/snapshot_next/eager.rs
@@ -1,14 +1,16 @@
 use std::sync::Arc;
 
-use arrow_array::RecordBatch;
+use arrow_array::{BooleanArray, RecordBatch};
+use arrow_select::filter::filter_record_batch;
 use delta_kernel::actions::set_transaction::SetTransactionMap;
 use delta_kernel::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
 use delta_kernel::actions::{Add, Metadata, Protocol, SetTransaction};
 use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::log_segment::LogSegment;
+use delta_kernel::scan::log_replay::scan_action_iter;
 use delta_kernel::schema::Schema;
 use delta_kernel::table_properties::TableProperties;
-use delta_kernel::{ExpressionRef, Table, Version};
+use delta_kernel::{EngineData, ExpressionRef, Table, Version};
 use itertools::Itertools;
 use object_store::ObjectStore;
 use url::Url;
@@ -55,9 +57,36 @@ impl Snapshot for EagerSnapshot {
 
     fn logical_files(
         &self,
-        _predicate: Option<ExpressionRef>,
+        predicate: Option<ExpressionRef>,
     ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>> {
-        todo!()
+        let scan = self
+            .snapshot
+            .inner
+            .as_ref()
+            .clone()
+            .into_scan_builder()
+            .with_predicate(predicate)
+            .build()?;
+
+        let iter = scan_action_iter(
+            self.snapshot.engine_ref().as_ref(),
+            vec![Ok((
+                Box::new(ArrowEngineData::new(self.file_data()?.clone())) as Box<dyn EngineData>,
+                false,
+            ))]
+            .into_iter(),
+            scan.physical_predicate()
+                .map(|p| (p, scan.schema().clone())),
+        )
+        .map(|res| {
+            res.and_then(|(data, predicate)| {
+                let batch: RecordBatch = ArrowEngineData::try_from_engine_data(data)?.into();
+                Ok(filter_record_batch(&batch, &BooleanArray::from(predicate))?)
+            })
+        })
+        .map(|batch| batch.map_err(|e| e.into()));
+
+        Ok(Box::new(iter))
     }
 
     fn files(
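
For context, a minimal usage sketch of the new `logical_files` implementation (not part of the commit; it assumes a `snapshot: EagerSnapshot` constructed elsewhere and a function that propagates errors with `?`):

    // Hypothetical caller: stream the logical files and count their rows.
    // Each item is a DeltaResult<RecordBatch> in the kernel's scan-row layout.
    let mut total_rows = 0;
    for batch in snapshot.logical_files(None)? {
        total_rows += batch?.num_rows();
    }
    println!("logical files in snapshot: {total_rows}");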
66 changes: 65 additions & 1 deletion crates/core/src/kernel/snapshot_next/iterators.rs
@@ -1,5 +1,5 @@
 use std::collections::HashSet;
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
 
 use arrow_array::cast::AsArray;
 use arrow_array::types::Int64Type;
@@ -14,6 +14,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::engine::arrow_expression::ProvidesColumnByName;
 use delta_kernel::engine_data::{GetData, RowVisitor};
 use delta_kernel::expressions::{Scalar, StructData};
+use delta_kernel::scan::scan_row_schema;
 
 use crate::kernel::scalars::ScalarExt;
 use crate::{DeltaResult, DeltaTableError};
@@ -218,6 +219,69 @@ impl Iterator for LogicalFileView {
     }
 }
 
+pub struct LogicalFileViewIterator<I>
+where
+    I: IntoIterator<Item = Result<RecordBatch, DeltaTableError>>,
+{
+    inner: I::IntoIter,
+    batch: Option<RecordBatch>,
+    current: usize,
+}
+
+impl<I> LogicalFileViewIterator<I>
+where
+    I: IntoIterator<Item = Result<RecordBatch, DeltaTableError>>,
+{
+    /// Create a new [LogicalFileViewIterator].
+    ///
+    /// If `iter` is an infallible iterator, use `.map(Ok)`.
+    pub fn new(iter: I) -> Self {
+        Self {
+            inner: iter.into_iter(),
+            batch: None,
+            current: 0,
+        }
+    }
+}
+
+// impl<I> Iterator for LogicalFileViewIterator<I>
+// where
+//     I: IntoIterator<Item = DeltaResult<RecordBatch>>,
+// {
+//     type Item = DeltaResult<LogicalFileView>;
+//
+//     fn next(&mut self) -> Option<Self::Item> {
+//         if let Some(batch) = &self.batch {
+//             if self.current < batch.num_rows() {
+//                 let item = LogicalFileView {
+//                     files: batch.clone(),
+//                     index: self.current,
+//                 };
+//                 self.current += 1;
+//                 return Some(Ok(item));
+//             }
+//         }
+//         match self.inner.next() {
+//             Some(Ok(batch)) => {
+//                 if validate_logical_file(&batch).is_err() {
+//                     return Some(Err(DeltaTableError::generic(
+//                         "Invalid logical file data encountered.",
+//                     )));
+//                 }
+//                 self.batch = Some(batch);
+//                 self.current = 0;
+//                 self.next()
+//             }
+//             Some(Err(e)) => Some(Err(e)),
+//             None => None,
+//         }
+//     }
+//
+//     fn size_hint(&self) -> (usize, Option<usize>) {
+//         self.inner.size_hint()
+//     }
+// }
+
 pub struct AddViewIterator<I>
 where
     I: IntoIterator<Item = Result<RecordBatch, DeltaTableError>>,
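
The doc comment on `new` hints at the intended call pattern; here is a small sketch (not part of the commit, and assuming the commented-out `Iterator` impl above is eventually enabled):

    // Hypothetical usage: wrap an infallible batch source with `.map(Ok)`,
    // then consume one LogicalFileView (one row = one logical file) at a time.
    let batches: Vec<RecordBatch> = vec![/* logical file batches */];
    let iter = LogicalFileViewIterator::new(batches.into_iter().map(Ok));
    for view in iter {
        let _file = view?; // each item is a DeltaResult<LogicalFileView>
    }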
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot_next/lazy.rs
@@ -3,8 +3,8 @@
 use std::io::{BufRead, BufReader, Cursor};
 use std::sync::{Arc, LazyLock};
 
-use arrow::compute::filter_record_batch;
 use arrow_array::{BooleanArray, RecordBatch};
+use arrow_select::filter::filter_record_batch;
 use delta_kernel::actions::set_transaction::{SetTransactionMap, SetTransactionScanner};
 use delta_kernel::actions::{get_log_schema, REMOVE_NAME};
 use delta_kernel::actions::{Metadata, Protocol, SetTransaction};
55 changes: 44 additions & 11 deletions crates/core/src/kernel/snapshot_next/mod.rs
@@ -13,6 +13,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::engine::arrow_expression::apply_schema;
 use delta_kernel::expressions::{Scalar, StructData};
 use delta_kernel::scan::log_replay::scan_action_iter;
+use delta_kernel::scan::scan_row_schema;
 use delta_kernel::schema::{DataType, Schema};
 use delta_kernel::table_properties::TableProperties;
 use delta_kernel::{EngineData, ExpressionRef, Version};
@@ -92,19 +93,34 @@ pub trait Snapshot {
     /// Get the [`TableProperties`] for this [`Snapshot`].
     fn table_properties(&self) -> &TableProperties;
 
-    /// Get all currently active files in the table.
+    fn logical_file_schema(&self) -> &'static Schema {
+        scan_row_schema()
+    }
+
+    /// Get all logical files present in the current snapshot.
     ///
     /// # Parameters
     /// - `predicate`: An optional predicate to filter the files based on file statistics.
     ///
     /// # Returns
-    /// An iterator of [`RecordBatch`]es, where each batch contains add action data.
-    fn files(
+    /// An iterator of [`RecordBatch`]es, where each batch contains logical file data.
+    fn logical_files(
         &self,
         predicate: Option<ExpressionRef>,
     ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>>;
 
-    fn logical_files(
+    /// Get all currently active files in the table.
+    ///
+    /// # Parameters
+    /// - `predicate`: An optional predicate to filter the files based on file statistics.
+    ///
+    /// # Returns
+    /// An iterator of [`RecordBatch`]es, where each batch contains add action data.
+    #[deprecated(
+        since = "0.25.0",
+        note = "Use `logical_files` instead, which returns a more focussed dataset and avoids computational overhead."
+    )]
+    fn files(
         &self,
         predicate: Option<ExpressionRef>,
     ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>>;
@@ -113,6 +129,7 @@ pub trait Snapshot {
         &self,
         predicate: Option<ExpressionRef>,
     ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<AddView>>>> {
+        #[allow(deprecated)]
         Ok(Box::new(AddViewIterator::new(self.files(predicate)?)))
     }
 
@@ -216,6 +233,7 @@ impl<T: Snapshot> Snapshot for Box<T> {
         &self,
         predicate: Option<ExpressionRef>,
     ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<RecordBatch>>>> {
+        #[allow(deprecated)]
         self.as_ref().files(predicate)
     }
 
@@ -404,6 +422,7 @@ mod tests {
             test_files(snapshot.as_ref())?;
             test_files_view(snapshot.as_ref())?;
             test_commit_infos(snapshot.as_ref())?;
+            test_logical_files(snapshot.as_ref())?;
         }
 
         let mut snapshot = get_snapshot(ctx, TestTables::Checkpoints, Some(0))?.await?;
@@ -414,22 +433,29 @@ mod tests {
             test_files(snapshot.as_ref())?;
             test_files_view(snapshot.as_ref())?;
             test_commit_infos(snapshot.as_ref())?;
+            test_logical_files(snapshot.as_ref())?;
         }
 
         Ok(())
     }
 
-    fn test_files(snapshot: &dyn Snapshot) -> TestResult<()> {
-        let batches = snapshot.files(None)?.collect::<Result<Vec<_>, _>>()?;
-        let num_files = batches.iter().map(|b| b.num_rows() as i64).sum::<i64>();
+    fn test_logical_files(snapshot: &dyn Snapshot) -> TestResult<()> {
+        let logical_files = snapshot
+            .logical_files(None)?
+            .collect::<Result<Vec<_>, _>>()?;
+        let num_files = logical_files
+            .iter()
+            .map(|b| b.num_rows() as i64)
+            .sum::<i64>();
         assert_eq!((num_files as u64), snapshot.version());
         Ok(())
     }
 
-    fn test_commit_infos(snapshot: &dyn Snapshot) -> TestResult<()> {
-        let commit_infos = snapshot.commit_infos(None, Some(100))?.collect::<Vec<_>>();
-        assert_eq!((commit_infos.len() as u64), snapshot.version() + 1);
-        assert_eq!(commit_infos.first().unwrap().0, snapshot.version());
+    fn test_files(snapshot: &dyn Snapshot) -> TestResult<()> {
+        #[allow(deprecated)]
+        let batches = snapshot.files(None)?.collect::<Result<Vec<_>, _>>()?;
+        let num_files = batches.iter().map(|b| b.num_rows() as i64).sum::<i64>();
+        assert_eq!((num_files as u64), snapshot.version());
         Ok(())
     }
 
@@ -441,4 +467,11 @@ mod tests {
         assert_eq!(num_files_view, snapshot.version());
         Ok(())
     }
+
+    fn test_commit_infos(snapshot: &dyn Snapshot) -> TestResult<()> {
+        let commit_infos = snapshot.commit_infos(None, Some(100))?.collect::<Vec<_>>();
+        assert_eq!((commit_infos.len() as u64), snapshot.version() + 1);
+        assert_eq!(commit_infos.first().unwrap().0, snapshot.version());
+        Ok(())
+    }
 }
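
Migration-wise, the deprecation maps one-to-one; a sketch (not part of the commit; `snapshot` is any value implementing the `Snapshot` trait):

    // Before: add-action batches (now deprecated, needs #[allow(deprecated)]).
    #[allow(deprecated)]
    let adds = snapshot.files(None)?.collect::<Result<Vec<_>, _>>()?;

    // After: the same Iterator<Item = DeltaResult<RecordBatch>> shape, but the
    // batches follow the scan-row layout reported by `logical_file_schema()`.
    let files = snapshot.logical_files(None)?.collect::<Result<Vec<_>, _>>()?;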
30 changes: 23 additions & 7 deletions crates/core/src/operations/transaction/mod.rs
@@ -80,6 +80,7 @@ use bytes::Bytes;
 use chrono::Utc;
 use conflict_checker::ConflictChecker;
 use futures::future::BoxFuture;
+use itertools::Itertools;
 use object_store::path::Path;
 use object_store::Error as ObjectStoreError;
 use serde_json::Value;
@@ -268,22 +269,37 @@ pub struct CommitData {
 impl CommitData {
     /// Create new data to be committed
     pub fn new(
-        mut actions: Vec<Action>,
+        actions: Vec<Action>,
         operation: DeltaOperation,
         mut app_metadata: HashMap<String, Value>,
         app_transactions: Vec<Transaction>,
     ) -> Self {
-        if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
-            let mut commit_info = operation.get_commit_info();
-            commit_info.timestamp = Some(Utc::now().timestamp_millis());
+        // When in-commit-timestamps are enabled, the commit info must be the first
+        // action in the commit log; if it is not present, we need to add it.
+        // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#writer-requirements-for-in-commit-timestamps
+        let mut commit_info = None::<Action>;
+        let mut actions = actions
+            .into_iter()
+            .inspect(|a| {
+                if matches!(a, Action::CommitInfo(..)) {
+                    commit_info = Some(a.clone())
+                }
+            })
+            .filter(|a| !matches!(a, Action::CommitInfo(..)))
+            .collect_vec();
+        if commit_info.is_none() {
+            let mut cm = operation.get_commit_info();
+            cm.timestamp = Some(Utc::now().timestamp_millis());
             app_metadata.insert(
                 "clientVersion".to_string(),
                 Value::String(format!("delta-rs.{}", crate_version())),
             );
-            app_metadata.extend(commit_info.info);
-            commit_info.info = app_metadata.clone();
-            actions.push(Action::CommitInfo(commit_info))
+            app_metadata.extend(cm.info);
+            cm.info = app_metadata.clone();
+            commit_info = Some(Action::CommitInfo(cm));
         }
+        // safety: we ensured commit_info is Some just above.
+        actions.insert(0, commit_info.unwrap());
 
         for txn in &app_transactions {
             actions.push(Action::Txn(txn.clone()))
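
A sketch of the invariant the rewritten `CommitData::new` enforces (not part of the commit; `add_action`, `info_action`, and `operation` are assumed to be in scope, and the `actions` field is assumed to be readable):

    // Hypothetical check: a CommitInfo supplied anywhere in the input actions
    // should surface as the first action of the commit, per the in-commit-timestamp
    // writer requirements linked above.
    let commit = CommitData::new(
        vec![add_action, info_action],
        operation,
        HashMap::new(),
        vec![],
    );
    assert!(matches!(commit.actions.first(), Some(Action::CommitInfo(_))));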
