Skip to content

Commit

Permalink
Imeplement task notify
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Aug 1, 2024
1 parent cffb3f9 commit 5ff3d55
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
16 changes: 14 additions & 2 deletions integrations/object_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ impl ObjectStore for OpendalStore {
) -> object_store::Result<Box<dyn MultipartUpload>> {
let writer = self
.inner
.writer(location.as_ref())
.writer_with(location.as_ref())
.concurrent(8)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Expand Down Expand Up @@ -432,6 +433,13 @@ impl ObjectStore for OpendalStore {
}

/// `MultipartUpload`'s impl based on `Writer` in opendal
///
/// # Notes
///
/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing
/// implementation do. Instead, we just write the part and notify the next task to be written.
///
/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
struct OpendalMultipartUpload {
writer: Arc<Mutex<Writer>>,
location: Path,
Expand All @@ -453,12 +461,16 @@ impl MultipartUpload for OpendalMultipartUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
let writer = self.writer.clone();
let location = self.location.clone();

// Generate next notify which will be notified after the current part is written.
let next_notify = Arc::new(Notify::new());
// Fetch the notify for current part to wait for it to be written.
let current_notify = self.next_notify.replace(next_notify.clone());

async move {
// Wait for the previous part to be written
// current_notify == None means that it's the first part, we don't need to wait.
if let Some(notify) = current_notify {
// Wait for the previous part to be written
notify.notified().await;
}

Expand Down
8 changes: 5 additions & 3 deletions integrations/object_store/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use futures::Stream;
use object_store::ObjectMeta;
use opendal::{Entry, Metadata, Metakey};
use std::future::IntoFuture;

/// Conditionally add the `Send` marker trait for the wrapped type.
/// Only take effect when the `send_wrapper` feature is enabled.
Expand Down Expand Up @@ -88,11 +89,12 @@ pub trait IntoSendFuture {

impl<T> IntoSendFuture for T
where
T: futures::Future,
T: IntoFuture,
{
type Output = SendWrapper<T>;
type Output = SendWrapper<T::IntoFuture>;

fn into_send(self) -> Self::Output {
SendWrapper::new(self)
SendWrapper::new(self.into_future())
}
}

Expand Down

0 comments on commit 5ff3d55

Please sign in to comment.