Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: bulk delete for vacuum #1556

Merged
merged 2 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,17 +414,13 @@ def vacuum(
retention_hours: Optional[int] = None,
dry_run: bool = True,
enforce_retention_duration: bool = True,
max_concurrent_requests: int = 10,
) -> List[str]:
"""
Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.

:param retention_hours: the retention threshold in hours. If None, the value from `configuration.deletedFileRetentionDuration` is used, or the default of 1 week if that is not set.
:param dry_run: when activated, list only the files, delete otherwise
:param enforce_retention_duration: when disabled, accepts retention hours smaller than the value from `configuration.deletedFileRetentionDuration`.
:param max_concurrent_requests: the maximum number of concurrent requests to send to the backend.
Increasing this number may improve performance of vacuuming large tables, however it might also
increase the risk of hitting rate limits.
:return: the list of files that are no longer referenced by the Delta Table and are older than the retention threshold.
"""
if retention_hours:
Expand All @@ -435,7 +431,6 @@ def vacuum(
dry_run,
retention_hours,
enforce_retention_duration,
max_concurrent_requests,
)

@property
Expand Down
6 changes: 2 additions & 4 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,18 +244,16 @@ impl RawDeltaTable {

/// Run the Vacuum command on the Delta Table: list and delete files that are no longer
/// referenced by the Delta table and are older than the retention threshold.
#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, max_concurrent_requests = 10))]
#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true))]
pub fn vacuum(
&mut self,
dry_run: bool,
retention_hours: Option<u64>,
enforce_retention_duration: bool,
max_concurrent_requests: usize,
) -> PyResult<Vec<String>> {
let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone())
.with_enforce_retention_duration(enforce_retention_duration)
.with_dry_run(dry_run)
.with_max_concurrent_requests(max_concurrent_requests);
.with_dry_run(dry_run);
if let Some(retention_period) = retention_hours {
cmd = cmd.with_retention_period(Duration::hours(retention_period as i64));
}
Expand Down
33 changes: 12 additions & 21 deletions rust/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::sync::Arc;
use chrono::{Duration, Utc};
use futures::future::BoxFuture;
use futures::{StreamExt, TryStreamExt};
use object_store::Error;
use object_store::{path::Path, ObjectStore};

use crate::errors::{DeltaResult, DeltaTableError};
Expand Down Expand Up @@ -80,8 +81,6 @@ pub struct VacuumBuilder {
retention_period: Option<Duration>,
/// Validate the retention period is not below the retention period configured in the table
enforce_retention_duration: bool,
/// Maximum number of concurrent requests to make
max_concurrent_requests: usize,
/// Don't delete the files. Just determine which files can be deleted
dry_run: bool,
/// Override the source of time
Expand All @@ -106,7 +105,6 @@ impl VacuumBuilder {
store,
retention_period: None,
enforce_retention_duration: true,
max_concurrent_requests: 10,
dry_run: false,
clock: None,
}
Expand All @@ -130,12 +128,6 @@ impl VacuumBuilder {
self
}

/// Set the maximum number of concurrent requests to make
pub fn with_max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
self.max_concurrent_requests = max_concurrent_requests;
self
}

/// add a time source for testing
#[doc(hidden)]
pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
Expand Down Expand Up @@ -186,10 +178,7 @@ impl VacuumBuilder {
files_to_delete.push(obj_meta.location);
}

Ok(VacuumPlan {
files_to_delete,
max_concurrent_requests: self.max_concurrent_requests,
})
Ok(VacuumPlan { files_to_delete })
}
}

Expand Down Expand Up @@ -225,8 +214,6 @@ impl std::future::IntoFuture for VacuumBuilder {
struct VacuumPlan {
/// What files are to be deleted
pub files_to_delete: Vec<Path>,
/// How many concurrent requests to make
pub max_concurrent_requests: usize,
}

impl VacuumPlan {
Expand All @@ -239,13 +226,17 @@ impl VacuumPlan {
});
}

let files_deleted = futures::stream::iter(self.files_to_delete)
.map(|path| {
let store = store.clone();
async move { store.delete(&path).await.map(|_| path) }
let locations = futures::stream::iter(self.files_to_delete)
.map(Result::Ok)
.boxed();

let files_deleted = store
.delete_stream(locations)
.map(|res| match res {
Ok(path) => Ok(path.to_string()),
Err(Error::NotFound { path, .. }) => Ok(path),
Err(err) => Err(err),
})
.buffer_unordered(self.max_concurrent_requests)
.map_ok(|path| path.to_string())
.try_collect::<Vec<_>>()
.await?;

Expand Down