From 5ff3d553f8f09fedbefca1c1aef6f44a82245167 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 1 Aug 2024 16:35:39 +0800 Subject: [PATCH] Imeplement task notify Signed-off-by: Xuanwo --- integrations/object_store/src/store.rs | 16 ++++++++++++++-- integrations/object_store/src/utils.rs | 8 +++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/integrations/object_store/src/store.rs b/integrations/object_store/src/store.rs index 907205ea31d0..d7dc8193a108 100644 --- a/integrations/object_store/src/store.rs +++ b/integrations/object_store/src/store.rs @@ -184,7 +184,8 @@ impl ObjectStore for OpendalStore { ) -> object_store::Result> { let writer = self .inner - .writer(location.as_ref()) + .writer_with(location.as_ref()) + .concurrent(8) .into_send() .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; @@ -432,6 +433,13 @@ impl ObjectStore for OpendalStore { } /// `MultipartUpload`'s impl based on `Writer` in opendal +/// +/// # Notes +/// +/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing +/// implementation do. Instead, we just write the part and notify the next task to be written. +/// +/// The lock here doesn't really involve the write process, it's just for the notify mechanism. struct OpendalMultipartUpload { writer: Arc>, location: Path, @@ -453,12 +461,16 @@ impl MultipartUpload for OpendalMultipartUpload { fn put_part(&mut self, data: PutPayload) -> UploadPart { let writer = self.writer.clone(); let location = self.location.clone(); + + // Generate next notify which will be notified after the current part is written. let next_notify = Arc::new(Notify::new()); + // Fetch the notify for current part to wait for it to be written. let current_notify = self.next_notify.replace(next_notify.clone()); async move { - // Wait for the previous part to be written + // current_notify == None means that it's the first part, we don't need to wait. if let Some(notify) = current_notify { + // Wait for the previous part to be written notify.notified().await; } diff --git a/integrations/object_store/src/utils.rs b/integrations/object_store/src/utils.rs index c4c593797ee6..5bfdc7d23fc0 100644 --- a/integrations/object_store/src/utils.rs +++ b/integrations/object_store/src/utils.rs @@ -18,6 +18,7 @@ use futures::Stream; use object_store::ObjectMeta; use opendal::{Entry, Metadata, Metakey}; +use std::future::IntoFuture; /// Conditionally add the `Send` marker trait for the wrapped type. /// Only take effect when the `send_wrapper` feature is enabled. @@ -88,11 +89,12 @@ pub trait IntoSendFuture { impl IntoSendFuture for T where - T: futures::Future, + T: IntoFuture, { - type Output = SendWrapper; + type Output = SendWrapper; + fn into_send(self) -> Self::Output { - SendWrapper::new(self) + SendWrapper::new(self.into_future()) } }