refactor(core): Split buffer logic from underlying storage operations #2903

Merged 12 commits on Aug 23, 2023
79 changes: 41 additions & 38 deletions .github/workflows/service_test_s3.yml
@@ -147,17 +147,18 @@ jobs:
run: |
docker-compose -f docker-compose-minio.yml up -d
- name: Setup test bucket
env:
AWS_ACCESS_KEY_ID: "minioadmin"
AWS_SECRET_ACCESS_KEY: "minioadmin"
AWS_EC2_METADATA_DISABLED: "true"
run: |
aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://test

curl -O https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
./mc alias set local http://127.0.0.1:9000/ minioadmin minioadmin
./mc mb local/test
./mc anonymous set public local/test
while :; do
echo "waiting minio to be ready"
[[ "$(./mc ping --count 1 local | awk '{print $6}' | tr -d '\n')" == "errors=1" ]] || break
sleep 1
done

- name: Setup Rust toolchain
uses: ./.github/actions/setup
with:
@@ -173,35 +174,37 @@ jobs:
OPENDAL_S3_ALLOW_ANONYMOUS: on
OPENDAL_S3_REGION: us-east-1

r2:
runs-on: ubuntu-latest
if: github.event_name == 'push' || !github.event.pull_request.head.repo.fork
steps:
- uses: actions/checkout@v3
- name: Setup Rust toolchain
uses: ./.github/actions/setup
with:
need-nextest: true

- name: Load secret
id: op-load-secret
uses: 1password/load-secrets-action@v1
with:
export-env: true
env:
OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
OPENDAL_S3_TEST: op://services/r2/test
OPENDAL_S3_BUCKET: op://services/r2/bucket
OPENDAL_S3_ENDPOINT: op://services/r2/endpoint
OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id
OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key

- name: Test
shell: bash
working-directory: core
run: cargo nextest run s3
env:
OPENDAL_S3_REGION: auto
# This is the R2's limitation
# Refer to https://opendal.apache.org/docs/services/s3#compatible-services for more information
OPENDAL_S3_BATCH_MAX_OPERATIONS: 700
# Disable this test until we addressed https://github.com/apache/incubator-opendal/issues/2904
#
# r2:
# runs-on: ubuntu-latest
# if: github.event_name == 'push' || !github.event.pull_request.head.repo.fork
# steps:
# - uses: actions/checkout@v3
# - name: Setup Rust toolchain
# uses: ./.github/actions/setup
# with:
# need-nextest: true
#
# - name: Load secret
# id: op-load-secret
# uses: 1password/load-secrets-action@v1
# with:
# export-env: true
# env:
# OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
# OPENDAL_S3_TEST: op://services/r2/test
# OPENDAL_S3_BUCKET: op://services/r2/bucket
# OPENDAL_S3_ENDPOINT: op://services/r2/endpoint
# OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id
# OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key
#
# - name: Test
# shell: bash
# working-directory: core
# run: cargo nextest run s3
# env:
# OPENDAL_S3_REGION: auto
# # This is the R2's limitation
# # Refer to https://opendal.apache.org/docs/services/s3#compatible-services for more information
# OPENDAL_S3_BATCH_MAX_OPERATIONS: 700
4 changes: 2 additions & 2 deletions core/fuzz/fuzz_writer.rs
@@ -107,7 +107,7 @@ async fn fuzz_writer(op: Operator, input: FuzzInput) -> Result<()> {

let checker = WriteChecker::new(total_size);

let mut writer = op.writer(&path).await?;
let mut writer = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;

for chunk in &checker.chunks {
writer.write(chunk.clone()).await?;
@@ -132,7 +132,7 @@ fuzz_target!(|input: FuzzInput| {
runtime.block_on(async {
fuzz_writer(op, input.clone())
.await
.unwrap_or_else(|_| panic!("fuzz reader must succeed"));
.unwrap_or_else(|_| panic!("fuzz writer must succeed"));
})
}
});
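For readers following the diff: a minimal sketch of the buffered write path exercised above, assuming an existing `Operator` named `op`; the path is illustrative and the 8 MiB size mirrors the fuzz setup:

    // Inside an async fn returning opendal::Result<()>.
    // Writes are staged in an in-memory buffer and only sent to the
    // underlying storage once 8 MiB has accumulated (or on close).
    let mut writer = op
        .writer_with("path/to/file")
        .buffer_size(8 * 1024 * 1024)
        .await?;
    writer.write(bytes::Bytes::from("hello world")).await?;
    writer.close().await?; // flush remaining buffered data and finalize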
93 changes: 92 additions & 1 deletion core/src/raw/oio/cursor.rs
@@ -45,6 +45,11 @@ impl Cursor {
let len = self.pos.min(self.inner.len() as u64) as usize;
&self.inner.as_ref()[len..]
}

/// Return the length of the remaining slice.
pub fn len(&self) -> usize {
self.inner.len() - self.pos as usize
}
}

impl From<Bytes> for Cursor {
@@ -148,7 +153,93 @@ impl oio::BlockingRead for Cursor {
}
}

/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Read`]
impl oio::Stream for Cursor {
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
if self.is_empty() {
return Poll::Ready(None);
}

let bs = self.inner.clone();
self.pos += bs.len() as u64;
Poll::Ready(Some(Ok(bs)))
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
self.pos = 0;
Poll::Ready(Ok(()))
}
}

/// # TODO
///
/// We could compact data at runtime, for example by merging chunks
/// smaller than 4K into a single `Bytes` instead of keeping them separate.
#[derive(Clone)]
pub struct ChunkedCursor {
inner: VecDeque<Bytes>,
idx: usize,
}

impl Default for ChunkedCursor {
fn default() -> Self {
Self::new()
}
}

impl ChunkedCursor {
/// Create a new chunked cursor.
pub fn new() -> Self {
Self {
inner: VecDeque::new(),
idx: 0,
}
}

/// Returns `true` if current cursor is empty.
pub fn is_empty(&self) -> bool {
self.inner.len() <= self.idx
}

/// Return the total size in bytes of the not-yet-consumed chunks.
pub fn len(&self) -> usize {
self.inner.iter().skip(self.idx).map(|v| v.len()).sum()
}

/// Reset current cursor to start.
pub fn reset(&mut self) {
self.idx = 0;
}

/// Clear the entire cursor.
pub fn clear(&mut self) {
self.idx = 0;
self.inner.clear();
}

/// Push a new bytes into vector cursor.
pub fn push(&mut self, bs: Bytes) {
self.inner.push_back(bs);
}
}

impl oio::Stream for ChunkedCursor {
fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
if self.is_empty() {
return Poll::Ready(None);
}

let bs = self.inner[self.idx].clone();
self.idx += 1;
Poll::Ready(Some(Ok(bs)))
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
self.reset();
Poll::Ready(Ok(()))
}
}

/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Stream`]
pub struct VectorCursor {
inner: VecDeque<Bytes>,
size: usize,
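For context on the new `ChunkedCursor`: a short usage sketch, assuming the `StreamExt::next` helper that pairs with the `reset` shown in `stream/api.rs` below:

    // Inside an async fn returning opendal::Result<()>.
    use bytes::Bytes;
    use opendal::raw::oio::{self, StreamExt};

    let mut cursor = oio::ChunkedCursor::new();
    cursor.push(Bytes::from("hello "));
    cursor.push(Bytes::from("world"));
    assert_eq!(cursor.len(), 11); // total bytes across unconsumed chunks

    // Chunks come back one by one, in insertion order, without copying.
    while let Some(bs) = cursor.next().await {
        let _chunk: Bytes = bs?;
    }

    cursor.reset(); // rewind to the first chunk; the data can be replayed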
1 change: 1 addition & 0 deletions core/src/raw/oio/mod.rs
@@ -35,6 +35,7 @@ mod page;
pub use page::*;

mod cursor;
pub use cursor::ChunkedCursor;
pub use cursor::Cursor;
pub use cursor::VectorCursor;

79 changes: 77 additions & 2 deletions core/src/raw/oio/stream/api.rs
@@ -18,10 +18,10 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::{ready, Context};

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use pin_project::pin_project;

use crate::*;
@@ -135,6 +135,29 @@ pub trait StreamExt: Stream {
fn reset(&mut self) -> ResetFuture<'_, Self> {
ResetFuture { inner: self }
}

/// Chain this stream with another stream.
fn chain<S>(self, other: S) -> Chain<Self, S>
where
Self: Sized,
S: Stream,
{
Chain {
first: Some(self),
second: other,
}
}

/// Collect all items from this stream into a single bytes.
fn collect(self) -> Collect<Self>
where
Self: Sized,
{
Collect {
stream: self,
buf: BytesMut::new(),
}
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
@@ -172,3 +195,55 @@
Pin::new(this.inner).poll_reset(cx)
}
}

/// Stream for the [`chain`](StreamExt::chain) method.
#[must_use = "streams do nothing unless polled"]
pub struct Chain<S1: Stream, S2: Stream> {
first: Option<S1>,
second: S2,
}

impl<S1: Stream, S2: Stream> Stream for Chain<S1, S2> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
if let Some(first) = self.first.as_mut() {
if let Some(item) = ready!(first.poll_next(cx)) {
return Poll::Ready(Some(item));
}

self.first = None;
}
self.second.poll_next(cx)
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"chained stream doesn't support reset",
)))
}
}

/// Stream for the [`collect`](StreamExt::collect) method.
#[must_use = "streams do nothing unless polled"]
pub struct Collect<S> {
stream: S,
buf: BytesMut,
}

impl<S> Future for Collect<S>
where
S: Stream,
{
type Output = Result<Bytes>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
loop {
match ready!(this.stream.poll_next(cx)) {
Some(Ok(bs)) => this.buf.extend(bs),
Some(Err(err)) => return Poll::Ready(Err(err)),
None => return Poll::Ready(Ok(self.buf.split().freeze())),
}
}
}
}
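Taken together, a sketch of how `chain` and `collect` compose with the `Cursor` stream implementation from `cursor.rs` above:

    // Inside an async fn returning opendal::Result<()>.
    use bytes::Bytes;
    use opendal::raw::oio::{self, StreamExt};

    let first = oio::Cursor::from(Bytes::from("hello "));
    let second = oio::Cursor::from(Bytes::from("world"));

    // `chain` yields all of `first`, then all of `second`;
    // `collect` drains the combined stream into one contiguous `Bytes`.
    let bs = first.chain(second).collect().await?;
    assert_eq!(&bs[..], b"hello world");

Note that the chained stream intentionally returns an `Unsupported` error from `poll_reset`, since by the time the second half is reached the first half has already been consumed.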