Skip to content

Commit

Permalink
feat(service/cos): add multipart upload function support (#2697)
Browse files Browse the repository at this point in the history
  • Loading branch information
ArmandoZ authored Jul 24, 2023
1 parent c5c670f commit d94707f
Show file tree
Hide file tree
Showing 3 changed files with 395 additions and 16 deletions.
35 changes: 27 additions & 8 deletions core/src/services/cos/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use crate::raw::*;
use crate::services::cos::appender::CosAppender;
use crate::*;

const DEFAULT_WRITE_MIN_SIZE: usize = 1024 * 1024;

/// Tencent-Cloud COS services support.
#[doc = include_str!("docs.md")]
#[derive(Default, Clone)]
Expand All @@ -46,6 +48,10 @@ pub struct CosBuilder {
bucket: Option<String>,
http_client: Option<HttpClient>,

/// the part size of cos multipart upload, which should be 1 MB to 5 GB.
/// There is no minimum size limit on the last part of your multipart upload
write_min_size: Option<usize>,

disable_config_load: bool,
}

Expand Down Expand Up @@ -120,6 +126,14 @@ impl CosBuilder {
self
}

/// set the minimum size of unsized write, it should be greater than 1 MB.
/// Reference: [Upload Part | Tencent Cloud](https://www.tencentcloud.com/document/product/436/7750)
pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
self.write_min_size = Some(write_min_size);

self
}

/// Disable config load so that opendal will not load config from
/// environment.
///
Expand Down Expand Up @@ -155,6 +169,8 @@ impl Builder for CosBuilder {
map.get("endpoint").map(|v| builder.endpoint(v));
map.get("secret_id").map(|v| builder.secret_id(v));
map.get("secret_key").map(|v| builder.secret_key(v));
map.get("write_min_size")
.map(|v| builder.write_min_size(v.parse().expect("input must be a number")));

builder
}
Expand Down Expand Up @@ -218,6 +234,14 @@ impl Builder for CosBuilder {
let cred_loader = TencentCosCredentialLoader::new(client.client(), cfg);

let signer = TencentCosSigner::new();
let write_min_size = self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
if write_min_size < 1024 * 1024 {
return Err(Error::new(
ErrorKind::ConfigInvalid,
"The write minimum buffer size is misconfigured",
)
.with_context("service", Scheme::Cos));
}

debug!("backend build finished");
Ok(CosBackend {
Expand All @@ -228,6 +252,7 @@ impl Builder for CosBuilder {
signer,
loader: cred_loader,
client,
write_min_size,
}),
})
}
Expand Down Expand Up @@ -269,6 +294,7 @@ impl Accessor for CosBackend {
write_can_sink: true,
write_with_content_type: true,
write_with_cache_control: true,
write_without_content_length: true,

append: true,
append_with_cache_control: true,
Expand Down Expand Up @@ -332,16 +358,9 @@ impl Accessor for CosBackend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
if args.content_length().is_none() {
return Err(Error::new(
ErrorKind::Unsupported,
"write without content length is not supported",
));
}

Ok((
RpWrite::default(),
CosWriter::new(self.core.clone(), args, path.to_string()),
CosWriter::new(self.core.clone(), path, args),
))
}

Expand Down
178 changes: 178 additions & 0 deletions core/src/services/cos/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::time::Duration;

use bytes::Bytes;
use http::header::CACHE_CONTROL;
use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_LENGTH;
Expand All @@ -30,6 +31,8 @@ use http::Response;
use reqsign::TencentCosCredential;
use reqsign::TencentCosCredentialLoader;
use reqsign::TencentCosSigner;
use serde::Deserialize;
use serde::Serialize;

use crate::raw::*;
use crate::*;
Expand All @@ -42,6 +45,7 @@ pub struct CosCore {
pub signer: TencentCosSigner,
pub loader: TencentCosCredentialLoader,
pub client: HttpClient,
pub write_min_size: usize,
}

impl Debug for CosCore {
Expand Down Expand Up @@ -324,4 +328,178 @@ impl CosCore {

self.send(req).await
}

pub async fn cos_initiate_multipart_upload(
&self,
path: &str,
content_type: Option<&str>,
content_disposition: Option<&str>,
cache_control: Option<&str>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);

let url = format!("{}/{}?uploads", self.endpoint, percent_encode_path(&p));

let mut req = Request::post(&url);

if let Some(mime) = content_type {
req = req.header(CONTENT_TYPE, mime)
}

if let Some(content_disposition) = content_disposition {
req = req.header(CONTENT_DISPOSITION, content_disposition)
}

if let Some(cache_control) = cache_control {
req = req.header(CACHE_CONTROL, cache_control)
}

let mut req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;

self.send(req).await
}

pub async fn cos_upload_part_request(
&self,
path: &str,
upload_id: &str,
part_number: usize,
size: Option<u64>,
body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);

let url = format!(
"{}/{}?partNumber={}&uploadId={}",
self.endpoint,
percent_encode_path(&p),
part_number,
percent_encode_path(upload_id)
);

let mut req = Request::put(&url);

if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size);
}

// Set body
let mut req = req.body(body).map_err(new_request_build_error)?;

self.sign(&mut req).await?;

self.send(req).await
}

pub async fn cos_complete_multipart_upload(
&self,
path: &str,
upload_id: &str,
parts: &[CompleteMultipartUploadRequestPart],
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);

let url = format!(
"{}/{}?uploadId={}",
self.endpoint,
percent_encode_path(&p),
percent_encode_path(upload_id)
);

let req = Request::post(&url);

let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest {
part: parts.to_vec(),
})
.map_err(new_xml_deserialize_error)?;
// Make sure content length has been set to avoid post with chunked encoding.
let req = req.header(CONTENT_LENGTH, content.len());
// Set content-type to `application/xml` to avoid mixed with form post.
let req = req.header(CONTENT_TYPE, "application/xml");

let mut req = req
.body(AsyncBody::Bytes(Bytes::from(content)))
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;

self.send(req).await
}

/// Abort an on-going multipart upload.
pub async fn cos_abort_multipart_upload(
&self,
path: &str,
upload_id: &str,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);

let url = format!(
"{}/{}?uploadId={}",
self.endpoint,
percent_encode_path(&p),
percent_encode_path(upload_id)
);

let mut req = Request::delete(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.sign(&mut req).await?;
self.send(req).await
}
}

/// Result of CreateMultipartUpload
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct InitiateMultipartUploadResult {
pub upload_id: String,
}

/// Request of CompleteMultipartUploadRequest
#[derive(Default, Debug, Serialize)]
#[serde(default, rename = "CompleteMultipartUpload", rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequest {
pub part: Vec<CompleteMultipartUploadRequestPart>,
}

#[derive(Clone, Default, Debug, Serialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct CompleteMultipartUploadRequestPart {
#[serde(rename = "PartNumber")]
pub part_number: usize,
/// # TODO
///
/// quick-xml will do escape on `"` which leads to our serialized output is
/// not the same as aws s3's example.
///
/// Ideally, we could use `serialize_with` to address this (buf failed)
///
/// ```ignore
/// #[derive(Default, Debug, Serialize)]
/// #[serde(default, rename_all = "PascalCase")]
/// struct CompleteMultipartUploadRequestPart {
/// #[serde(rename = "PartNumber")]
/// part_number: usize,
/// #[serde(rename = "ETag", serialize_with = "partial_escape")]
/// etag: String,
/// }
///
/// fn partial_escape<S>(s: &str, ser: S) -> std::result::Result<S::Ok, S::Error>
/// where
/// S: serde::Serializer,
/// {
/// ser.serialize_str(&String::from_utf8_lossy(
/// &quick_xml::escape::partial_escape(s.as_bytes()),
/// ))
/// }
/// ```
///
/// ref: <https://github.com/tafia/quick-xml/issues/362>
#[serde(rename = "ETag")]
pub etag: String,
}
Loading

0 comments on commit d94707f

Please sign in to comment.