diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 7f24145dca..72912d696d 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -199,9 +199,9 @@ def _filters_to_expression(filters: FilterType) -> Expression:
 Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
 DNF allows arbitrary boolean logical combinations of single partition predicates.
 The innermost tuples each describe a single partition predicate. The list of inner
-predicates is interpreted as a conjunction (AND), forming a more selective and 
-multiple partition predicates. Each tuple has format: (key, op, value) and compares 
-the key with the value. The supported op are: `=`, `!=`, `in`, and `not in`. If 
+predicates is interpreted as a conjunction (AND), forming a more selective and
+multiple partition predicates. Each tuple has format: (key, op, value) and compares
+the key with the value. The supported op are: `=`, `!=`, `in`, and `not in`. If
 the op is in or not in, the value must be a collection such as a list, a set or a tuple.
 The supported type for value is str. Use empty string `''` for Null partition value.
 
@@ -302,13 +302,13 @@ def files(
 
 files.__doc__ = f"""
 Get the .parquet files of the DeltaTable.
- 
+
 The paths are as they are saved in the delta log, which may either be
 relative to the table root or absolute URIs.
 
-:param partition_filters: the partition filters that will be used for 
+:param partition_filters: the partition filters that will be used for
 getting the matched files
-:return: list of the .parquet files referenced for the current version 
+:return: list of the .parquet files referenced for the current version
 of the DeltaTable
 {_DNF_filter_doc}
 """
@@ -666,6 +666,20 @@ def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch:
         """
         return self._table.get_add_actions(flatten)
 
+    def delete(self, predicate: Optional[str] = None) -> Dict[str, Any]:
+        """Delete records from a Delta Table that satisfy a predicate.
+
+        When a predicate is not provided, all records are deleted from the Delta
+        Table. Otherwise, a scan of the Delta table is performed to mark any files
+        that contain records that satisfy the predicate. Once files are determined,
+        they are rewritten without those records.
+
+        :param predicate: a SQL where clause. If not passed, will delete all rows.
+        :return: the metrics from delete.
+        """
+        metrics = self._table.delete(predicate)
+        return json.loads(metrics)
+
 
 class TableOptimizer:
     """API for various table optimization commands."""
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8115c1bb76..ebdc395497 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -21,6 +21,7 @@ use deltalake::checkpoints::create_checkpoint;
 use deltalake::datafusion::prelude::SessionContext;
 use deltalake::delta_datafusion::DeltaDataChecker;
 use deltalake::errors::DeltaTableError;
+use deltalake::operations::delete::DeleteBuilder;
 use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
 use deltalake::operations::restore::RestoreBuilder;
 use deltalake::operations::transaction::commit;
@@ -594,6 +595,20 @@ impl RawDeltaTable {
                 .map_err(PythonError::from)?,
         ))
     }
+
+    /// Run the delete command on the delta table: delete records matching a predicate and return the delete metrics.
+    #[pyo3(signature = (predicate = None))]
+    pub fn delete(&mut self, predicate: Option<String>) -> PyResult<String> {
+        let mut cmd = DeleteBuilder::new(self._table.object_store(), self._table.state.clone());
+        if let Some(predicate) = predicate {
+            cmd = cmd.with_predicate(predicate);
+        }
+        let (table, metrics) = rt()?
+            .block_on(cmd.into_future())
+            .map_err(PythonError::from)?;
+        self._table.state = table.state;
+        Ok(serde_json::to_string(&metrics).unwrap())
+    }
 }
 
 fn convert_partition_filters<'a>(
diff --git a/python/tests/test_delete.py b/python/tests/test_delete.py
new file mode 100644
index 0000000000..f90125b2fb
--- /dev/null
+++ b/python/tests/test_delete.py
@@ -0,0 +1,58 @@
+import pathlib
+
+import pyarrow as pa
+import pyarrow.compute as pc
+
+from deltalake.table import DeltaTable
+from deltalake.writer import write_deltalake
+
+
+def test_delete_no_predicates(existing_table: DeltaTable):
+    old_version = existing_table.version()
+
+    existing_table.delete()
+
+    last_action = existing_table.history(1)[0]
+    assert last_action["operation"] == "DELETE"
+    assert existing_table.version() == old_version + 1
+
+    dataset = existing_table.to_pyarrow_dataset()
+    assert dataset.count_rows() == 0
+    assert len(existing_table.files()) == 0
+
+
+def test_delete_a_partition(tmp_path: pathlib.Path, sample_data: pa.Table):
+    write_deltalake(tmp_path, sample_data, partition_by=["bool"])
+
+    dt = DeltaTable(tmp_path)
+    old_version = dt.version()
+
+    mask = pc.equal(sample_data["bool"], False)
+    expected_table = sample_data.filter(mask)
+
+    dt.delete(predicate="bool = true")
+
+    last_action = dt.history(1)[0]
+    assert last_action["operation"] == "DELETE"
+    assert dt.version() == old_version + 1
+
+    table = dt.to_pyarrow_table()
+    assert table.equals(expected_table)
+    assert len(dt.files()) == 1
+
+
+def test_delete_some_rows(existing_table: DeltaTable):
+    old_version = existing_table.version()
+
+    existing = existing_table.to_pyarrow_table()
+    mask = pc.invert(pc.is_in(existing["utf8"], pa.array(["0", "1"])))
+    expected_table = existing.filter(mask)
+
+    existing_table.delete(predicate="utf8 in ('0', '1')")
+
+    last_action = existing_table.history(1)[0]
+    assert last_action["operation"] == "DELETE"
+    assert existing_table.version() == old_version + 1
+
+    table = existing_table.to_pyarrow_table()
+    assert table.equals(expected_table)
diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs
index f07c92e442..550e97e6ba 100644
--- a/rust/src/operations/delete.rs
+++ b/rust/src/operations/delete.rs
@@ -31,6 +31,7 @@ use datafusion_common::scalar::ScalarValue;
 use datafusion_common::DFSchema;
 use futures::future::BoxFuture;
 use parquet::file::properties::WriterProperties;
+use serde::Serialize;
 use serde_json::Map;
 use serde_json::Value;
 
@@ -63,7 +64,7 @@ pub struct DeleteBuilder {
     app_metadata: Option<Map<String, Value>>,
 }
 
-#[derive(Default, Debug)]
+#[derive(Default, Debug, Serialize)]
 /// Metrics for the Delete Operation
 pub struct DeleteMetrics {
     /// Number of files added
@@ -116,7 +117,7 @@ impl DeleteBuilder {
         self
     }
 
-    /// Writer properties passed to parquet writer for when fiiles are rewritten
+    /// Writer properties passed to parquet writer for when files are rewritten
     pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
         self.writer_properties = Some(writer_properties);
         self
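
For reference, a minimal usage sketch of the API this patch adds. The table path and the predicate column are hypothetical, and the exact keys in the returned dict come from serializing the `DeleteMetrics` struct above, so treat the field names mentioned in the comments as an assumption rather than a documented contract:

```python
from deltalake import DeltaTable

# Open an existing table (path is hypothetical).
dt = DeltaTable("path/to/table")

# Delete only the rows matching a SQL-style predicate. Files containing
# matching rows are rewritten without them and a new version is committed.
metrics = dt.delete(predicate="id in ('0', '1')")

# `delete` returns the serialized DeleteMetrics parsed into a dict,
# e.g. counts of files added/removed and rows deleted/copied.
print(metrics)

# With no predicate, every row in the table is deleted.
dt.delete()
```

Note that the Python wrapper simply `json.loads` the string produced by `serde_json::to_string(&metrics)` in the Rust binding, so the dict keys mirror the `DeleteMetrics` field names.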