Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: spawn put writers tasks in separate runtime #2788

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion crates/core/src/operations/writer.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
//! Abstractions and implementations for writing data to delta tables

use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
use bytes::Bytes;
use delta_kernel::expressions::Scalar;
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use tokio::runtime::Runtime;
use tracing::debug;

use crate::crate_version;
Expand Down Expand Up @@ -121,6 +125,12 @@ impl WriterConfig {
}
}

/// Returns the shared tokio runtime used for write tasks.
///
/// The runtime is built lazily on the first call and then reused by every
/// subsequent caller for the rest of the process lifetime.
///
/// # Panics
///
/// Panics if the tokio runtime cannot be created.
fn write_rt() -> &'static Runtime {
    static WRITE_RT: OnceLock<Runtime> = OnceLock::new();

    // Initializer kept as a named function so the one-time construction path
    // is easy to spot; `OnceLock` guarantees it runs at most once.
    fn build_runtime() -> Runtime {
        Runtime::new().expect("Failed to create a tokio runtime for writing.")
    }

    WRITE_RT.get_or_init(build_runtime)
}

#[derive(Debug)]
/// A parquet writer implementation tailored to the needs of writing data to a delta table.
pub struct DeltaWriter {
Expand Down Expand Up @@ -332,6 +342,24 @@ impl PartitionWriter {
)
}

/// Runs `f` against this writer's object store on the write runtime.
///
/// The closure receives a reference to a clone of the writer's
/// `Arc<dyn ObjectStore>` and to `path`, both of which are moved into the
/// spawned task. The task is spawned as soon as this method is called; the
/// returned future only waits for its completion.
///
/// # Errors
///
/// Returns whatever error `f` produces. If the task fails to join for a
/// reason other than a panic, the join error is wrapped in
/// `DeltaTableError::GenericError`.
///
/// # Panics
///
/// If the spawned task panicked, the panic is resumed on the awaiting side.
fn spawn<F, O>(&self, f: F, path: Path) -> BoxFuture<'_, DeltaResult<O>>
where
    F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, DeltaResult<O>>
        + Send
        + 'static,
    O: Send + 'static,
{
    let object_store = Arc::clone(&self.object_store);
    // Spawn eagerly so the work starts before the caller first polls.
    let handle = write_rt().spawn(async move { f(&object_store, &path).await });

    async move {
        match handle.await {
            Ok(task_result) => task_result,
            Err(join_error) => match join_error.try_into_panic() {
                // Propagate panics from the task instead of hiding them in an error.
                Ok(payload) => std::panic::resume_unwind(payload),
                Err(join_error) => Err(DeltaTableError::GenericError {
                    source: Box::new(join_error),
                }),
            },
        }
    }
    .boxed()
}

fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
let new_buffer = ShareableBuffer::default();
let arrow_writer = ArrowWriter::try_new(
Expand Down Expand Up @@ -368,7 +396,12 @@ impl PartitionWriter {
let file_size = buffer.len() as i64;

// write file to object store
self.object_store.put(&path, buffer.into()).await?;
self.spawn(
|store, path| store.put(path, buffer.into()).map_err(|e| e.into()).boxed(),
path.clone(),
)
.await?;

self.files_written.push(
create_add(
&self.config.partition_values,
Expand Down
Loading