diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 8056c85f29..5d6efc44f5 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -115,7 +115,8 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
 utime = "0.3"
 
 [features]
-default = []
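+# The `cdf` feature gates Change Data Capture (CDC) support and is on by default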
+cdf = []
+default = ["cdf"]
 datafusion = [
     "dep:datafusion",
     "datafusion-expr",
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index c1b6208cff..fae36d7cbf 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -509,6 +509,12 @@ impl<'a> DeltaScanBuilder<'a> {
         self
     }
 
+    /// Use the provided [SchemaRef] for the [DeltaScan]
+    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
     pub async fn build(self) -> DeltaResult<DeltaScan> {
         let config = self.config;
         let schema = match self.schema {
diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs
new file mode 100644
index 0000000000..365f4a649a
--- /dev/null
+++ b/crates/core/src/operations/cdc.rs
@@ -0,0 +1,454 @@
+//!
+//! The CDC module contains private tools for managing CDC files
+//!
+
+use crate::table::state::DeltaTableState;
+use crate::DeltaResult;
+
+use arrow::array::{Array, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion::error::Result as DataFusionResult;
+use datafusion::physical_plan::{
+    metrics::MetricsSet, DisplayAs, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
+};
+use datafusion::prelude::*;
+use futures::{Stream, StreamExt};
+use std::sync::Arc;
+use tokio::sync::mpsc::*;
+use tracing::log::*;
+
+/// Maximum in-memory channel size for the tracker to use
+const MAX_CHANNEL_SIZE: usize = 1024;
+
+/// The CDCTracker is useful for hooking reads/writes in a manner necessary to create CDC files
+/// associated with commits
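+///
+/// A sketch of the intended wiring, using the names defined in this module:
+///
+/// ```ignore
+/// let tracker = CDCTracker::new(schema);
+/// let pre = tracker.pre_sender();   // hook into the read path
+/// let post = tracker.post_sender(); // hook into the write path
+/// // ...execute the plan, sending batches through the senders...
+/// let change_batches = tracker.collect().await?;
+/// ```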
+pub(crate) struct CDCTracker {
+    schema: SchemaRef,
+    pre_sender: Sender<RecordBatch>,
+    pre_receiver: Receiver<RecordBatch>,
+    post_sender: Sender<RecordBatch>,
+    post_receiver: Receiver<RecordBatch>,
+}
+
+impl CDCTracker {
+    /// Construct a new tracker for the given table schema
+    pub(crate) fn new(schema: SchemaRef) -> Self {
+        let (pre_sender, pre_receiver) = channel(MAX_CHANNEL_SIZE);
+        let (post_sender, post_receiver) = channel(MAX_CHANNEL_SIZE);
+        Self {
+            schema,
+            pre_sender,
+            pre_receiver,
+            post_sender,
+            post_receiver,
+        }
+    }
+
+    /// Return an owned [Sender] for the caller to use when sending batches that were read but not altered
+    pub(crate) fn pre_sender(&self) -> Sender<RecordBatch> {
+        self.pre_sender.clone()
+    }
+
+    /// Return an owned [Sender] for the caller to use when sending altered batches
+    pub(crate) fn post_sender(&self) -> Sender<RecordBatch> {
+        self.post_sender.clone()
+    }
+
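+    /// Drain both channels, diff the pre- and post-image batches via DataFusion's
+    /// `except`, and return the changed rows tagged with a `_change_type` column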
+    pub(crate) async fn collect(mut self) -> DeltaResult<Vec<RecordBatch>> {
+        debug!("Collecting all the batches for diffing");
+        let ctx = SessionContext::new();
+        let mut pre = vec![];
+        let mut post = vec![];
+
+        while !self.pre_receiver.is_empty() {
+            if let Ok(batch) = self.pre_receiver.try_recv() {
+                pre.push(batch);
+            } else {
+                warn!("Error when receiving on the pre-receiver");
+            }
+        }
+
+        while !self.post_receiver.is_empty() {
+            if let Ok(batch) = self.post_receiver.try_recv() {
+                post.push(batch);
+            } else {
+                warn!("Error when receiving on the post-receiver");
+            }
+        }
+
+        // Collect _all_ the batches for consideration
+        let pre = ctx.read_batches(pre)?;
+        let post = ctx.read_batches(post)?;
+
+        // There is certainly a better way to do this than naively cloning data for diffing
+        // purposes, but this is the quickest and easiest way to "diff" the two sets of batches
+        let preimage = pre.clone().except(post.clone())?;
+        let postimage = post.except(pre)?;
+
+        // Create a new schema which represents the input batch along with the CDC
+        // columns
+        let mut fields: Vec<Arc<Field>> = self.schema.fields().to_vec();
+        fields.push(Arc::new(Field::new("_change_type", DataType::Utf8, true)));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut batches = vec![];
+
+        let mut pre_stream = preimage.execute_stream().await?;
+        let mut post_stream = postimage.execute_stream().await?;
+
+        // Fill up on pre image batches
+        while let Some(Ok(batch)) = pre_stream.next().await {
+            let batch = crate::operations::cast::cast_record_batch(
+                &batch,
+                self.schema.clone(),
+                true,
+                false,
+            )?;
+            let new_column = Arc::new(StringArray::from(vec![
+                Some("update_preimage");
+                batch.num_rows()
+            ]));
+            let mut columns: Vec<Arc<dyn Array>> = batch.columns().to_vec();
+            columns.push(new_column);
+
+            let batch = RecordBatch::try_new(schema.clone(), columns)?;
+            batches.push(batch);
+        }
+
+        // Fill up on the post-image batches
+        while let Some(Ok(batch)) = post_stream.next().await {
+            let batch = crate::operations::cast::cast_record_batch(
+                &batch,
+                self.schema.clone(),
+                true,
+                false,
+            )?;
+            let new_column = Arc::new(StringArray::from(vec![
+                Some("update_postimage");
+                batch.num_rows()
+            ]));
+            let mut columns: Vec<Arc<dyn Array>> = batch.columns().to_vec();
+            columns.push(new_column);
+
+            let batch = RecordBatch::try_new(schema.clone(), columns)?;
+            batches.push(batch);
+        }
+
+        debug!("Found {} batches to consider `CDC` data", batches.len());
+
+        // At this point the batches should just contain the changes
+        Ok(batches)
+    }
+}
+
+/// A DataFusion observer to help pick up on pre-image changes
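+/// by forwarding a clone of each batch that flows through `execute` to the provided channel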
+pub(crate) struct CDCObserver {
+    parent: Arc<dyn ExecutionPlan>,
+    id: String,
+    sender: Sender<RecordBatch>,
+}
+
+impl CDCObserver {
+    pub(crate) fn new(
+        id: String,
+        sender: Sender<RecordBatch>,
+        parent: Arc<dyn ExecutionPlan>,
+    ) -> Self {
+        Self { id, sender, parent }
+    }
+}
+
+impl std::fmt::Debug for CDCObserver {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CDCObserver").field("id", &self.id).finish()
+    }
+}
+
+impl DisplayAs for CDCObserver {
+    fn fmt_as(
+        &self,
+        _: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "CDCObserver id={}", self.id)
+    }
+}
+
+impl ExecutionPlan for CDCObserver {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.parent.schema()
+    }
+
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        self.parent.properties()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.parent.clone()]
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<datafusion::execution::context::TaskContext>,
+    ) -> datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> {
+        let res = self.parent.execute(partition, context)?;
+        Ok(Box::pin(CDCObserverStream {
+            schema: self.schema(),
+            input: res,
+            sender: self.sender.clone(),
+        }))
+    }
+
+    fn statistics(&self) -> DataFusionResult<datafusion_common::Statistics> {
+        self.parent.statistics()
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        if let Some(parent) = children.first() {
+            Ok(Arc::new(CDCObserver {
+                id: self.id.clone(),
+                sender: self.sender.clone(),
+                parent: parent.clone(),
+            }))
+        } else {
+            Err(datafusion_common::DataFusionError::Internal(
+                "Failed to handle CDCObserver".into(),
+            ))
+        }
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        self.parent.metrics()
+    }
+}
+
+/// The CDCObserverStream simply acts to help observe the stream of data being
+/// read by DataFusion to capture the pre-image versions of data
+pub(crate) struct CDCObserverStream {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    sender: Sender<RecordBatch>,
+}
+
+impl Stream for CDCObserverStream {
+    type Item = DataFusionResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.input.poll_next_unpin(cx).map(|x| match x {
+            Some(Ok(batch)) => {
+                let _ = self.sender.try_send(batch.clone());
+                Some(Ok(batch))
+            }
+            other => other,
+        })
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.input.size_hint()
+    }
+}
+
+impl RecordBatchStream for CDCObserverStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+///
+/// Return true if the specified table is capable of writing Change Data files
+///
+/// From the Protocol:
+///
+/// > For Writer Versions 4 up to 6, all writers must respect the delta.enableChangeDataFeed
+/// > configuration flag in the metadata of the table. When delta.enableChangeDataFeed is true,
+/// > writers must produce the relevant AddCDCFile's for any operation that changes data, as
+/// > specified in Change Data Files.
+/// >
+/// > For Writer Version 7, all writers must respect the delta.enableChangeDataFeed configuration flag in
+/// > the metadata of the table only if the feature changeDataFeed exists in the table protocol's
+/// > writerFeatures.
+pub(crate) fn should_write_cdc(snapshot: &DeltaTableState) -> DeltaResult<bool> {
+    if let Some(features) = &snapshot.protocol().writer_features {
+        // Writer features should only be present at writer version 7, but check the
+        // version explicitly to guard against the Option<HashSet<T>> being filled
+        // with an empty set
+        if snapshot.protocol().min_writer_version == 7
+            && !features.contains(&crate::kernel::WriterFeatures::ChangeDataFeed)
+        {
+            // If the writer feature has not been set, then the table should not have CDC written
+            // to it. Otherwise fallback to the configured table configuration
+            return Ok(false);
+        }
+    }
+    Ok(snapshot.table_config().enable_change_data_feed())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::kernel::DataType as DeltaDataType;
+    use crate::kernel::{Action, PrimitiveType, Protocol};
+    use crate::operations::DeltaOps;
+    use crate::{DeltaConfigKey, DeltaTable};
+    use arrow::array::Int32Array;
+    use datafusion::assert_batches_sorted_eq;
+
+    /// A simple test which validates that primitive writer version 1 tables should
+    /// not write Change Data Files
+    #[tokio::test]
+    async fn test_should_write_cdc_basic_table() {
+        let mut table = DeltaOps::new_in_memory()
+            .create()
+            .with_column(
+                "value",
+                DeltaDataType::Primitive(PrimitiveType::Integer),
+                true,
+                None,
+            )
+            .await
+            .expect("Failed to make a table");
+        table.load().await.expect("Failed to reload table");
+        let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table");
+        assert!(!result, "A default table should not create CDC files");
+    }
+
+    ///
+    /// This test manually creates a table with writer version 4 that has the configuration set
+    ///
+    #[tokio::test]
+    async fn test_should_write_cdc_table_with_configuration() {
+        let actions = vec![Action::Protocol(Protocol::new(1, 4))];
+        let mut table: DeltaTable = DeltaOps::new_in_memory()
+            .create()
+            .with_column(
+                "value",
+                DeltaDataType::Primitive(PrimitiveType::Integer),
+                true,
+                None,
+            )
+            .with_actions(actions)
+            .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
+            .await
+            .expect("failed to make a version 4 table with EnableChangeDataFeed");
+        table.load().await.expect("Failed to reload table");
+
+        let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table");
+        assert!(
+            result,
+            "A table with EnableChangeDataFeed set should create CDC files"
+        );
+    }
+
+    ///
+    /// This test creates a writer version 7 table which has a slightly different way of
+    /// determining whether CDC files should be written or not.
+    #[tokio::test]
+    async fn test_should_write_cdc_v7_table_no_writer_feature() {
+        let actions = vec![Action::Protocol(Protocol::new(1, 7))];
+        let mut table: DeltaTable = DeltaOps::new_in_memory()
+            .create()
+            .with_column(
+                "value",
+                DeltaDataType::Primitive(PrimitiveType::Integer),
+                true,
+                None,
+            )
+            .with_actions(actions)
+            .await
+            .expect("failed to make a version 4 table with EnableChangeDataFeed");
+        table.load().await.expect("Failed to reload table");
+
+        let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table");
+        assert!(
+            !result,
+            "A v7 table must not write CDC files unless the writer feature is set"
+        );
+    }
+
+    ///
+    /// This test creates a writer version 7 table with a writer table feature enabled for CDC and
+    /// therefore should write CDC files
+    #[tokio::test]
+    async fn test_should_write_cdc_v7_table_with_writer_feature() {
+        let protocol = Protocol::new(1, 7)
+            .with_writer_features(vec![crate::kernel::WriterFeatures::ChangeDataFeed]);
+        let actions = vec![Action::Protocol(protocol)];
+        let mut table: DeltaTable = DeltaOps::new_in_memory()
+            .create()
+            .with_column(
+                "value",
+                DeltaDataType::Primitive(PrimitiveType::Integer),
+                true,
+                None,
+            )
+            .with_actions(actions)
+            .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
+            .await
+            .expect("failed to make a version 4 table with EnableChangeDataFeed");
+        table.load().await.expect("Failed to reload table");
+
+        let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table");
+        assert!(
+            result,
+            "A v7 table with the ChangeDataFeed writer feature should create CDC files"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_sanity_check() {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            DataType::Int32,
+            true,
+        )]));
+        let tracker = CDCTracker::new(schema.clone());
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
+        )
+        .unwrap();
+        let updated_batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)]))],
+        )
+        .unwrap();
+
+        let _ = tracker.pre_sender().send(batch).await;
+        let _ = tracker.post_sender().send(updated_batch).await;
+
+        match tracker.collect().await {
+            Ok(batches) => {
+                let _ = arrow::util::pretty::print_batches(&batches);
+                assert_eq!(batches.len(), 2);
+                assert_batches_sorted_eq! {[
+                "+-------+------------------+",
+                "| value | _change_type     |",
+                "+-------+------------------+",
+                "| 2     | update_preimage  |",
+                "| 12    | update_postimage |",
+                "+-------+------------------+",
+                    ], &batches }
+            }
+            Err(err) => {
+                println!("err: {err:#?}");
+                panic!("Should have never reached this assertion");
+            }
+        }
+    }
+}
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index bf17ed6085..aba54cd5f1 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -174,6 +174,7 @@ async fn excute_non_empty_expr(
         false,
         None,
         writer_stats_config,
+        None,
     )
     .await?
     .into_iter()
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index ddbe113d16..c13da4d879 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -1389,6 +1389,7 @@ async fn execute(
         safe_cast,
         None,
         writer_stats_config,
+        None,
     )
     .await?;
 
diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs
index 7923431d45..761ebd7b4e 100644
--- a/crates/core/src/operations/mod.rs
+++ b/crates/core/src/operations/mod.rs
@@ -39,6 +39,8 @@ use optimize::OptimizeBuilder;
 use restore::RestoreBuilder;
 use set_tbl_properties::SetTablePropertiesBuilder;
 
+#[cfg(all(feature = "cdf", feature = "datafusion"))]
+mod cdc;
 #[cfg(feature = "datafusion")]
 pub mod constraints;
 #[cfg(feature = "datafusion")]
diff --git a/crates/core/src/operations/transaction/protocol.rs b/crates/core/src/operations/transaction/protocol.rs
index c5d9cdf650..707f1daf02 100644
--- a/crates/core/src/operations/transaction/protocol.rs
+++ b/crates/core/src/operations/transaction/protocol.rs
@@ -228,6 +228,11 @@ pub static INSTANCE: Lazy<ProtocolChecker> = Lazy::new(|| {
     let mut writer_features = HashSet::new();
     writer_features.insert(WriterFeatures::AppendOnly);
     writer_features.insert(WriterFeatures::TimestampWithoutTimezone);
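+    // With CDC support compiled in, advertise the corresponding writer features so
+    // that v7 tables carrying them pass the protocol check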
+    #[cfg(feature = "cdf")]
+    {
+        writer_features.insert(WriterFeatures::ChangeDataFeed);
+        writer_features.insert(WriterFeatures::GeneratedColumns);
+    }
     #[cfg(feature = "datafusion")]
     {
         writer_features.insert(WriterFeatures::Invariants);
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 9a088c6ae9..6ca79c6244 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -38,8 +38,10 @@ use datafusion_physical_expr::{
     PhysicalExpr,
 };
 use futures::future::BoxFuture;
+use object_store::prefix::PrefixStore;
 use parquet::file::properties::WriterProperties;
 use serde::Serialize;
+use tracing::log::*;
 
 use super::write::write_execution_plan;
 use super::{
@@ -52,12 +54,17 @@ use crate::delta_datafusion::{
     DataFusionMixins, DeltaColumn, DeltaSessionContext,
 };
 use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
-use crate::kernel::{Action, Remove};
+use crate::kernel::{Action, AddCDCFile, Remove};
 use crate::logstore::LogStoreRef;
+use crate::operations::cdc::*;
+use crate::operations::writer::{DeltaWriter, WriterConfig};
 use crate::protocol::DeltaOperation;
 use crate::table::state::DeltaTableState;
 use crate::{DeltaResult, DeltaTable};
 
+/// Custom column name used for marking internal [RecordBatch] rows as updated
+pub(crate) const UPDATE_PREDICATE_COLNAME: &str = "__delta_rs_update_predicate";
+
 /// Updates records in the Delta Table.
 /// See this module's documentation for more information
 pub struct UpdateBuilder {
@@ -222,6 +229,10 @@ async fn execute(
 
     let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
 
+    // Create a projection for a new column with the predicate evaluated
+    let input_schema = snapshot.input_schema()?;
+    let tracker = CDCTracker::new(input_schema.clone());
+
     let execution_props = state.execution_props();
     // For each rewrite evaluate the predicate and then modify each expression
     // to either compute the new value or obtain the old one then write these batches
@@ -231,15 +242,23 @@ async fn execute(
         .await?;
     let scan = Arc::new(scan);
 
-    // Create a projection for a new column with the predicate evaluated
-    let input_schema = snapshot.input_schema()?;
+    // Wrap the scan with a CDCObserver if CDC has been enabled so that the tracker can
+    // later be used to produce the CDC files
+    let scan: Arc<dyn ExecutionPlan> = match should_write_cdc(&snapshot) {
+        Ok(true) => Arc::new(CDCObserver::new(
+            "cdc-update-observer".into(),
+            tracker.pre_sender(),
+            scan.clone(),
+        )),
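+        // CDC disabled (or detection failed): leave the scan untouched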
+        _others => scan,
+    };
 
     let mut fields = Vec::new();
     for field in input_schema.fields.iter() {
         fields.push(field.to_owned());
     }
     fields.push(Arc::new(Field::new(
-        "__delta_rs_update_predicate",
+        UPDATE_PREDICATE_COLNAME,
         arrow_schema::DataType::Boolean,
         true,
     )));
@@ -265,16 +284,16 @@ async fn execute(
         when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?;
     let predicate_expr =
         create_physical_expr_fix(predicate_null, &input_dfschema, execution_props)?;
-    expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string()));
+    expressions.push((predicate_expr, UPDATE_PREDICATE_COLNAME.to_string()));
 
     let projection_predicate: Arc<dyn ExecutionPlan> =
-        Arc::new(ProjectionExec::try_new(expressions, scan)?);
+        Arc::new(ProjectionExec::try_new(expressions, scan.clone())?);
 
     let count_plan = Arc::new(MetricObserverExec::new(
         "update_count".into(),
         projection_predicate.clone(),
         |batch, metrics| {
-            let array = batch.column_by_name("__delta_rs_update_predicate").unwrap();
+            let array = batch.column_by_name(UPDATE_PREDICATE_COLNAME).unwrap();
             let copied_rows = array.null_count();
             let num_updated = array.len() - copied_rows;
 
@@ -305,10 +324,10 @@ async fn execute(
     // Maintain a map from the original column name to its temporary column index
     let mut map = HashMap::<String, usize>::new();
     let mut control_columns = HashSet::<String>::new();
-    control_columns.insert("__delta_rs_update_predicate".to_owned());
+    control_columns.insert(UPDATE_PREDICATE_COLNAME.to_string());
 
     for (column, expr) in updates {
-        let expr = case(col("__delta_rs_update_predicate"))
+        let expr = case(col(UPDATE_PREDICATE_COLNAME))
             .when(lit(true), expr.to_owned())
             .otherwise(col(column.to_owned()))?;
         let predicate_expr = create_physical_expr_fix(expr, &input_dfschema, execution_props)?;
@@ -324,6 +343,7 @@ async fn execute(
     // Project again to remove __delta_rs columns and rename update columns to their original name
     let mut expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = Vec::new();
     let scan_schema = projection_update.schema();
+
     for (i, field) in scan_schema.fields().into_iter().enumerate() {
         if !control_columns.contains(field.name()) {
             match map.get(field.name()) {
@@ -364,10 +384,11 @@ async fn execute(
         log_store.object_store().clone(),
         Some(snapshot.table_config().target_file_size() as usize),
         None,
-        writer_properties,
+        writer_properties.clone(),
         safe_cast,
         None,
         writer_stats_config,
+        Some(tracker.post_sender()),
     )
     .await?;
 
@@ -422,6 +443,50 @@ async fn execute(
         serde_json::to_value(&metrics)?,
     );
 
+    match tracker.collect().await {
+        Ok(batches) => {
+            if !batches.is_empty() {
+                debug!(
+                    "Collected {} batches to write as part of this transaction:",
+                    batches.len()
+                );
+                let config = WriterConfig::new(
+                    batches[0].schema().clone(),
+                    snapshot.metadata().partition_columns.clone(),
+                    writer_properties.clone(),
+                    None,
+                    None,
+                    0,
+                    None,
+                );
+
+                let store = Arc::new(PrefixStore::new(
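+                // Change data files live under the table's _change_data/ directory per
+                // the Delta protocol; a PrefixStore scopes all writes there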
+                    log_store.object_store().clone(),
+                    "_change_data",
+                ));
+                let mut writer = DeltaWriter::new(store, config);
+                for batch in batches {
+                    writer.write(&batch).await?;
+                }
+                // Add the AddCDCFile actions that exist to the commit
+                actions.extend(writer.close().await?.into_iter().map(|add| {
+                    Action::Cdc(AddCDCFile {
+                        // This is a gnarly hack, but the action needs the nested path, not the
+                        // path inside the prefixed store
+                        path: format!("_change_data/{}", add.path),
+                        size: add.size,
+                        partition_values: add.partition_values,
+                        data_change: false,
+                        tags: add.tags,
+                    })
+                }));
+            }
+        }
+        Err(err) => {
+            error!("Failed to collect CDC batches: {err:#?}");
+        }
+    };
+
     let commit = CommitBuilder::from(commit_properties)
         .with_actions(actions)
         .build(Some(&snapshot), log_store, operation)
@@ -472,10 +537,12 @@ impl std::future::IntoFuture for UpdateBuilder {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
+
+    use crate::delta_datafusion::cdf::DeltaCdfScan;
     use crate::kernel::DataType as DeltaDataType;
-    use crate::kernel::PrimitiveType;
-    use crate::kernel::StructField;
-    use crate::kernel::StructType;
+    use crate::kernel::{Action, PrimitiveType, Protocol, StructField, StructType};
+    use crate::operations::collect_sendable_stream;
     use crate::operations::DeltaOps;
     use crate::writer::test_utils::datafusion::get_data;
     use crate::writer::test_utils::datafusion::write_batch;
@@ -484,12 +551,13 @@ mod tests {
     };
     use crate::DeltaConfigKey;
     use crate::DeltaTable;
+    use arrow::array::{Int32Array, StringArray};
     use arrow::datatypes::Schema as ArrowSchema;
     use arrow::datatypes::{Field, Schema};
     use arrow::record_batch::RecordBatch;
-    use arrow_array::Int32Array;
     use arrow_schema::DataType;
     use datafusion::assert_batches_sorted_eq;
+    use datafusion::physical_plan::ExecutionPlan;
     use datafusion::prelude::*;
     use serde_json::json;
     use std::sync::Arc;
@@ -969,4 +1037,248 @@ mod tests {
             .await;
         assert!(res.is_err());
     }
+
+    #[tokio::test]
+    async fn test_no_cdc_on_older_tables() {
+        let table = prepare_values_table().await;
+        assert_eq!(table.version(), 0);
+        assert_eq!(table.get_files_count(), 1);
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            arrow::datatypes::DataType::Int32,
+            true,
+        )]));
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
+        )
+        .unwrap();
+        let table = DeltaOps(table)
+            .write(vec![batch])
+            .await
+            .expect("Failed to write first batch");
+        assert_eq!(table.version(), 1);
+
+        let (table, _metrics) = DeltaOps(table)
+            .update()
+            .with_predicate(col("value").eq(lit(2)))
+            .with_update("value", lit(12))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 2);
+
+        // NOTE: This currently doesn't really assert anything because cdc_files() is not reading
+        // actions correctly
+        if let Some(state) = table.state.clone() {
+            let cdc_files = state.cdc_files();
+            assert!(cdc_files.is_ok());
+            if let Ok(cdc_files) = cdc_files {
+                let cdc_files: Vec<_> = cdc_files.collect();
+                assert_eq!(cdc_files.len(), 0);
+            }
+        } else {
+            panic!("I shouldn't exist!");
+        }
+
+        // Too close for missiles, switching to guns: check the storage directly to make
+        // sure no CDC data was actually written
+        if let Ok(files) = crate::storage::utils::flatten_list_stream(
+            &table.object_store(),
+            Some(&object_store::path::Path::from("_change_data")),
+        )
+        .await
+        {
+            assert_eq!(
+                0,
+                files.len(),
+                "This test should not find any written CDC files! {files:#?}"
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_update_cdc_enabled() {
+        // Currently `with_configuration_property` alone cannot enable CDC because it does
+        // not bump the writer version, so the only way to create a truly CDC-enabled table
+        // is to add the Protocol action directly to the actions list
+        let actions = vec![Action::Protocol(Protocol::new(1, 4))];
+        let table: DeltaTable = DeltaOps::new_in_memory()
+            .create()
+            .with_column(
+                "value",
+                DeltaDataType::Primitive(PrimitiveType::Integer),
+                true,
+                None,
+            )
+            .with_actions(actions)
+            .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            arrow::datatypes::DataType::Int32,
+            true,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
+        )
+        .unwrap();
+        let table = DeltaOps(table)
+            .write(vec![batch])
+            .await
+            .expect("Failed to write first batch");
+        assert_eq!(table.version(), 1);
+
+        let (table, _metrics) = DeltaOps(table)
+            .update()
+            .with_predicate(col("value").eq(lit(2)))
+            .with_update("value", lit(12))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 2);
+
+        let ctx = SessionContext::new();
+        let table = DeltaOps(table)
+            .load_cdf()
+            .with_session_ctx(ctx.clone())
+            .with_starting_version(0)
+            .build()
+            .await
+            .expect("Failed to load CDF");
+
+        let mut batches = collect_batches(
+            table.properties().output_partitioning().partition_count(),
+            table,
+            ctx,
+        )
+        .await
+        .expect("Failed to collect batches");
+
+        // The batches will contain a current _commit_timestamp which is not deterministic,
+        // so drop that column before comparing
+        let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(3)).collect();
+
+        assert_batches_sorted_eq! {[
+        "+-------+------------------+-----------------+",
+        "| value | _change_type     | _commit_version |",
+        "+-------+------------------+-----------------+",
+        "| 1     | insert           | 1               |",
+        "| 2     | insert           | 1               |",
+        "| 2     | update_preimage  | 2               |",
+        "| 12    | update_postimage | 2               |",
+        "| 3     | insert           | 1               |",
+        "+-------+------------------+-----------------+",
+            ], &batches }
+    }
+
+    #[tokio::test]
+    async fn test_update_cdc_enabled_partitions() {
+        // Currently `with_configuration_property` alone cannot enable CDC because it does
+        // not bump the writer version, so the only way to create a truly CDC-enabled table
+        // is to add the Protocol action directly to the actions list
+        let actions = vec![Action::Protocol(Protocol::new(1, 4))];
+        let table: DeltaTable = DeltaOps::new_in_memory()
+            .create()
+            .with_column(
+                "year",
+                DeltaDataType::Primitive(PrimitiveType::String),
+                true,
+                None,
+            )
+            .with_column(
+                "value",
+                DeltaDataType::Primitive(PrimitiveType::Integer),
+                true,
+                None,
+            )
+            .with_partition_columns(vec!["year"])
+            .with_actions(actions)
+            .with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("year", DataType::Utf8, true),
+            Field::new("value", DataType::Int32, true),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(StringArray::from(vec![
+                    Some("2020"),
+                    Some("2020"),
+                    Some("2024"),
+                ])),
+                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
+            ],
+        )
+        .unwrap();
+        let table = DeltaOps(table)
+            .write(vec![batch])
+            .await
+            .expect("Failed to write first batch");
+        assert_eq!(table.version(), 1);
+
+        let (table, _metrics) = DeltaOps(table)
+            .update()
+            .with_predicate(col("value").eq(lit(2)))
+            .with_update("year", "2024")
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 2);
+
+        let ctx = SessionContext::new();
+        let table = DeltaOps(table)
+            .load_cdf()
+            .with_session_ctx(ctx.clone())
+            .with_starting_version(0)
+            .build()
+            .await
+            .expect("Failed to load CDF");
+
+        let mut batches = collect_batches(
+            table.properties().output_partitioning().partition_count(),
+            table,
+            ctx,
+        )
+        .await
+        .expect("Failed to collect batches");
+
+        let _ = arrow::util::pretty::print_batches(&batches);
+
+        // The batches will contain a current _commit_timestamp which is not deterministic,
+        // so drop that column before comparing
+        let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(3)).collect();
+
+        assert_batches_sorted_eq! {[
+        "+-------+------------------+-----------------+------+",
+        "| value | _change_type     | _commit_version | year |",
+        "+-------+------------------+-----------------+------+",
+        "| 1     | insert           | 1               | 2020 |",
+        "| 2     | insert           | 1               | 2020 |",
+        "| 2     | update_preimage  | 2               | 2020 |",
+        "| 2     | update_postimage | 2               | 2024 |",
+        "| 3     | insert           | 1               | 2024 |",
+        "+-------+------------------+-----------------+------+",
+            ], &batches }
+    }
+
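+    /// Test helper which executes every partition of the [DeltaCdfScan] and gathers the results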
+    async fn collect_batches(
+        num_partitions: usize,
+        stream: DeltaCdfScan,
+        ctx: SessionContext,
+    ) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
+        let mut batches = vec![];
+        for p in 0..num_partitions {
+            let data: Vec<RecordBatch> =
+                collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?;
+            batches.extend_from_slice(&data);
+        }
+        Ok(batches)
+    }
 }
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index f3b87d4f66..84705c415d 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -41,6 +41,7 @@ use datafusion_expr::Expr;
 use futures::future::BoxFuture;
 use futures::StreamExt;
 use parquet::file::properties::WriterProperties;
+use tracing::log::*;
 
 use super::datafusion_utils::Expression;
 use super::transaction::{CommitBuilder, CommitProperties, TableReference, PROTOCOL};
@@ -63,6 +64,8 @@ use crate::table::Constraint as DeltaConstraint;
 use crate::writer::record_batch::divide_by_partition_values;
 use crate::DeltaTable;
 
+use tokio::sync::mpsc::Sender;
+
 #[derive(thiserror::Error, Debug)]
 enum WriteError {
     #[error("No data source supplied to write command.")]
@@ -370,6 +373,7 @@ async fn write_execution_plan_with_predicate(
     safe_cast: bool,
     schema_mode: Option<SchemaMode>,
     writer_stats_config: WriterStatsConfig,
+    sender: Option<Sender<RecordBatch>>,
 ) -> DeltaResult<Vec<Action>> {
     let schema: ArrowSchemaRef = if schema_mode.is_some() {
         plan.schema()
@@ -378,7 +382,6 @@ async fn write_execution_plan_with_predicate(
             .and_then(|s| s.input_schema().ok())
             .unwrap_or(plan.schema())
     };
-
     let checker = if let Some(snapshot) = snapshot {
         DeltaDataChecker::new(snapshot)
     } else {
@@ -410,11 +413,15 @@ async fn write_execution_plan_with_predicate(
         );
         let mut writer = DeltaWriter::new(object_store.clone(), config);
         let checker_stream = checker.clone();
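+        // Clone the optional CDC sender so each partition's write task can forward batches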
+        let sender_stream = sender.clone();
         let mut stream = inner_plan.execute(i, task_ctx)?;
-        let handle: tokio::task::JoinHandle<DeltaResult<Vec<Action>>> =
-            tokio::task::spawn(async move {
+
+        let handle: tokio::task::JoinHandle<DeltaResult<Vec<Action>>> = tokio::task::spawn(
+            async move {
+                let sendable = sender_stream.clone();
                 while let Some(maybe_batch) = stream.next().await {
                     let batch = maybe_batch?;
+
                     checker_stream.check_batch(&batch).await?;
                     let arr = super::cast::cast_record_batch(
                         &batch,
@@ -422,6 +429,12 @@ async fn write_execution_plan_with_predicate(
                         safe_cast,
                         schema_mode == Some(SchemaMode::Merge),
                     )?;
+
+                    if let Some(s) = sendable.as_ref() {
+                        let _ = s.send(arr.clone()).await;
+                    } else {
+                        debug!("write_execution_plan_with_predicate did not send any batches, no sender.");
+                    }
                     writer.write(&arr).await?;
                 }
                 let add_actions = writer.close().await;
@@ -429,7 +442,8 @@ async fn write_execution_plan_with_predicate(
                     Ok(actions) => Ok(actions.into_iter().map(Action::Add).collect::<Vec<_>>()),
                     Err(err) => Err(err),
                 }
-            });
+            },
+        );
 
         tasks.push(handle);
     }
@@ -460,6 +474,7 @@ pub(crate) async fn write_execution_plan(
     safe_cast: bool,
     schema_mode: Option<SchemaMode>,
     writer_stats_config: WriterStatsConfig,
+    sender: Option<Sender<RecordBatch>>,
 ) -> DeltaResult<Vec<Action>> {
     write_execution_plan_with_predicate(
         None,
@@ -474,6 +489,7 @@ pub(crate) async fn write_execution_plan(
         safe_cast,
         schema_mode,
         writer_stats_config,
+        sender,
     )
     .await
 }
@@ -522,6 +538,7 @@ async fn execute_non_empty_expr(
         false,
         None,
         writer_stats_config,
+        None,
     )
     .await?;
 
@@ -778,6 +795,7 @@ impl std::future::IntoFuture for WriteBuilder {
                 this.safe_cast,
                 this.schema_mode,
                 writer_stats_config.clone(),
+                None,
             )
             .await?;
             actions.extend(add_actions);
@@ -1270,7 +1288,6 @@ mod tests {
             ],
         )
         .unwrap();
-        println!("new_batch: {:?}", new_batch.schema());
         let table = DeltaOps(table)
             .write(vec![new_batch])
             .with_save_mode(SaveMode::Append)