From 90649d21c3a7adb1086da1b3fb6f405e1cc1e3af Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Sun, 11 Aug 2024 17:20:29 +0200
Subject: [PATCH] refactor: use remove actions in CDF reads

---
 crates/core/src/delta_datafusion/cdf/mod.rs   |  61 +++++-
 .../src/delta_datafusion/cdf/scan_utils.rs    |  12 +-
 crates/core/src/operations/load_cdf.rs        | 197 ++++++++++++++++--
 python/src/lib.rs                             |   2 +-
 4 files changed, 235 insertions(+), 37 deletions(-)

diff --git a/crates/core/src/delta_datafusion/cdf/mod.rs b/crates/core/src/delta_datafusion/cdf/mod.rs
index 02382aa725..09959a12ad 100644
--- a/crates/core/src/delta_datafusion/cdf/mod.rs
+++ b/crates/core/src/delta_datafusion/cdf/mod.rs
@@ -7,7 +7,10 @@ use std::collections::HashMap;
 pub(crate) use scan::*;
 pub(crate) use scan_utils::*;
 
-use crate::kernel::{Add, AddCDCFile};
+use crate::{
+    kernel::{Add, AddCDCFile, Remove},
+    DeltaResult,
+};
 
 mod scan;
 mod scan_utils;
@@ -59,37 +62,73 @@ impl<F: FileAction> CdcDataSpec<F> {
 
 /// This trait defines a generic set of operations used by CDF Reader
 pub trait FileAction {
     /// Adds partition values
-    fn partition_values(&self) -> &HashMap<String, Option<String>>;
+    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>>;
     /// Physical Path to the data
     fn path(&self) -> String;
     /// Byte size of the physical file
-    fn size(&self) -> usize;
+    fn size(&self) -> DeltaResult<usize>;
 }
 
 impl FileAction for Add {
-    fn partition_values(&self) -> &HashMap<String, Option<String>> {
-        &self.partition_values
+    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
+        Ok(&self.partition_values)
     }
 
     fn path(&self) -> String {
         self.path.clone()
     }
 
-    fn size(&self) -> usize {
-        self.size as usize
+    fn size(&self) -> DeltaResult<usize> {
+        Ok(self.size as usize)
     }
 }
 
 impl FileAction for AddCDCFile {
-    fn partition_values(&self) -> &HashMap<String, Option<String>> {
-        &self.partition_values
+    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
+        Ok(&self.partition_values)
     }
 
     fn path(&self) -> String {
         self.path.clone()
     }
 
-    fn size(&self) -> usize {
-        self.size as usize
+    fn size(&self) -> DeltaResult<usize> {
+        Ok(self.size as usize)
+    }
+}
+
+impl FileAction for Remove {
+    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
+        // If extended_file_metadata is true, it should be required to have this filled in
+        if self.extended_file_metadata.unwrap_or_default() {
+            Ok(self.partition_values.as_ref().unwrap())
+        } else {
+            match self.partition_values {
+                Some(ref part_map) => Ok(part_map),
+                _ => Err(crate::DeltaTableError::Protocol {
+                    source: crate::protocol::ProtocolError::InvalidField(
+                        "partition_values".to_string(),
+                    ),
+                }),
+            }
+        }
+    }
+
+    fn path(&self) -> String {
+        self.path.clone()
+    }
+
+    fn size(&self) -> DeltaResult<usize> {
+        // If extended_file_metadata is true, it should be required to have this filled in
+        if self.extended_file_metadata.unwrap_or_default() {
+            Ok(self.size.unwrap() as usize)
+        } else {
+            match self.size {
+                Some(size) => Ok(size as usize),
+                _ => Err(crate::DeltaTableError::Protocol {
+                    source: crate::protocol::ProtocolError::InvalidField("size".to_string()),
+                }),
+            }
+        }
     }
 }
diff --git a/crates/core/src/delta_datafusion/cdf/scan_utils.rs b/crates/core/src/delta_datafusion/cdf/scan_utils.rs
index 79d7a2359e..a05efadd44 100644
--- a/crates/core/src/delta_datafusion/cdf/scan_utils.rs
+++ b/crates/core/src/delta_datafusion/cdf/scan_utils.rs
@@ -18,9 +18,9 @@ pub fn map_action_to_scalar<F: FileAction>(
     action: &F,
     part: &str,
     schema: SchemaRef,
-) -> ScalarValue {
-    action
-        .partition_values()
+) -> DeltaResult<ScalarValue> {
+    Ok(action
+        .partition_values()?
         .get(part)
         .map(|val| {
             schema
@@ -36,7 +36,7 @@ pub fn map_action_to_scalar<F: FileAction>(
                 })
                 .unwrap_or(ScalarValue::Null)
         })
-        .unwrap_or(ScalarValue::Null)
+        .unwrap_or(ScalarValue::Null))
 }
 
 pub fn create_spec_partition_values<F: FileAction>(
@@ -67,7 +67,7 @@ pub fn create_partition_values<F: FileAction>(
     let partition_values = table_partition_cols
         .iter()
         .map(|part| map_action_to_scalar(&action, part, schema.clone()))
-        .collect::<Vec<ScalarValue>>();
+        .collect::<DeltaResult<Vec<ScalarValue>>>()?;
 
     let mut new_part_values = spec_partition_values.clone();
     new_part_values.extend(partition_values);
@@ -75,7 +75,7 @@ pub fn create_partition_values<F: FileAction>(
     let part = PartitionedFile {
         object_meta: ObjectMeta {
             location: Path::parse(action.path().as_str())?,
-            size: action.size(),
+            size: action.size()?,
             e_tag: None,
             last_modified: chrono::Utc.timestamp_nanos(0),
             version: None,
diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs
index 57542ab668..3a815b71ef 100644
--- a/crates/core/src/operations/load_cdf.rs
+++ b/crates/core/src/operations/load_cdf.rs
@@ -19,13 +19,13 @@
 use datafusion::prelude::SessionContext;
 use datafusion_common::{ScalarValue, Statistics};
 use tracing::log;
 
-use crate::delta_datafusion::cdf::*;
 use crate::delta_datafusion::{register_store, DataFusionMixins};
 use crate::errors::DeltaResult;
 use crate::kernel::{Action, Add, AddCDCFile, CommitInfo};
 use crate::logstore::{get_actions, LogStoreRef};
 use crate::table::state::DeltaTableState;
 use crate::DeltaTableError;
+use crate::{delta_datafusion::cdf::*, kernel::Remove};
 
 /// Builder for create a read of change data feeds for delta tables
 #[derive(Clone)]
@@ -105,7 +105,11 @@ impl CdfLoadBuilder {
     /// than I have right now. I plan to extend the checks once we have a stable state of the initial implementation.
     async fn determine_files_to_read(
         &self,
-    ) -> DeltaResult<(Vec<CdcDataSpec<AddCDCFile>>, Vec<CdcDataSpec<Add>>)> {
+    ) -> DeltaResult<(
+        Vec<CdcDataSpec<AddCDCFile>>,
+        Vec<CdcDataSpec<Add>>,
+        Vec<CdcDataSpec<Remove>>,
+    )> {
         let start = self.starting_version;
         let end = self
             .ending_version
@@ -127,8 +131,9 @@ impl CdfLoadBuilder {
         );
         log::debug!("starting version = {}, ending version = {:?}", start, end);
 
-        let mut change_files = vec![];
-        let mut add_files = vec![];
+        let mut change_files: Vec<CdcDataSpec<AddCDCFile>> = vec![];
+        let mut add_files: Vec<CdcDataSpec<Add>> = vec![];
+        let mut remove_files: Vec<CdcDataSpec<Remove>> = vec![];
 
         for version in start..=end {
             let snapshot_bytes = self
@@ -142,6 +147,8 @@ impl CdfLoadBuilder {
 
             let mut cdc_actions = vec![];
 
             if self.starting_timestamp.is_some() || self.ending_timestamp.is_some() {
+                // TODO: fallback on other actions for timestamps because CommitInfo action is optional
+                // theoretically.
                let version_commit = version_actions
                    .iter()
                    .find(|a| matches!(a, Action::CommitInfo(_)));
@@ -202,6 +209,14 @@ impl CdfLoadBuilder {
                })
                .collect::<Vec<Add>>();
 
+            let remove_actions = version_actions
+                .iter()
+                .filter_map(|r| match r {
+                    Action::Remove(r) if r.data_change => Some(r.clone()),
+                    _ => None,
+                })
+                .collect::<Vec<Remove>>();
+
             if !add_actions.is_empty() {
                 log::debug!(
                     "Located {} cdf actions for version: {}",
@@ -210,10 +225,19 @@ impl CdfLoadBuilder {
                 );
                 add_files.push(CdcDataSpec::new(version, ts, add_actions));
             }
+
+            if !remove_actions.is_empty() {
+                log::debug!(
+                    "Located {} remove actions for version: {}",
+                    remove_actions.len(),
+                    version
+                );
+                remove_files.push(CdcDataSpec::new(version, ts, remove_actions));
+            }
             }
         }
 
-        Ok((change_files, add_files))
+        Ok((change_files, add_files, remove_files))
     }
 
     #[inline]
@@ -221,9 +245,13 @@ impl CdfLoadBuilder {
         Some(ScalarValue::Utf8(Some(String::from("insert"))))
     }
 
+    fn get_remove_action_type() -> Option<ScalarValue> {
+        Some(ScalarValue::Utf8(Some(String::from("delete"))))
+    }
+
     /// Executes the scan
     pub async fn build(&self) -> DeltaResult<DeltaCdfScan> {
-        let (cdc, add) = self.determine_files_to_read().await?;
+        let (cdc, add, remove) = self.determine_files_to_read().await?;
         register_store(
             self.log_store.clone(),
             self.ctx.state().runtime_env().clone(),
@@ -248,16 +276,16 @@ impl CdfLoadBuilder {
         // Setup for the Read Schemas of each kind of file, CDC files include commit action type so they need a slightly
         // different schema than standard add file reads
         let cdc_file_schema = create_cdc_schema(schema_fields.clone(), true);
-        let add_file_schema = create_cdc_schema(schema_fields, false);
+        let add_remove_file_schema = create_cdc_schema(schema_fields, false);
 
         // Set up the mapping of partition columns to be projected into the final output batch
         // cdc for example has timestamp, version, and any table partitions mapped here.
         // add on the other hand has action type, timestamp, version and any additional table partitions because adds do
         // not include their actions
         let mut cdc_partition_cols = CDC_PARTITION_SCHEMA.clone();
-        let mut add_partition_cols = ADD_PARTITION_SCHEMA.clone();
+        let mut add_remove_partition_cols = ADD_PARTITION_SCHEMA.clone();
         cdc_partition_cols.extend_from_slice(&this_partition_values);
-        add_partition_cols.extend_from_slice(&this_partition_values);
+        add_remove_partition_cols.extend_from_slice(&this_partition_values);
 
         // Set up the partition to physical file mapping, this is a mostly unmodified version of what is done in load
         let cdc_file_groups =
@@ -268,9 +296,14 @@ impl CdfLoadBuilder {
             &partition_values,
             Self::get_add_action_type(),
         )?;
+        let remove_file_groups = create_partition_values(
+            schema.clone(),
+            remove,
+            &partition_values,
+            Self::get_remove_action_type(),
+        )?;
 
-        // Create the parquet scans for each associated type of file. I am not sure when we would use removes yet, but
-        // they would be here if / when they are necessary
+        // Create the parquet scans for each associated type of file.
         let cdc_scan = ParquetFormat::new()
             .create_physical_plan(
                 &self.ctx.state(),
                 FileScanConfig {
                     object_store_url: self.log_store.object_store_url(),
@@ -293,12 +326,29 @@ impl CdfLoadBuilder {
                 &self.ctx.state(),
                 FileScanConfig {
                     object_store_url: self.log_store.object_store_url(),
-                    file_schema: add_file_schema.clone(),
+                    file_schema: add_remove_file_schema.clone(),
                     file_groups: add_file_groups.into_values().collect(),
-                    statistics: Statistics::new_unknown(&add_file_schema),
+                    statistics: Statistics::new_unknown(&add_remove_file_schema.clone()),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: add_remove_partition_cols.clone(),
+                    output_ordering: vec![],
+                },
+                None,
+            )
+            .await?;
+
+        let remove_scan = ParquetFormat::new()
+            .create_physical_plan(
+                &self.ctx.state(),
+                FileScanConfig {
+                    object_store_url: self.log_store.object_store_url(),
+                    file_schema: add_remove_file_schema.clone(),
+                    file_groups: remove_file_groups.into_values().collect(),
+                    statistics: Statistics::new_unknown(&add_remove_file_schema),
                     projection: None,
                     limit: None,
-                    table_partition_cols: add_partition_cols,
+                    table_partition_cols: add_remove_partition_cols,
                     output_ordering: vec![],
                 },
                 None,
@@ -308,7 +358,7 @@ impl CdfLoadBuilder {
         // The output batches are then unioned to create a single output. Coalesce partitions is only here for the time
         // being for development. I plan to parallelize the reads once the base idea is correct.
         let mut union_scan: Arc<dyn ExecutionPlan> =
-            Arc::new(UnionExec::new(vec![cdc_scan, add_scan]));
+            Arc::new(UnionExec::new(vec![cdc_scan, add_scan, remove_scan]));
 
         if let Some(columns) = &self.columns {
             let expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = union_scan
@@ -330,23 +380,26 @@ impl CdfLoadBuilder {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use super::*;
 
     use std::error::Error;
     use std::str::FromStr;
 
-    use arrow_array::RecordBatch;
+    use arrow_array::{Int32Array, RecordBatch, StringArray};
+    use arrow_schema::Schema;
     use chrono::NaiveDateTime;
     use datafusion::physical_plan::ExecutionPlan;
    use datafusion::prelude::SessionContext;
     use datafusion_common::assert_batches_sorted_eq;
+    use itertools::Itertools;
 
     use crate::delta_datafusion::cdf::DeltaCdfScan;
     use crate::operations::collect_sendable_stream;
+    use crate::test_utils::TestSchemas;
     use crate::writer::test_utils::TestResult;
-    use crate::DeltaOps;
+    use crate::{DeltaConfigKey, DeltaOps, DeltaTable};
 
-    async fn collect_batches(
+    pub(crate) async fn collect_batches(
         num_partitions: usize,
         stream: DeltaCdfScan,
         ctx: SessionContext,
@@ -544,4 +597,110 @@
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_use_remove_actions_for_deletions() -> TestResult {
+        let delta_schema = TestSchemas::simple();
+        let table: DeltaTable = DeltaOps::new_in_memory()
+            .create()
+            .with_columns(delta_schema.fields().cloned())
+            .with_partition_columns(["id"])
+            .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+
+        let schema = Arc::new(Schema::try_from(delta_schema)?);
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(StringArray::from(vec![Some("1"), Some("2"), Some("3")])),
+                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
+                Arc::new(StringArray::from(vec![
+                    Some("yes"),
+                    Some("yes"),
+                    Some("no"),
+                ])),
+            ],
+        )
+        .unwrap();
+
+        let second_batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(StringArray::from(vec![Some("3")])),
+                Arc::new(Int32Array::from(vec![Some(10)])),
+                Arc::new(StringArray::from(vec![Some("yes")])),
+            ],
+        )
+        .unwrap();
+
+        let table = DeltaOps(table)
+            .write(vec![batch])
+            .await
+            .expect("Failed to write first batch");
+        assert_eq!(table.version(), 1);
+
+        let table = DeltaOps(table)
+            .write([second_batch])
+            .with_save_mode(crate::protocol::SaveMode::Overwrite)
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 2);
+
+        let ctx = SessionContext::new();
+        let cdf_scan = DeltaOps(table.clone())
+            .load_cdf()
+            .with_session_ctx(ctx.clone())
+            .with_starting_version(0)
+            .build()
+            .await
+            .expect("Failed to load CDF");
+
+        let mut batches = collect_batches(
+            cdf_scan
+                .properties()
+                .output_partitioning()
+                .partition_count(),
+            cdf_scan,
+            ctx,
+        )
+        .await
+        .expect("Failed to collect batches");
+
+        // The batches will contain a current _commit_timestamp which shouldn't be checked against, so drop that column
+        let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(4)).collect();
+
+        assert_batches_sorted_eq! {[
+            "+-------+----------+--------------+-----------------+----+",
+            "| value | modified | _change_type | _commit_version | id |",
+            "+-------+----------+--------------+-----------------+----+",
+            "| 1     | yes      | delete       | 2               | 1  |",
+            "| 1     | yes      | insert       | 1               | 1  |",
+            "| 10    | yes      | insert       | 2               | 3  |",
+            "| 2     | yes      | delete       | 2               | 2  |",
+            "| 2     | yes      | insert       | 1               | 2  |",
+            "| 3     | no       | delete       | 2               | 3  |",
+            "| 3     | no       | insert       | 1               | 3  |",
+            "+-------+----------+--------------+-----------------+----+",
+        ], &batches }
+
+        let snapshot_bytes = table
+            .log_store
+            .read_commit_entry(2)
+            .await?
+            .expect("failed to get snapshot bytes");
+        let version_actions = get_actions(2, snapshot_bytes).await?;
+
+        let cdc_actions = version_actions
+            .iter()
+            .filter(|action| match action {
+                &&Action::Cdc(_) => true,
+                _ => false,
+            })
+            .collect_vec();
+        assert!(cdc_actions.is_empty());
+        Ok(())
+    }
 }
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8d0ff3cc0c..e87668b3c8 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -1247,7 +1247,7 @@ impl RawDeltaTable {
         Ok(actions
             .iter()
-            .map(|action| (action.path(), action.size() as i64))
+            .map(|action| (action.path(), action.size))
             .collect::<Vec<(String, i64)>>())
     }
 
     /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics.
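
A minimal usage sketch of what the new test exercises: after this change a CDF read over a version range surfaces "delete" rows built from remove actions, so the overwrite in version 2 shows up as inserts plus deletes even though no CDC files were written. The snippet assumes a `table` handle with change data feed enabled and reuses the `collect_batches` helper that this patch makes pub(crate); the call shape mirrors test_use_remove_actions_for_deletions above.

    let ctx = SessionContext::new();
    let cdf_scan = DeltaOps(table)
        .load_cdf()
        .with_session_ctx(ctx.clone())
        .with_starting_version(0)
        .build()
        .await?;
    // Each batch carries the data columns plus _change_type ("insert" / "delete"),
    // _commit_version, _commit_timestamp and the table's partition columns.
    let batches = collect_batches(
        cdf_scan.properties().output_partitioning().partition_count(),
        cdf_scan,
        ctx,
    )
    .await?;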