From 96ac7eae7692aff329e1842cdd840635c958d42f Mon Sep 17 00:00:00 2001
From: Nikolay Ulmasov
Date: Thu, 28 Dec 2023 14:41:28 +0000
Subject: [PATCH 1/3] implement replaceWhere

Signed-off-by: Nikolay Ulmasov
---
 crates/core/src/delta_datafusion/mod.rs |   6 +
 crates/core/src/operations/write.rs     | 381 ++++++++++++++++++++++--
 crates/core/src/writer/test_utils.rs    |  18 ++
 docs/_build/links.yml                   |   4 +-
 docs/src/python/operations.py           |  21 ++
 docs/src/rust/operations.rs             |  32 ++
 docs/usage/writing/index.md             |  19 +-
 python/deltalake/writer.py              |  30 +-
 python/src/lib.rs                       |   2 +-
 python/tests/test_writer.py             | 107 +++++--
 10 files changed, 577 insertions(+), 43 deletions(-)
 create mode 100644 docs/src/python/operations.py
 create mode 100644 docs/src/rust/operations.rs

diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index ca64c9ef63..5f88d17d26 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -878,6 +878,12 @@ impl DeltaDataChecker {
         self
     }
 
+    /// Add the specified set of constraints to the current DeltaDataChecker's constraints
+    pub fn with_extra_constraints(mut self, constraints: Vec<Constraint>) -> Self {
+        self.constraints.extend(constraints);
+        self
+    }
+
     /// Create a new DeltaDataChecker
     pub fn new(snapshot: &DeltaTableState) -> Self {
         let invariants = snapshot.schema().get_invariants().unwrap_or_default();
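The hunk above is the enforcement half of replaceWhere: the write path builds the usual invariant checker for the table and then appends the predicate itself as one extra ad-hoc constraint, so every incoming batch is validated against it. A minimal sketch of that composition (hedged: it assumes these crate paths are publicly visible, and the hypothetical `replace_where_checker` helper is for illustration only; `"*"` is the placeholder constraint name this patch itself uses):

```rust
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::table::state::DeltaTableState;
use deltalake::table::Constraint;

// Compose a checker that enforces the table's existing invariants plus the
// replaceWhere predicate (already rendered to SQL) against every written batch.
fn replace_where_checker(snapshot: &DeltaTableState, predicate_sql: &str) -> DeltaDataChecker {
    DeltaDataChecker::new(snapshot)
        .with_extra_constraints(vec![Constraint::new("*", predicate_sql)])
}
```

The write path in `write.rs` below performs exactly this composition inline, via `fmt_expr_to_sql` on the parsed predicate.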
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index bf36b32459..ad55d6921b 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -27,26 +27,36 @@
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
 
 use arrow_array::RecordBatch;
 use arrow_cast::can_cast_types;
 use arrow_schema::{DataType, Fields, SchemaRef as ArrowSchemaRef};
 use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
+use datafusion::physical_expr::create_physical_expr;
+use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
+use datafusion_common::DFSchema;
+use datafusion_expr::Expr;
 use futures::future::BoxFuture;
 use futures::StreamExt;
 use parquet::file::properties::WriterProperties;
 
+use super::datafusion_utils::Expression;
 use super::transaction::PROTOCOL;
 use super::writer::{DeltaWriter, WriterConfig};
 use super::{transaction::commit, CreateBuilder};
+use crate::delta_datafusion::expr::fmt_expr_to_sql;
+use crate::delta_datafusion::expr::parse_predicate_expression;
 use crate::delta_datafusion::DeltaDataChecker;
+use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
 use crate::errors::{DeltaResult, DeltaTableError};
-use crate::kernel::{Action, Add, PartitionsExt, StructType};
+use crate::kernel::{Action, Add, PartitionsExt, Remove, StructType};
 use crate::logstore::LogStoreRef;
 use crate::protocol::{DeltaOperation, SaveMode};
 use crate::storage::ObjectStoreRef;
 use crate::table::state::DeltaTableState;
+use crate::table::Constraint as DeltaConstraint;
 use crate::writer::record_batch::divide_by_partition_values;
 use crate::DeltaTable;
@@ -79,7 +89,6 @@ impl From<WriteError> for DeltaTableError {
     }
 }
 
 /// Write data into a DeltaTable
-#[derive(Debug, Clone)]
 pub struct WriteBuilder {
     /// A snapshot of the to-be-loaded table's state
     snapshot: Option<DeltaTableState>,
@@ -94,7 +103,7 @@ pub struct WriteBuilder {
     /// Column names for table partitioning
     partition_columns: Option<Vec<String>>,
     /// When using `Overwrite` mode, replace data that matches a predicate
-    predicate: Option<String>,
+    predicate: Option<Expression>,
     /// Size above which we will write a buffered parquet file to disk.
     target_file_size: Option<usize>,
     /// Number of records to be written in single batch to underlying writer
@@ -154,7 +163,7 @@ impl WriteBuilder {
     }
 
     /// When using `Overwrite` mode, replace data that matches a predicate
-    pub fn with_replace_where(mut self, predicate: impl Into<String>) -> Self {
+    pub fn with_replace_where(mut self, predicate: impl Into<Expression>) -> Self {
         self.predicate = Some(predicate.into());
         self
     }
@@ -292,7 +301,8 @@ impl WriteBuilder {
 }
 
 #[allow(clippy::too_many_arguments)]
-pub(crate) async fn write_execution_plan(
+async fn write_execution_plan_with_predicate(
+    predicate: Option<Expr>,
     snapshot: Option<&DeltaTableState>,
     state: SessionState,
     plan: Arc<dyn ExecutionPlan>,
@@ -318,6 +328,14 @@
     } else {
         DeltaDataChecker::empty()
     };
+    let checker = match predicate {
+        Some(pred) => {
+            // TODO: get the name of the outer-most column? `*` will also work but would it be slower?
+            let chk = DeltaConstraint::new("*", &fmt_expr_to_sql(&pred)?);
+            checker.with_extra_constraints(vec![chk])
+        }
+        _ => checker,
+    };
 
     // Write data to disk
     let mut tasks = vec![];
@@ -363,6 +381,134 @@
         .collect::<Vec<_>>())
 }
 
+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn write_execution_plan(
+    snapshot: Option<&DeltaTableState>,
+    state: SessionState,
+    plan: Arc<dyn ExecutionPlan>,
+    partition_columns: Vec<String>,
+    object_store: ObjectStoreRef,
+    target_file_size: Option<usize>,
+    write_batch_size: Option<usize>,
+    writer_properties: Option<WriterProperties>,
+    safe_cast: bool,
+    overwrite_schema: bool,
+) -> DeltaResult<Vec<Add>> {
+    write_execution_plan_with_predicate(
+        None,
+        snapshot,
+        state,
+        plan,
+        partition_columns,
+        object_store,
+        target_file_size,
+        write_batch_size,
+        writer_properties,
+        safe_cast,
+        overwrite_schema,
+    )
+    .await
+}
+
+async fn execute_non_empty_expr(
+    snapshot: &DeltaTableState,
+    log_store: LogStoreRef,
+    state: SessionState,
+    partition_columns: Vec<String>,
+    expression: &Expr,
+    rewrite: &[Add],
+    writer_properties: Option<WriterProperties>,
+) -> DeltaResult<Vec<Add>> {
+    // For each identified file perform a parquet scan + filter + limit (1) + count.
+    // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
+
+    let input_schema = snapshot.input_schema()?;
+    let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
+
+    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
+        .with_files(rewrite)
+        .build()
+        .await?;
+    let scan = Arc::new(scan);
+
+    // Apply the negation of the filter and rewrite files
+    let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
+
+    let predicate_expr = create_physical_expr(
+        &negated_expression,
+        &input_dfschema,
+        &input_schema,
+        state.execution_props(),
+    )?;
+    let filter: Arc<dyn ExecutionPlan> =
+        Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
+
+    // We don't want to verify the predicate against existing data
+    let add_actions = write_execution_plan(
+        Some(snapshot),
+        state,
+        filter,
+        partition_columns,
+        log_store.object_store(),
+        Some(snapshot.table_config().target_file_size() as usize),
+        None,
+        writer_properties,
+        false,
+        false,
+    )
+    .await?;
+
+    Ok(add_actions)
+}
+
+// This should only be called with a valid predicate
+async fn prepare_predicate_actions(
+    predicate: Expr,
+    log_store: LogStoreRef,
+    snapshot: &DeltaTableState,
+    state: SessionState,
+    partition_columns: Vec<String>,
+    writer_properties: Option<WriterProperties>,
+    deletion_timestamp: i64,
+) -> DeltaResult<Vec<Action>> {
+    let candidates =
+        find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?;
+
+    let add = if candidates.partition_scan {
+        Vec::new()
+    } else {
+        execute_non_empty_expr(
+            snapshot,
+            log_store,
+            state,
+            partition_columns,
+            &predicate,
+            &candidates.candidates,
+            writer_properties,
+        )
+        .await?
+    };
+    let remove = candidates.candidates;
+
+    let mut actions: Vec<Action> = add.into_iter().map(Action::Add).collect();
+
+    for action in remove {
+        actions.push(Action::Remove(Remove {
+            path: action.path,
+            deletion_timestamp: Some(deletion_timestamp),
+            data_change: true,
+            extended_file_metadata: Some(true),
+            partition_values: Some(action.partition_values),
+            size: Some(action.size),
+            deletion_vector: action.deletion_vector,
+            tags: None,
+            base_row_id: action.base_row_id,
+            default_row_commit_version: action.default_row_commit_version,
+        }))
+    }
+    Ok(actions)
+}
+
 impl std::future::IntoFuture for WriteBuilder {
     type Output = DeltaResult<DeltaTable>;
     type IntoFuture = BoxFuture<'static, Self::Output>;
@@ -465,19 +611,37 @@
                 Some(state) => state,
                 None => {
                     let ctx = SessionContext::new();
+                    register_store(this.log_store.clone(), ctx.runtime_env());
                     ctx.state()
                 }
             };
 
-            let add_actions = write_execution_plan(
+            let (predicate_str, predicate) = match this.predicate {
+                Some(predicate) => {
+                    let pred = match predicate {
+                        Expression::DataFusion(expr) => expr,
+                        Expression::String(s) => {
+                            let df_schema = DFSchema::try_from(schema.as_ref().to_owned())?;
+                            parse_predicate_expression(&df_schema, s, &state)?
+                            // this.snapshot.unwrap().parse_predicate_expression(s, &state)?
+ } + }; + (Some(fmt_expr_to_sql(&pred)?), Some(pred)) + } + _ => (None, None), + }; + + // Here we need to validate if the new data conforms to a predicate if one is provided + let add_actions = write_execution_plan_with_predicate( + predicate.clone(), this.snapshot.as_ref(), - state, + state.clone(), plan, partition_columns.clone(), this.log_store.object_store().clone(), this.target_file_size, this.write_batch_size, - this.writer_properties, + this.writer_properties.clone(), this.safe_cast, this.overwrite_schema, ) @@ -501,12 +665,26 @@ impl std::future::IntoFuture for WriteBuilder { actions.push(Action::Metadata(metadata)); } - match this.predicate { - Some(_pred) => { - return Err(DeltaTableError::Generic( - "Overwriting data based on predicate is not yet implemented" - .to_string(), - )); + let deletion_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + + match predicate { + Some(pred) => { + let predicate_actions = prepare_predicate_actions( + pred, + this.log_store.clone(), + snapshot, + state, + partition_columns.clone(), + this.writer_properties, + deletion_timestamp, + ) + .await?; + if !predicate_actions.is_empty() { + actions.extend(predicate_actions); + } } _ => { let remove_actions = snapshot @@ -515,8 +693,8 @@ impl std::future::IntoFuture for WriteBuilder { .map(|p| p.remove_action(true).into()); actions.extend(remove_actions); } - } - }; + }; + } } let operation = DeltaOperation::Write { @@ -526,8 +704,9 @@ impl std::future::IntoFuture for WriteBuilder { } else { None }, - predicate: this.predicate, + predicate: predicate_str, }; + let version = commit( this.log_store.as_ref(), &actions, @@ -577,10 +756,10 @@ mod tests { use super::*; use crate::operations::{collect_sendable_stream, DeltaOps}; use crate::protocol::SaveMode; - use crate::writer::test_utils::datafusion::get_data; use crate::writer::test_utils::datafusion::write_batch; + use crate::writer::test_utils::datafusion::{get_data, get_data_sorted}; use crate::writer::test_utils::{ - get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch, + get_arrow_schema, get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch, get_record_batch_with_nested_struct, setup_table_with_configuration, }; use crate::DeltaConfigKey; @@ -588,6 +767,7 @@ mod tests { use arrow::datatypes::Schema as ArrowSchema; use arrow_array::{Int32Array, StringArray, TimestampMicrosecondArray}; use arrow_schema::{DataType, TimeUnit}; + use datafusion::prelude::*; use datafusion::{assert_batches_eq, assert_batches_sorted_eq}; use serde_json::{json, Value}; @@ -940,4 +1120,167 @@ mod tests { assert_batches_eq!(&expected, &data); } + + #[tokio::test] + async fn test_replace_where() { + let schema = get_arrow_schema(&None); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])), + Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ + "2021-02-02", + "2021-02-03", + "2021-02-02", + "2021-02-04", + ])), + ], + ) + .unwrap(); + + let table = DeltaOps::new_in_memory() + .write(vec![batch]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let batch_add = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["C"])), + Arc::new(arrow::array::Int32Array::from(vec![50])), + Arc::new(arrow::array::StringArray::from(vec!["2023-01-01"])), + ], + ) + 
.unwrap();
+
+        let table = DeltaOps(table)
+            .write(vec![batch_add])
+            .with_save_mode(SaveMode::Overwrite)
+            .with_replace_where(col("id").eq(lit("C")))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 1);
+
+        let expected = [
+            "+----+-------+------------+",
+            "| id | value | modified   |",
+            "+----+-------+------------+",
+            "| A  | 0     | 2021-02-02 |",
+            "| B  | 20    | 2021-02-03 |",
+            "| C  | 50    | 2023-01-01 |",
+            "+----+-------+------------+",
+        ];
+        let actual = get_data(&table).await;
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
+
+    #[tokio::test]
+    async fn test_replace_where_fail_not_matching_predicate() {
+        let schema = get_arrow_schema(&None);
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])),
+                Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
+                Arc::new(arrow::array::StringArray::from(vec![
+                    "2021-02-02",
+                    "2021-02-03",
+                    "2021-02-02",
+                    "2021-02-04",
+                ])),
+            ],
+        )
+        .unwrap();
+
+        let table = DeltaOps::new_in_memory()
+            .write(vec![batch])
+            .with_save_mode(SaveMode::Append)
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+
+        // Take clones of these before the operation that is expected to fail;
+        // otherwise it would be impossible to refer to the in-memory table afterwards
+        let table_logstore = table.log_store.clone();
+        let table_state = table.state.clone().unwrap();
+
+        // An attempt to write records not conforming to the predicate should fail
+        let batch_fail = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec!["D"])),
+                Arc::new(arrow::array::Int32Array::from(vec![1000])),
+                Arc::new(arrow::array::StringArray::from(vec!["2023-01-01"])),
+            ],
+        )
+        .unwrap();
+
+        let table = DeltaOps(table)
+            .write(vec![batch_fail])
+            .with_save_mode(SaveMode::Overwrite)
+            .with_replace_where(col("id").eq(lit("C")))
+            .await;
+        assert!(table.is_err());
+
+        // Verify that table state hasn't changed
+        let table = DeltaTable::new_with_state(table_logstore, table_state);
+        assert_eq!(table.get_latest_version().await.unwrap(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_replace_where_partitioned() {
+        let schema = get_arrow_schema(&None);
+
+        let batch = get_record_batch(None, false);
+
+        let table = DeltaOps::new_in_memory()
+            .write(vec![batch])
+            .with_partition_columns(["id", "value"])
+            .with_save_mode(SaveMode::Append)
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+
+        let batch_add = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec!["A", "A", "A"])),
+                Arc::new(arrow::array::Int32Array::from(vec![11, 13, 15])),
+                Arc::new(arrow::array::StringArray::from(vec![
+                    "2024-02-02",
+                    "2024-02-02",
+                    "2024-02-01",
+                ])),
+            ],
+        )
+        .unwrap();
+
+        let table = DeltaOps(table)
+            .write(vec![batch_add])
+            .with_save_mode(SaveMode::Overwrite)
+            .with_replace_where(col("id").eq(lit("A")))
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 1);
+
+        let expected = [
+            "+----+-------+------------+",
+            "| id | value | modified   |",
+            "+----+-------+------------+",
+            "| A  | 11    | 2024-02-02 |",
+            "| A  | 13    | 2024-02-02 |",
+            "| A  | 15    | 2024-02-01 |",
+            "| B  | 2     | 2021-02-02 |",
+            "| B  | 4     | 2021-02-01 |",
+            "| B  | 8     | 2021-02-01 |",
+            "| B  | 9     | 2021-02-01 |",
+            "+----+-------+------------+",
+        ];
+        let actual = get_data_sorted(&table, "id,value,modified").await;
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
 }
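The tests above drive `with_replace_where` with a DataFusion `Expr`; because the builder now takes `impl Into<Expression>`, a SQL string should work as well. A minimal sketch (hedged: it assumes `Expression` carries the same `From<&str>` conversion the other operation builders use, and `replace_only_c_rows` is a hypothetical helper, not part of this patch):

```rust
use arrow_array::RecordBatch;
use deltalake::protocol::SaveMode;
use deltalake::{DeltaOps, DeltaResult, DeltaTable};

// Overwrite only the rows matching the predicate. Note that `batch` must
// itself satisfy `id = 'C'`, or the write is rejected by the extra constraint.
async fn replace_only_c_rows(table: DeltaTable, batch: RecordBatch) -> DeltaResult<DeltaTable> {
    DeltaOps(table)
        .write(vec![batch])
        .with_save_mode(SaveMode::Overwrite)
        .with_replace_where("id = 'C'") // string predicate, parsed against the table schema
        .await
}
```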
diff --git a/crates/core/src/writer/test_utils.rs b/crates/core/src/writer/test_utils.rs
index 03552aab84..093ad7cbd0 100644
--- a/crates/core/src/writer/test_utils.rs
+++ b/crates/core/src/writer/test_utils.rs
@@ -327,6 +327,24 @@ pub mod datafusion {
             .unwrap()
     }
 
+    pub async fn get_data_sorted(table: &DeltaTable, columns: &str) -> Vec<RecordBatch> {
+        let table = DeltaTable::new_with_state(
+            table.log_store.clone(),
+            table.state.as_ref().unwrap().clone(),
+        );
+        let ctx = SessionContext::new();
+        ctx.register_table("test", Arc::new(table)).unwrap();
+        ctx.sql(&format!(
+            "select {} from test order by {}",
+            columns, columns
+        ))
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap()
+    }
+
     pub async fn write_batch(table: DeltaTable, batch: RecordBatch) -> DeltaTable {
         DeltaOps(table)
             .write(vec![batch.clone()])
diff --git a/docs/_build/links.yml b/docs/_build/links.yml
index 605cda2d55..5a603059bc 100644
--- a/docs/_build/links.yml
+++ b/docs/_build/links.yml
@@ -1,4 +1,6 @@
 python:
   DeltaTable: PYTHON_API_URL/delta_table
+  replaceWhere: https://delta-io.github.io/delta-rs/api/delta_writer/
 rust:
-  DeltaTable: https://docs.rs/deltalake/latest/deltalake/table/struct.DeltaTable.html
\ No newline at end of file
+  DeltaTable: https://docs.rs/deltalake/latest/deltalake/table/struct.DeltaTable.html
+  replaceWhere: https://docs.rs/deltalake/latest/deltalake/operations/write/struct.WriteBuilder.html#method.with_replace_where
\ No newline at end of file
diff --git a/docs/src/python/operations.py b/docs/src/python/operations.py
new file mode 100644
index 0000000000..700b77421a
--- /dev/null
+++ b/docs/src/python/operations.py
@@ -0,0 +1,21 @@
+def replace_where():
+    # --8<-- [start:replace_where]
+    import pyarrow as pa
+    from deltalake import write_deltalake
+
+    # Assuming there is already a table in this location with some records where `id = '1'` which we want to overwrite
+    table_path = "/tmp/my_table"
+    data = pa.table(
+        {
+            "id": pa.array(["1", "1"], pa.string()),
+            "value": pa.array([11, 12], pa.int64()),
+        }
+    )
+    write_deltalake(
+        table_path,
+        data,
+        mode="overwrite",
+        predicate="id = '1'",
+        engine="rust",
+    )
+    # --8<-- [end:replace_where]
diff --git a/docs/src/rust/operations.rs b/docs/src/rust/operations.rs
new file mode 100644
index 0000000000..55ab40604f
--- /dev/null
+++ b/docs/src/rust/operations.rs
@@ -0,0 +1,32 @@
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // --8<-- [start:replace_where]
+    // Assuming there is already a table in this location with some records where `id = '1'` which we want to overwrite
+    use std::sync::Arc;
+    use arrow_array::{Int32Array, RecordBatch, StringArray};
+    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
+    use deltalake::datafusion::prelude::{col, lit};
+    use deltalake::protocol::SaveMode;
+    use deltalake::DeltaOps;
+
+    let schema = ArrowSchema::new(vec![
+        Field::new("id", DataType::Utf8, true),
+        Field::new("value", DataType::Int32, true),
+    ]);
+    let data = RecordBatch::try_new(
+        Arc::new(schema),
+        vec![
+            Arc::new(StringArray::from(vec!["1", "1"])),
+            Arc::new(Int32Array::from(vec![11, 12])),
+        ],
+    )?;
+
+    let table = deltalake::open_table("/tmp/my_table").await?;
+    let _table = DeltaOps(table)
+        .write(vec![data])
+        .with_save_mode(SaveMode::Overwrite)
+        .with_replace_where(col("id").eq(lit("1")))
+        .await?;
+    // --8<-- [end:replace_where]
+    Ok(())
+}
\ No newline at end of file
diff --git a/docs/usage/writing/index.md b/docs/usage/writing/index.md
index fe92572a81..dc8bb62389 100644
--- a/docs/usage/writing/index.md
+++ b/docs/usage/writing/index.md
@@ -50,4 +50,21 @@ that partition or else the method will raise an error.
 ```
 
 This method could also be used to insert a new partition if one doesn't
-already exist, making this operation idempotent.
\ No newline at end of file
+already exist, making this operation idempotent.
+
+## Overwriting part of the table data using a predicate
+
+!!! note
+
+    This predicate is often called a `replaceWhere` predicate.
+
+When you don’t specify a `predicate`, the overwrite save mode replaces the entire table.
+Instead of replacing the entire table (which is costly!), you may want to overwrite only the specific parts of the table that need to change.
+In this case, you can use a `predicate` to overwrite only the relevant records or partitions.
+
+!!! note
+
+    The data you write must conform to the predicate, i.e. it must not contain any records that don't match the
+    `predicate` condition; otherwise the operation will fail.
+
+{{ code_example('operations', 'replace_where', ['replaceWhere'])}}
\ No newline at end of file
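To illustrate the docs note above from the caller's side: a write whose data violates the `replaceWhere` predicate is rejected and the table is left unchanged, mirroring `test_replace_where_fail_not_matching_predicate`. A hedged sketch (it assumes the `/tmp/my_table` table from the docs example exists, reuses the string-predicate conversion assumed earlier, and the function name is hypothetical):

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use deltalake::protocol::SaveMode;
use deltalake::DeltaOps;

// Attempt to replace rows matching `id = '1'` with a record whose id is '2';
// the record violates the predicate, so the write is expected to fail.
async fn replace_where_rejects_nonconforming() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Utf8, true),
        Field::new("value", DataType::Int32, true),
    ]));
    let bad = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["2"])),
            Arc::new(Int32Array::from(vec![99])),
        ],
    )?;

    let table = deltalake::open_table("/tmp/my_table").await?;
    let result = DeltaOps(table)
        .write(vec![bad])
        .with_save_mode(SaveMode::Overwrite)
        .with_replace_where("id = '1'")
        .await;
    assert!(result.is_err()); // the commit is aborted; no actions are written
    Ok(())
}
```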
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 45a35b64b6..d3b956cbfc 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -118,7 +118,35 @@ def write_deltalake(
     *,
     schema: Optional[Union[pa.Schema, DeltaSchema]] = ...,
     partition_by: Optional[Union[List[str], str]] = ...,
-    mode: Literal["error", "append", "overwrite", "ignore"] = ...,
+    mode: Literal["error", "append", "ignore"] = ...,
+    name: Optional[str] = ...,
+    description: Optional[str] = ...,
+    configuration: Optional[Mapping[str, Optional[str]]] = ...,
+    overwrite_schema: bool = ...,
+    storage_options: Optional[Dict[str, str]] = ...,
+    large_dtypes: bool = ...,
+    engine: Literal["rust"],
+    writer_properties: WriterProperties = ...,
+    custom_metadata: Optional[Dict[str, str]] = ...,
+) -> None:
+    ...
+
+
+@overload
+def write_deltalake(
+    table_or_uri: Union[str, Path, DeltaTable],
+    data: Union[
+        "pd.DataFrame",
+        ds.Dataset,
+        pa.Table,
+        pa.RecordBatch,
+        Iterable[pa.RecordBatch],
+        RecordBatchReader,
+    ],
+    *,
+    schema: Optional[Union[pa.Schema, DeltaSchema]] = ...,
+    partition_by: Optional[Union[List[str], str]] = ...,
+    mode: Literal["overwrite"],
     name: Optional[str] = ...,
     description: Optional[str] = ...,
     configuration: Optional[Mapping[str, Optional[str]]] = ...,
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 7829d73a75..bfed12bb8b 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -1386,7 +1386,7 @@ fn write_to_deltalake(
         builder = builder.with_description(description);
     };
 
-    if let Some(predicate) = &predicate {
+    if let Some(predicate) = predicate {
         builder = builder.with_replace_where(predicate);
     };
 
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index 337d68f931..9252dfdd41 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -869,6 +869,8 @@ def test_replace_where_overwrite(
     value_type: pa.DataType,
     filter_string: str,
 ):
+    table_path = tmp_path
+
     sample_data = pa.table(
         {
             "p1": pa.array(["1", "1", "2", "2"], pa.string()),
@@ -876,9 +878,9 @@
             "val": pa.array([1, 1, 1, 1], pa.int64()),
         }
     )
-    write_deltalake(tmp_path, sample_data, mode="overwrite", partition_by=["p1", "p2"])
+    write_deltalake(table_path, sample_data, mode="overwrite")
 
-    delta_table = DeltaTable(tmp_path)
+    delta_table = DeltaTable(table_path)
     assert (
         delta_table.to_pyarrow_table().sort_by(
             [("p1", "ascending"), ("p2", "ascending")]
@@ -890,36 +892,101 @@
         {
             "p1": pa.array(["1", "1"], pa.string()),
             "p2": pa.array([value_2, value_1],
value_type),
-            "val": pa.array([2, 2], pa.int64()),
+            "val": pa.array([2, 3], pa.int64()),
         }
     )
     expected_data = pa.table(
         {
             "p1": pa.array(["1", "1", "2", "2"], pa.string()),
             "p2": pa.array([value_1, value_2, value_1, value_2], value_type),
-            "val": pa.array([2, 2, 1, 1], pa.int64()),
+            "val": pa.array([3, 2, 1, 1], pa.int64()),
         }
     )
 
-    with pytest.raises(
-        DeltaError,
-        match="Generic DeltaTable error: Overwriting data based on predicate is not yet implemented",
-    ):
-        write_deltalake(
-            tmp_path,
-            sample_data,
-            mode="overwrite",
-            predicate="`p1` = 1",
-            engine="rust",
-        )
+    write_deltalake(
+        table_path,
+        sample_data,
+        mode="overwrite",
+        predicate="p1 = '1'",
+        engine="rust",
+    )
 
-        delta_table.update_incremental()
-        assert (
-            delta_table.to_pyarrow_table().sort_by(
-                [("p1", "ascending"), ("p2", "ascending")]
-            )
-            == expected_data
-        )
+    delta_table.update_incremental()
+    assert (
+        delta_table.to_pyarrow_table().sort_by(
+            [("p1", "ascending"), ("p2", "ascending")]
+        )
+        == expected_data
+    )
+
+
+@pytest.mark.parametrize(
+    "value_1,value_2,value_type,filter_string",
+    [
+        (1, 2, pa.int64(), "1"),
+        (False, True, pa.bool_(), "false"),
+        (date(2022, 1, 1), date(2022, 1, 2), pa.date32(), "2022-01-01"),
+    ],
+)
+def test_replace_where_overwrite_partitioned(
+    tmp_path: pathlib.Path,
+    value_1: Any,
+    value_2: Any,
+    value_type: pa.DataType,
+    filter_string: str,
+):
+    table_path = tmp_path
+
+    sample_data = pa.table(
+        {
+            "p1": pa.array(["1", "1", "2", "2"], pa.string()),
+            "p2": pa.array([value_1, value_2, value_1, value_2], value_type),
+            "val": pa.array([1, 1, 1, 1], pa.int64()),
+        }
+    )
+    write_deltalake(
+        table_path, sample_data, mode="overwrite", partition_by=["p1", "p2"]
+    )
+
+    delta_table = DeltaTable(table_path)
+    assert (
+        delta_table.to_pyarrow_table().sort_by(
+            [("p1", "ascending"), ("p2", "ascending")]
+        )
+        == sample_data
+    )
+
+    replace_data = pa.table(
+        {
+            "p1": pa.array(["1", "1"], pa.string()),
+            "p2": pa.array([value_2, value_1], value_type),
+            "val": pa.array([2, 3], pa.int64()),
+        }
+    )
+    expected_data = pa.table(
+        {
+            "p1": pa.array(["1", "1", "2", "2"], pa.string()),
+            "p2": pa.array([value_1, value_2, value_1, value_2], value_type),
+            "val": pa.array([3, 2, 1, 1], pa.int64()),
+        }
+    )
+
+    write_deltalake(
+        table_path,
+        replace_data,
+        mode="overwrite",
+        partition_by=["p1", "p2"],
+        predicate="p1 = '1'",
+        engine="rust",
+    )
+
+    delta_table.update_incremental()
+    assert (
+        delta_table.to_pyarrow_table().sort_by(
+            [("p1", "ascending"), ("p2", "ascending")]
+        )
+        == expected_data
+    )
 
 
 def test_partition_overwrite_with_new_partition(

From 31f2988f6a6d87d650249d926b8ba02e5a218cfb Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Wed, 31 Jan 2024 08:46:19 +0100
Subject: [PATCH 2/3] rm old code

---
 crates/core/src/operations/write.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index ad55d6921b..bb976b5fb9 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -437,7 +437,6 @@ async fn execute_non_empty_expr(
     let predicate_expr = create_physical_expr(
         &negated_expression,
         &input_dfschema,
-        &input_schema,
         state.execution_props(),
     )?;
     let filter: Arc<dyn ExecutionPlan> =

From 5e65a43a33073d5b88580aefdf03c19d483d03d3 Mon Sep 17 00:00:00 2001
From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com>
Date: Wed, 31 Jan 2024 08:49:54 +0100
Subject: [PATCH 3/3] change dead delta.io links

---
 docs/index.md                | 2 +-
python/docs/source/index.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/index.md b/docs/index.md index 8dfa67f924..99b7dc6cb3 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,6 +1,6 @@ # The deltalake package -This is the documentation for the native Rust/Python implementation of Delta Lake. It is based on the delta-rs Rust library and requires no Spark or JVM dependencies. For the PySpark implementation, see [delta-spark](https://docs.delta.io/latest/api/python/index.html) instead. +This is the documentation for the native Rust/Python implementation of Delta Lake. It is based on the delta-rs Rust library and requires no Spark or JVM dependencies. For the PySpark implementation, see [delta-spark](https://docs.delta.io/latest/api/python/spark/index.html) instead. This module provides the capability to read, write, and manage [Delta Lake](https://delta.io/) tables with Python or Rust without Spark or Java. It uses [Apache Arrow](https://arrow.apache.org/) under the hood, so is compatible with other Arrow-native or integrated libraries such as [pandas](https://pandas.pydata.org/), [DuckDB](https://duckdb.org/), and [Polars](https://www.pola.rs/). diff --git a/python/docs/source/index.rst b/python/docs/source/index.rst index 271d3a85a5..0ab6ad86a7 100644 --- a/python/docs/source/index.rst +++ b/python/docs/source/index.rst @@ -16,7 +16,7 @@ Pandas_, DuckDB_, and Polars_. It is not yet as feature-complete as the PySpark implementation of Delta Lake. If you encounter a bug, please let us know in our `GitHub repo`_. -.. _delta-spark: https://docs.delta.io/latest/api/python/index.html +.. _delta-spark: https://docs.delta.io/latest/api/python/spark/index.html .. _Delta Lake: https://delta.io/ .. _Apache Arrow: https://arrow.apache.org/ .. _Pandas: https://pandas.pydata.org/