From d485a721e42e82711cfaa78c98cfe7324c6ee4d0 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 17 Apr 2023 17:26:30 +0800 Subject: [PATCH 1/3] feat: add abort API to opendal::types::writer::Writer. Add behavoiral test for abort --- core/src/types/writer.rs | 13 +++++++++++++ core/tests/behavior/write.rs | 20 ++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index 24ac279305d1..3705f4d67559 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -28,6 +28,7 @@ use futures::AsyncWrite; use futures::FutureExt; use crate::ops::OpWrite; +use crate::raw::oio::Write; use crate::raw::*; use crate::*; @@ -75,6 +76,18 @@ impl Writer { } } + /// Abort inner writer. + pub async fn abort(&mut self) -> Result<()> { + if let State::Idle(Some(w)) = &mut self.state { + w.abort().await + } else { + unreachable!( + "writer state invalid while abort, expect Idle, actual {}", + self.state + ); + } + } + /// Close the writer and make sure all data have been stored. pub async fn close(&mut self) -> Result<()> { if let State::Idle(Some(w)) = &mut self.state { diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index bb682031fc84..9a835a3b49b1 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -98,6 +98,7 @@ macro_rules! behavior_write_tests { test_delete_not_existing, test_delete_stream, test_append, + test_abort_writer, ); )* }; @@ -578,6 +579,25 @@ pub async fn test_read_with_special_chars(op: Operator) -> Result<()> { Ok(()) } +// Delete existing file should succeed. +pub async fn test_abort_writer(op: Operator) -> Result<()> { + let path = uuid::Uuid::new_v4().to_string(); + let (content, _) = gen_bytes(); + + let mut writer = op.writer(&path).await.unwrap(); + if let Err(e) = writer.append(content).await { + assert_eq!(e.kind(), ErrorKind::Unsupported); + } + + if let Err(e) = writer.abort().await { + assert_eq!(e.kind(), ErrorKind::Unsupported); + } + + // Aborted writer should not write actual file. + assert!(!op.is_exist(&path).await?); + Ok(()) +} + // Delete existing file should succeed. pub async fn test_delete(op: Operator) -> Result<()> { let path = uuid::Uuid::new_v4().to_string(); From b0c4d0cd6ccb33402bd10cfb71d030ef75f2a741 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 17 Apr 2023 17:41:41 +0800 Subject: [PATCH 2/3] fix: skip test if abort/append is not supported yet --- core/tests/behavior/write.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index 9a835a3b49b1..91c9ec08f999 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -584,13 +584,22 @@ pub async fn test_abort_writer(op: Operator) -> Result<()> { let path = uuid::Uuid::new_v4().to_string(); let (content, _) = gen_bytes(); - let mut writer = op.writer(&path).await.unwrap(); + let mut writer = match op.writer(&path).await { + Ok(writer) => writer, + Err(e) => { + assert_eq!(e.kind(), ErrorKind::Unsupported); + return Ok(()); + } + }; + if let Err(e) = writer.append(content).await { assert_eq!(e.kind(), ErrorKind::Unsupported); + return Ok(()); } if let Err(e) = writer.abort().await { assert_eq!(e.kind(), ErrorKind::Unsupported); + return Ok(()); } // Aborted writer should not write actual file. From 46ad5e90f2a414196b09595c0f61fd0386f64a15 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 17 Apr 2023 17:57:11 +0800 Subject: [PATCH 3/3] fix: s3 returns 204 when abort suceeds --- core/src/services/s3/writer.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index ccb7e96d5543..d75906f9c85b 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -137,7 +137,8 @@ impl oio::Write for S3Writer { .s3_abort_multipart_upload(&self.path, upload_id) .await?; match resp.status() { - StatusCode::OK => { + // s3 returns code 204 if abort succeeds. + StatusCode::NO_CONTENT => { resp.into_body().consume().await?; Ok(()) }