Skip to content

Commit

Permalink
chore: re-enable struct support update cdf
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 4, 2024
1 parent 5f2fca8 commit 47752cb
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 35 deletions.
37 changes: 3 additions & 34 deletions crates/core/src/operations/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,14 @@ use tracing::log::*;
/// The CDCTracker is useful for hooking reads/writes in a manner nececessary to create CDC files
/// associated with commits
pub(crate) struct CDCTracker {
schema: SchemaRef,
pre_dataframe: DataFrame,
post_dataframe: DataFrame,
}

impl CDCTracker {
/// construct
pub(crate) fn new(
schema: SchemaRef,
pre_dataframe: DataFrame,
post_dataframe: DataFrame,
) -> Self {
pub(crate) fn new(pre_dataframe: DataFrame, post_dataframe: DataFrame) -> Self {
Self {
schema,
pre_dataframe,
post_dataframe,
}
Expand All @@ -44,27 +38,6 @@ impl CDCTracker {
let preimage = pre_df.clone().except(post_df.clone())?;
let postimage = post_df.except(pre_df)?;

// Create a new schema which represents the input batch along with the CDC
// columns
let fields: Vec<Arc<Field>> = self.schema.fields().to_vec().clone();

let mut has_struct = false;
for field in fields.iter() {
match field.data_type() {
DataType::Struct(_) => {
has_struct = true;
}
DataType::List(_) => {
has_struct = true;
}
_ => {}
}
}

if has_struct {
warn!("The schema contains a Struct or List type, which unfortunately means a change data file cannot be captured in this release of delta-rs: <https://github.com/delta-io/delta-rs/issues/2568>. The write operation will complete properly, but no CDC data will be generated for schema: {fields:?}");
}

let preimage = preimage.with_column(
"_change_type",
lit(ScalarValue::Utf8(Some("update_preimage".to_string()))),
Expand Down Expand Up @@ -253,7 +226,7 @@ mod tests {
Arc::new(MemTable::try_new(schema.clone(), vec![vec![updated_batch]]).unwrap());
let updated_df = ctx.read_table(table_provider_updated).unwrap();

let tracker = CDCTracker::new(schema, source_df, updated_df);
let tracker = CDCTracker::new(source_df, updated_df);

match tracker.collect() {
Ok(df) => {
Expand All @@ -276,8 +249,6 @@ mod tests {
}
}

// This cannot be re-enabled until DataFrame.except() works: <https://github.com/apache/datafusion/issues/10749>
#[ignore]
#[tokio::test]
async fn test_sanity_check_with_pure_df() {
let nested_schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -354,8 +325,6 @@ mod tests {
assert_eq!(diff.len(), 1);
}

// This cannot be re-enabled until DataFrame.except() works: <https://github.com/apache/datafusion/issues/10749>
#[ignore]
#[tokio::test]
async fn test_sanity_check_with_struct() {
let ctx = SessionContext::new();
Expand Down Expand Up @@ -423,7 +392,7 @@ mod tests {
Arc::new(MemTable::try_new(schema.clone(), vec![vec![updated_batch]]).unwrap());
let updated_df = ctx.read_table(table_provider_updated).unwrap();

let tracker = CDCTracker::new(schema, source_df, updated_df);
let tracker = CDCTracker::new(source_df, updated_df);

match tracker.collect() {
Ok(df) => {
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ async fn execute(
);

let tracker = CDCTracker::new(
input_schema.clone(),
df,
updated_df.drop_columns(&vec![UPDATE_PREDICATE_COLNAME])?,
);
Expand Down

0 comments on commit 47752cb

Please sign in to comment.