Skip to content

Commit

Permalink
feat(services/azblob): add append support (#2302)
Browse files Browse the repository at this point in the history
* feat(services/azblob): add append support

Signed-off-by: suyanhanx <[email protected]>

* fix

Signed-off-by: suyanhanx <[email protected]>

* fix

Signed-off-by: suyanhanx <[email protected]>

---------

Signed-off-by: suyanhanx <[email protected]>
  • Loading branch information
suyanhanx authored May 24, 2023
1 parent 583de2f commit 88e8709
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 3 deletions.
146 changes: 146 additions & 0 deletions core/src/services/azblob/appender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;

use super::core::*;
use super::error::parse_error;
use crate::ops::OpAppend;
use crate::raw::*;
use crate::*;

/// Response header naming the blob's type. It is read from the blob
/// properties response and compared against `"AppendBlob"` before appending.
const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
/// Response header read after a successful append. Its value is parsed as a
/// `u64` and, increased by the appended size, becomes the next append position.
const X_MS_BLOB_APPEND_OFFSET: &str = "x-ms-blob-append-offset";

/// Appender that appends chunks of bytes to a single blob.
pub struct AzblobAppender {
/// Shared service core used to build, sign and send the requests.
core: Arc<AzblobCore>,

/// Append options; they supply the content type and cache control
/// used when the blob has to be created first.
op: OpAppend,
/// Path of the target blob, as handed to the constructor.
path: String,

/// Position to use for the next append, if known.
///
/// `None` until it has been established: it becomes `Some(0)` once the
/// appender has created the blob itself, and is otherwise derived from the
/// `x-ms-blob-append-offset` header of a successful append response.
position: Option<u64>,
}

impl AzblobAppender {
    /// Create an appender for the blob at `path`.
    ///
    /// The append position starts out unknown (`None`); no request is made
    /// here.
    pub fn new(core: Arc<AzblobCore>, path: &str, op: OpAppend) -> Self {
        let path = path.to_owned();

        Self {
            core,
            op,
            path,
            position: None,
        }
    }
}

impl AzblobAppender {
    /// Make sure the target blob exists and is an append blob.
    ///
    /// - If the blob exists, only its type is checked; anything other than
    ///   `AppendBlob` is rejected with `ErrorKind::ConditionNotMatch`. The
    ///   append position is not derived from this response, so `position`
    ///   stays `None`.
    /// - If the blob does not exist, an empty append blob is created with
    ///   the content type and cache control taken from `op`, and `position`
    ///   is set to `Some(0)`.
    ///
    /// # Errors
    ///
    /// Any other status of either request is turned into an error via
    /// `parse_error`.
    async fn ensure_append_blob(&mut self) -> Result<()> {
        let resp = self
            .core
            .azblob_get_blob_properties(&self.path, None, None)
            .await?;

        match resp.status() {
            StatusCode::OK => {
                let blob_type = resp
                    .headers()
                    .get(X_MS_BLOB_TYPE)
                    .and_then(|v| v.to_str().ok());

                if blob_type != Some("AppendBlob") {
                    return Err(Error::new(
                        ErrorKind::ConditionNotMatch,
                        "the blob is not an appendable blob.",
                    ));
                }

                Ok(())
            }

            StatusCode::NOT_FOUND => {
                let mut req = self.core.azblob_init_appendable_blob_request(
                    &self.path,
                    self.op.content_type(),
                    self.op.cache_control(),
                )?;

                self.core.sign(&mut req).await?;

                // Go through the same `send` wrapper the append request
                // uses instead of reaching into the core's client directly.
                let resp = self.core.send(req).await?;

                if resp.status() != StatusCode::CREATED {
                    return Err(parse_error(resp).await?);
                }

                // A freshly created blob is empty, so appending starts at 0.
                self.position = Some(0);

                Ok(())
            }

            _ => Err(parse_error(resp).await?),
        }
    }
}

#[async_trait]
impl oio::Append for AzblobAppender {
    /// Append `bs` to the blob.
    ///
    /// While the position is unknown, the blob is checked (and created if
    /// missing) before the append is attempted. After a successful append
    /// the position is updated to the offset reported in the
    /// `x-ms-blob-append-offset` response header plus the number of bytes
    /// just appended; if that header is missing or not a valid `u64`, the
    /// position becomes `None` again and the blob is re-checked on the next
    /// call.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `parse_error` for any append response
    /// other than `201 Created`, and propagates errors from the blob check.
    async fn append(&mut self, bs: Bytes) -> Result<()> {
        if self.position.is_none() {
            self.ensure_append_blob().await?;
        }

        let size = bs.len();

        let mut req = self.core.azblob_append_blob_request(
            &self.path,
            size,
            self.position,
            AsyncBody::Bytes(bs),
        )?;

        self.core.sign(&mut req).await?;

        let resp = self.core.send(req).await?;

        match resp.status() {
            StatusCode::CREATED => {
                let offset = resp
                    .headers()
                    .get(X_MS_BLOB_APPEND_OFFSET)
                    .and_then(|v| v.to_str().ok())
                    .and_then(|v| v.parse::<u64>().ok());

                // `usize` to `u64` is a widening cast here, not a truncation.
                self.position = offset.map(|v| v + size as u64);

                Ok(())
            }
            _ => Err(parse_error(resp).await?),
        }
    }

    /// Nothing is buffered by this appender, so closing is a no-op.
    async fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
14 changes: 13 additions & 1 deletion core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use reqsign::AzureStorageConfig;
use reqsign::AzureStorageLoader;
use reqsign::AzureStorageSigner;

use super::appender::AzblobAppender;
use super::batch::parse_batch_delete_response;
use super::error::parse_error;
use super::pager::AzblobPager;
Expand Down Expand Up @@ -373,7 +374,7 @@ impl Accessor for AzblobBackend {
type BlockingReader = ();
type Writer = AzblobWriter;
type BlockingWriter = ();
type Appender = ();
type Appender = AzblobAppender;
type Pager = AzblobPager;
type BlockingPager = ();

Expand All @@ -398,6 +399,10 @@ impl Accessor for AzblobBackend {
write_with_cache_control: true,
write_with_content_type: true,

append: true,
append_with_cache_control: true,
append_with_content_type: true,

delete: true,
create_dir: true,
copy: true,
Expand Down Expand Up @@ -479,6 +484,13 @@ impl Accessor for AzblobBackend {
))
}

/// Build an appender for `path`.
///
/// No request is issued here; this only constructs the `AzblobAppender`
/// from a clone of the backend's core and the given arguments.
async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
    let reply = RpAppend::default();
    let appender = AzblobAppender::new(self.core.clone(), path, args);

    Ok((reply, appender))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let resp = self.core.azblob_copy_blob(from, to).await?;

Expand Down
107 changes: 107 additions & 0 deletions core/src/services/azblob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod constants {
pub const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
pub const X_MS_COPY_SOURCE: &str = "x-ms-copy-source";
pub const X_MS_BLOB_CACHE_CONTROL: &str = "x-ms-blob-cache-control";
pub const X_MS_BLOB_CONDITION_APPENDPOS: &str = "x-ms-blob-condition-appendpos";
}

pub struct AzblobCore {
Expand Down Expand Up @@ -228,6 +229,112 @@ impl AzblobCore {
Ok(req)
}

/// Build the request that creates an empty append blob at `path`.
///
/// An append blob is created by a `put` carrying the `x-ms-blob-type`
/// header set to `AppendBlob` and a `content-length` of zero, so the blob
/// starts out with empty content. Content is added afterwards with
/// `azblob_append_blob_request`.
///
/// This only builds the request; signing and sending are left to the caller.
///
/// # Notes
///
/// An append blob's custom headers can only be set when it is created.
///
/// The following custom headers can be set:
/// - `content-type`
/// - `x-ms-blob-cache-control`
///
/// # Reference
///
/// https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob
pub fn azblob_init_appendable_blob_request(
    &self,
    path: &str,
    content_type: Option<&str>,
    cache_control: Option<&str>,
) -> Result<Request<AsyncBody>> {
    let abs_path = build_abs_path(&self.root, path);
    let encoded_path = percent_encode_path(&abs_path);
    let url = format!("{}/{}/{}", self.endpoint, self.container, encoded_path);

    // Creating an append blob requires a zero content-length together with
    // the `AppendBlob` blob type.
    let mut builder = Request::put(&url).header(CONTENT_LENGTH, 0).header(
        HeaderName::from_static(constants::X_MS_BLOB_TYPE),
        "AppendBlob",
    );

    if let Some(ty) = content_type {
        builder = builder.header(CONTENT_TYPE, ty);
    }

    if let Some(value) = cache_control {
        builder = builder.header(constants::X_MS_BLOB_CACHE_CONTROL, value);
    }

    let req = builder
        .body(AsyncBody::Empty)
        .map_err(new_request_build_error)?;

    Ok(req)
}

/// Build the request that appends `body` to the end of an append blob.
///
/// `size` is sent as the `content-length`. When `position` is `Some`, it is
/// sent as the `x-ms-blob-condition-appendpos` header; when it is `None`,
/// the header is left out.
///
/// This only builds the request; signing and sending are left to the caller.
///
/// # Notes
///
/// - The maximum size of content that can be appended in one call is 4MB.
/// - `Append Block` succeeds only if the blob already exists.
/// - Providing the append position is not required.
/// - It can be provided to verify that the content is appended at the
///   expected position.
///
/// The `appendpos` is only returned by the append operation's response, so
/// it cannot be used for the first append made by an appender (the
/// appender's first append, not the blob's).
///
/// # Reference
///
/// https://learn.microsoft.com/en-us/rest/api/storageservices/append-block
pub fn azblob_append_blob_request(
    &self,
    path: &str,
    size: usize,
    position: Option<u64>,
    body: AsyncBody,
) -> Result<Request<AsyncBody>> {
    let abs_path = build_abs_path(&self.root, path);
    let encoded_path = percent_encode_path(&abs_path);
    let url = format!(
        "{}/{}/{}?comp=appendblock",
        self.endpoint, self.container, encoded_path
    );

    let mut builder = Request::put(&url).header(CONTENT_LENGTH, size);

    // Only make the append conditional when a position is known.
    if let Some(pos) = position {
        let name = HeaderName::from_static(constants::X_MS_BLOB_CONDITION_APPENDPOS);
        builder = builder.header(name, pos.to_string());
    }

    let req = builder.body(body).map_err(new_request_build_error)?;

    Ok(req)
}

pub fn azblob_head_blob_request(
&self,
path: &str,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/azblob/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This service can be used to:
- [x] stat
- [x] read
- [x] write
- [x] append
- [x] create_dir
- [x] delete
- [x] copy
Expand Down
1 change: 1 addition & 0 deletions core/src/services/azblob/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
mod backend;
pub use backend::AzblobBuilder as Azblob;

mod appender;
mod batch;
mod core;
mod error;
Expand Down
4 changes: 2 additions & 2 deletions core/tests/behavior/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub async fn test_append_with_dir_path(op: Operator) -> Result<()> {

/// Test append with cache control must succeed.
pub async fn test_append_with_cache_control(op: Operator) -> Result<()> {
if !op.info().capability().write_with_cache_control {
if !op.info().capability().append_with_cache_control {
return Ok(());
}

Expand Down Expand Up @@ -174,7 +174,7 @@ pub async fn test_append_with_content_type(op: Operator) -> Result<()> {

/// Append to a single file with content disposition should succeed.
pub async fn test_append_with_content_disposition(op: Operator) -> Result<()> {
if !op.info().capability().write_with_content_disposition {
if !op.info().capability().append_with_content_disposition {
return Ok(());
}

Expand Down

0 comments on commit 88e8709

Please sign in to comment.