feat(python): expose delete operation #1687

Merged Oct 5, 2023 (7 commits)
feat(python): expose delete operation
Naively expose the delete operation, with the option to provide a
predicate.
guilhem-dvr committed Oct 2, 2023
commit fdabd1abc06985ec6eec16be4dc2f87d27699e71
26 changes: 20 additions & 6 deletions python/deltalake/table.py
@@ -199,9 +199,9 @@ def _filters_to_expression(filters: FilterType) -> Expression:
Predicates are expressed in disjunctive normal form (DNF), like [("x", "=", "a"), ...].
DNF allows arbitrary boolean logical combinations of single partition predicates.
The innermost tuples each describe a single partition predicate. The list of inner
predicates is interpreted as a conjunction (AND), forming a more selective predicate
over multiple partition columns. Each tuple has format: (key, op, value) and compares
the key with the value. The supported ops are: `=`, `!=`, `in`, and `not in`. If
the op is `in` or `not in`, the value must be a collection such as a list, a set or a tuple.
The supported type for value is str. Use the empty string `''` for a null partition value.
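The DNF semantics described above can be illustrated with a pure-Python sketch (not the library's `_filters_to_expression` implementation; the helper name `matches_dnf` and the sample partition values are invented for this example):

```python
def matches_dnf(partition: dict, filters: list) -> bool:
    """Return True if `partition` satisfies the AND of all (key, op, value) tuples.

    A flat list of tuples is a single conjunction; values are compared as strings,
    and '' stands in for a null partition value.
    """
    for key, op, value in filters:
        actual = partition.get(key, "")
        if op == "=":
            ok = actual == value
        elif op == "!=":
            ok = actual != value
        elif op == "in":
            ok = actual in value
        elif op == "not in":
            ok = actual not in value
        else:
            raise ValueError(f"unsupported op: {op}")
        if not ok:
            return False
    return True

# AND of two predicates: year must be '2023' and month one of '01'/'02'
filters = [("year", "=", "2023"), ("month", "in", ["01", "02"])]
print(matches_dnf({"year": "2023", "month": "01"}, filters))  # True
print(matches_dnf({"year": "2023", "month": "03"}, filters))  # False
```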

@@ -302,13 +302,13 @@ def files(

files.__doc__ = f"""
Get the .parquet files of the DeltaTable.

The paths are as they are saved in the delta log, which may either be
relative to the table root or absolute URIs.

:param partition_filters: the partition filters that will be used for
getting the matched files
:return: list of the .parquet files referenced for the current version
of the DeltaTable
{_DNF_filter_doc}
"""
@@ -666,6 +666,20 @@ def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch:
"""
return self._table.get_add_actions(flatten)

def delete(self, predicate: Optional[str] = None) -> Dict[str, Any]:
    """Delete records from a Delta Table that satisfy a predicate.

    When a predicate is not provided, all records are deleted from the Delta
    Table. Otherwise, a scan of the Delta Table is performed to mark any files
    containing records that satisfy the predicate. Once those files are
    determined, they are rewritten without the matching records.

    :param predicate: a logical expression, defaults to None
    :return: the metrics from delete
    """
    metrics = self._table.delete(predicate)
    return json.loads(metrics)
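The copy-on-write behaviour described in the docstring can be sketched in plain Python. This is an illustrative model only: `delete_where`, the dict-based file layout, and the metric names are assumptions made for this example, not deltalake internals.

```python
def delete_where(files: dict, predicate) -> dict:
    """Model a predicate delete over `files` ({path: [record, ...]}).

    Files with no matching records are left untouched; files containing at
    least one match are removed and their surviving records rewritten into
    a new file. Mutates `files` and returns illustrative delete metrics.
    """
    removed = added = deleted_rows = copied_rows = 0
    for path in list(files):
        records = files[path]
        kept = [r for r in records if not predicate(r)]
        if len(kept) == len(records):
            continue  # no match: file is untouched
        removed += 1
        deleted_rows += len(records) - len(kept)
        del files[path]
        if kept:  # rewrite the surviving records into a new file
            files[path + ".rewritten"] = kept
            added += 1
            copied_rows += len(kept)
    return {
        "num_removed_files": removed,
        "num_added_files": added,
        "num_deleted_rows": deleted_rows,
        "num_copied_rows": copied_rows,
    }

files = {"part-0": [{"id": 1}, {"id": 2}], "part-1": [{"id": 3}]}
metrics = delete_where(files, lambda r: r["id"] == 2)
print(metrics["num_removed_files"], metrics["num_added_files"])  # 1 1
```

Note the asymmetry this models: deleting one row from a large file still costs a full rewrite of that file, which is why the real operation first scans to narrow down the affected files.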


class TableOptimizer:
"""API for various table optimization commands."""
15 changes: 15 additions & 0 deletions python/src/lib.rs
@@ -21,6 +21,7 @@ use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::transaction::commit;
@@ -594,6 +595,20 @@ impl RawDeltaTable {
.map_err(PythonError::from)?,
))
}

    /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics.
    #[pyo3(signature = (predicate = None))]
    pub fn delete(&mut self, predicate: Option<String>) -> PyResult<String> {
        let mut cmd = DeleteBuilder::new(self._table.object_store(), self._table.state.clone());
        if let Some(predicate) = predicate {
            cmd = cmd.with_predicate(predicate);
        }
        let (table, metrics) = rt()?
            .block_on(cmd.into_future())
            .map_err(PythonError::from)?;
        self._table.state = table.state;
        Ok(serde_json::to_string(&metrics).unwrap())
    }
}

fn convert_partition_filters<'a>(
5 changes: 3 additions & 2 deletions rust/src/operations/delete.rs
@@ -30,6 +30,7 @@ use datafusion_common::scalar::ScalarValue;
use datafusion_common::DFSchema;
use futures::future::BoxFuture;
use parquet::file::properties::WriterProperties;
use serde::Serialize;
use serde_json::Map;
use serde_json::Value;

@@ -62,7 +63,7 @@ pub struct DeleteBuilder {
app_metadata: Option<Map<String, serde_json::Value>>,
}

#[derive(Default, Debug)]
#[derive(Default, Debug, Serialize)]
/// Metrics for the Delete Operation
pub struct DeleteMetrics {
/// Number of files added
@@ -115,7 +116,7 @@ impl DeleteBuilder {
self
}

/// Writer properties passed to parquet writer for when fiiles are rewritten
/// Writer properties passed to parquet writer for when files are rewritten
pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
self.writer_properties = Some(writer_properties);
self