Skip to content

Commit

Permalink
Merge branch 'main' into fix/writer_insert_changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored Aug 12, 2024
2 parents c77a704 + 747603f commit 9c58c4b
Show file tree
Hide file tree
Showing 6 changed files with 350 additions and 47 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ use crate::table::state::DeltaTableState;
use crate::table::Constraint;
use crate::{open_table, open_table_with_storage_options, DeltaTable};

const PATH_COLUMN: &str = "__delta_rs_path";
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";

pub mod cdf;
pub mod expr;
Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/operations/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::DeltaResult;
use datafusion::prelude::*;
use datafusion_common::ScalarValue;

pub const CDC_COLUMN_NAME: &str = "_change_type";

/// The CDCTracker is useful for hooking reads/writes in a manner necessary to create CDC files
/// associated with commits
pub(crate) struct CDCTracker {
Expand Down Expand Up @@ -88,8 +90,9 @@ mod tests {
use crate::operations::DeltaOps;
use crate::{DeltaConfigKey, DeltaTable};
use arrow::array::{ArrayRef, Int32Array, StructArray};
use arrow::datatypes::{DataType, Field};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use arrow_schema::Schema;
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::{MemTable, TableProvider};

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use serde::Serialize;
use super::cdc::should_write_cdc;
use super::datafusion_utils::Expression;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
use super::write::WriterStatsConfig;

use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
Expand All @@ -52,7 +51,7 @@ use crate::delta_datafusion::{
};
use crate::errors::DeltaResult;
use crate::kernel::{Action, Add, Remove};
use crate::operations::write::{write_execution_plan, write_execution_plan_cdc};
use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig};
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::{DeltaTable, DeltaTableError};
Expand Down
31 changes: 16 additions & 15 deletions crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use datafusion_physical_expr::{
use std::sync::Arc;
use std::time::SystemTime;

use arrow::record_batch::RecordBatch;
use arrow_schema::{ArrowError, Field};
use chrono::{DateTime, Utc};
use datafusion::datasource::file_format::parquet::ParquetFormat;
Expand Down Expand Up @@ -379,10 +380,24 @@ impl CdfLoadBuilder {
}
}

/// Helper function to collect batches associated with reading CDF data
/// Helper function to collect batches associated with reading CDF data
///
/// Sequentially executes every partition of the given scan on the provided
/// `SessionContext` and gathers all resulting record batches into one `Vec`.
///
/// # Errors
/// Returns an error if executing a partition fails or if draining the
/// resulting stream yields an error.
pub(crate) async fn collect_batches(
    num_partitions: usize,
    stream: DeltaCdfScan,
    ctx: SessionContext,
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    let mut batches = vec![];
    for p in 0..num_partitions {
        let data: Vec<RecordBatch> =
            crate::operations::collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?;
        // `extend` moves the batches into the accumulator; `extend_from_slice`
        // would clone each RecordBatch for no benefit.
        batches.extend(data);
    }
    Ok(batches)
}

#[cfg(test)]
pub(crate) mod tests {
use super::*;
use std::error::Error;
use std::str::FromStr;

use arrow_array::{Int32Array, RecordBatch, StringArray};
Expand All @@ -399,20 +414,6 @@ pub(crate) mod tests {
use crate::writer::test_utils::TestResult;
use crate::{DeltaConfigKey, DeltaOps, DeltaTable};

// Test-module helper: run every partition of a CDF scan and gather the
// resulting record batches into one Vec for assertions.
pub(crate) async fn collect_batches(
num_partitions: usize,
stream: DeltaCdfScan,
ctx: SessionContext,
) -> Result<Vec<RecordBatch>, Box<dyn Error>> {
let mut batches = vec![];
// Execute each partition sequentially and drain its stream to completion.
for p in 0..num_partitions {
let data: Vec<RecordBatch> =
collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?;
batches.extend_from_slice(&data);
}
Ok(batches)
}

#[tokio::test]
async fn test_load_local() -> TestResult {
let ctx = SessionContext::new();
Expand Down
Loading

0 comments on commit 9c58c4b

Please sign in to comment.