Skip to content

Commit

Permalink
refactor: Implement RFC-0554 Write Refactor (#556)
Browse files Browse the repository at this point in the history
* Save work

Signed-off-by: Xuanwo <[email protected]>

* Save work

Signed-off-by: Xuanwo <[email protected]>

* Remove writer

Signed-off-by: Xuanwo <[email protected]>

* Cleanup build

Signed-off-by: Xuanwo <[email protected]>

* Cargo fix

Signed-off-by: Xuanwo <[email protected]>

* Cargo fix

Signed-off-by: Xuanwo <[email protected]>

* Remove not used structs

Signed-off-by: Xuanwo <[email protected]>

* Fix tests

Signed-off-by: Xuanwo <[email protected]>

* Format code

Signed-off-by: Xuanwo <[email protected]>

* Polish code

Signed-off-by: Xuanwo <[email protected]>

* Content length should be set before signing

Signed-off-by: Xuanwo <[email protected]>

* Refactor code

Signed-off-by: Xuanwo <[email protected]>

* Fix s3

Signed-off-by: Xuanwo <[email protected]>

* Fix oay http

Signed-off-by: Xuanwo <[email protected]>

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Aug 23, 2022
1 parent d5f58d7 commit 06262de
Show file tree
Hide file tree
Showing 24 changed files with 397 additions and 437 deletions.
2 changes: 1 addition & 1 deletion benches/ops/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn bench_write_once(c: &mut Criterion, op: Operator) {
&(op.clone(), &path, content),
|b, (op, path, content)| {
b.to_async(&*TOKIO).iter(|| async {
op.object(path).write(&content).await.unwrap();
op.object(path).write(content.clone()).await.unwrap();
})
},
);
Expand Down
1 change: 1 addition & 0 deletions oay/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions oay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ futures = "0.3"
log = "0.4"
opendal = "0.13.1"
tokio = { version = "1.20", features = ["rt-multi-thread", "macros"] }
sluice = "0.5.5"

# Please comment the following patch while releasing.
[patch.crates-io]
Expand Down
41 changes: 23 additions & 18 deletions oay/src/services/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use actix_web::HttpResponse;
use actix_web::HttpServer;
use anyhow::anyhow;
use futures::stream;
use futures::try_join;
use futures::AsyncWriteExt;
use futures::StreamExt;
use log::error;
Expand Down Expand Up @@ -99,7 +100,7 @@ impl Service {
Ok(HttpResponse::Ok().body(SizedStream::new(size, into_stream(r, 8 * 1024))))
}

async fn put(&self, req: HttpRequest, body: web::Payload) -> Result<HttpResponse> {
async fn put(&self, req: HttpRequest, mut body: web::Payload) -> Result<HttpResponse> {
let o = self.op.object(req.path());

let content_length: u64 = req
Expand Down Expand Up @@ -129,23 +130,27 @@ impl Service {
if content_length == 0 {
o.create().await?
} else {
let mut w = o.writer(content_length).await?;
let mut r = body;

let mut n = 0u64;
while let Some(bs) = r.next().await {
let bs = bs.map_err(|e| {
Error::new(ErrorKind::UnexpectedEof, anyhow!("read body: {e:?}"))
})?;
w.write_all(&bs).await?;
n += bs.len() as u64;
}

w.close().await?;

if content_length != n {
return Err(Error::new(ErrorKind::UnexpectedEof, anyhow!("short read")));
}
let (pr, mut pw) = sluice::pipe::pipe();

try_join!(
async {
o.write_from(content_length, pr).await?;

Ok::<(), Error>(())
},
async {
while let Some(bs) = body.next().await {
let bs = bs.map_err(|e| {
Error::new(ErrorKind::UnexpectedEof, anyhow!("read body: {e:?}"))
})?;
pw.write_all(&bs).await?;
}

pw.close().await?;

Ok::<(), Error>(())
}
)?;
}

Ok(HttpResponse::new(StatusCode::CREATED))
Expand Down
11 changes: 5 additions & 6 deletions src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::ops::PresignedRequest;
use crate::BytesReader;
use crate::BytesWriter;
use crate::DirStreamer;
use crate::ObjectMetadata;
use crate::Scheme;
Expand Down Expand Up @@ -88,13 +87,13 @@ pub trait Accessor: Send + Sync + Debug {
}

/// Invoke the `write` operation on the specified path, returns a
/// [`BytesWriter`][crate::BytesWriter] if operate successful.
/// written size if operate successful.
///
/// # Behavior
///
/// - Input path MUST be file path, DON'T NEED to check object mode.
async fn write(&self, args: &OpWrite) -> Result<BytesWriter> {
let _ = args;
async fn write(&self, args: &OpWrite, r: BytesReader) -> Result<u64> {
let (_, _) = (args, r);
unimplemented!()
}

Expand Down Expand Up @@ -159,8 +158,8 @@ impl<T: Accessor> Accessor for Arc<T> {
async fn read(&self, args: &OpRead) -> Result<BytesReader> {
self.as_ref().read(args).await
}
async fn write(&self, args: &OpWrite) -> Result<BytesWriter> {
self.as_ref().write(args).await
async fn write(&self, args: &OpWrite, r: BytesReader) -> Result<u64> {
self.as_ref().write(args, r).await
}
async fn stat(&self, args: &OpStat) -> Result<ObjectMetadata> {
self.as_ref().stat(args).await
Expand Down
193 changes: 0 additions & 193 deletions src/http_util/body.rs

This file was deleted.

8 changes: 8 additions & 0 deletions src/http_util/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ pub fn new_request_send_error(op: &'static str, path: &str, err: isahc::Error) -
)
}

/// Create error happened during consuming http response.
pub fn new_response_consume_error(op: &'static str, path: &str, err: Error) -> Error {
Error::new(
err.kind(),
ObjectError::new(op, path, anyhow!("consuming response: {err:?}")),
)
}

/// ErrorResponse carries HTTP status code, headers and body.
///
/// This struct should only be used to parse error response which is small.
Expand Down
5 changes: 1 addition & 4 deletions src/http_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod body;
pub use body::new_http_channel;
pub use body::HttpBodyWriter;

mod client;
pub use client::HttpClient;
pub use client::HttpResponseFuture;
Expand All @@ -32,6 +28,7 @@ mod error;
pub use error::new_request_build_error;
pub use error::new_request_send_error;
pub use error::new_request_sign_error;
pub use error::new_response_consume_error;
pub use error::parse_error_response;
pub use error::ErrorResponse;
pub use error::ErrorResponseFuture;
3 changes: 3 additions & 0 deletions src/io_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,6 @@ pub use compress::DecompressState;
mod walk;
pub use walk::BottomUpWalker;
pub use walk::TopDownWalker;

mod unshared_reader;
pub use unshared_reader::unshared_reader;
Loading

1 comment on commit 06262de

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-opl2krgzt-databend.vercel.app

Built with commit 06262de.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.