Skip to content

Commit

Permalink
refactor: refactor CompleteReader
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 30, 2023
1 parent 7633b43 commit b9dd733
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 100 deletions.
138 changes: 39 additions & 99 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
use std::cmp;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io;
use std::sync::Arc;
use std::task::ready;
use std::task::Context;
use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;

use crate::raw::oio::BufferReader;
use crate::raw::oio::FileReader;
use crate::raw::oio::FlatLister;
use crate::raw::oio::FourWaysReader;
use crate::raw::oio::LazyReader;
use crate::raw::oio::PrefixLister;
use crate::raw::oio::RangeReader;
use crate::raw::oio::StreamableReader;
use crate::raw::oio::TwoWaysReader;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -293,28 +294,35 @@ impl<A: Accessor> CompleteAccessor<A> {

let seekable = capability.read_can_seek;
let streamable = capability.read_can_next;
let buffer_cap = args.buffer();

match (seekable, streamable) {
let r = match (seekable, streamable) {
(true, true) => {
let r = LazyReader::new(self.inner.clone(), path, args);
Ok((RpRead::new(), CompleteReader::AlreadyComplete(r)))
InnerCompleteReader::One(r)
}
(true, false) => {
let r = FileReader::new(self.inner.clone(), path, args);

Ok((RpRead::new(), CompleteReader::NeedStreamable(r)))
InnerCompleteReader::Two(r)
}
_ => {
let r = RangeReader::new(self.inner.clone(), path, args);

if streamable {
Ok((RpRead::new(), CompleteReader::NeedSeekable(r)))
InnerCompleteReader::Three(r)
} else {
let r = oio::into_streamable_read(r, 256 * 1024);
Ok((RpRead::new(), CompleteReader::NeedBoth(r)))
InnerCompleteReader::Four(r)
}
}
}
};

let r = match buffer_cap {
None => CompleteReader::One(r),
Some(cap) => CompleteReader::Two(BufferReader::new(r, cap)),
};

Ok((RpRead::new(), r))
}

fn complete_blocking_read(
Expand All @@ -329,27 +337,35 @@ impl<A: Accessor> CompleteAccessor<A> {

let seekable = capability.read_can_seek;
let streamable = capability.read_can_next;
let buffer_cap = args.buffer();

match (seekable, streamable) {
let r = match (seekable, streamable) {
(true, true) => {
let r = LazyReader::new(self.inner.clone(), path, args);
Ok((RpRead::new(), CompleteReader::AlreadyComplete(r)))
InnerCompleteReader::One(r)
}
(true, false) => {
let r = FileReader::new(self.inner.clone(), path, args);
Ok((RpRead::new(), CompleteReader::NeedStreamable(r)))
InnerCompleteReader::Two(r)
}
_ => {
let r = RangeReader::new(self.inner.clone(), path, args);

if streamable {
Ok((RpRead::new(), CompleteReader::NeedSeekable(r)))
InnerCompleteReader::Three(r)
} else {
let r = oio::into_streamable_read(r, 256 * 1024);
Ok((RpRead::new(), CompleteReader::NeedBoth(r)))
InnerCompleteReader::Four(r)
}
}
}
};

let r = match buffer_cap {
None => CompleteReader::One(r),
Some(cap) => CompleteReader::Two(BufferReader::new(r, cap)),
};

Ok((RpRead::new(), r))
}

async fn complete_list(
Expand Down Expand Up @@ -659,91 +675,15 @@ impl<A: Accessor> LayeredAccessor for CompleteAccessor<A> {
}
}

pub enum CompleteReader<A: Accessor, R> {
AlreadyComplete(LazyReader<A, R>),
NeedSeekable(RangeReader<A, R>),
NeedStreamable(FileReader<A, R>),
NeedBoth(StreamableReader<RangeReader<A, R>>),
}

impl<A, R> oio::Read for CompleteReader<A, R>
where
A: Accessor<Reader = R>,
R: oio::Read,
{
#[inline]
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.poll_read(cx, buf),
NeedSeekable(r) => r.poll_read(cx, buf),
NeedStreamable(r) => r.poll_read(cx, buf),
NeedBoth(r) => r.poll_read(cx, buf),
}
}

fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.poll_seek(cx, pos),
NeedSeekable(r) => r.poll_seek(cx, pos),
NeedStreamable(r) => r.poll_seek(cx, pos),
NeedBoth(r) => r.poll_seek(cx, pos),
}
}
pub type CompleteReader<A, R> =
TwoWaysReader<InnerCompleteReader<A, R>, BufferReader<InnerCompleteReader<A, R>>>;

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.poll_next(cx),
NeedSeekable(r) => r.poll_next(cx),
NeedStreamable(r) => r.poll_next(cx),
NeedBoth(r) => r.poll_next(cx),
}
}
}

impl<A, R> oio::BlockingRead for CompleteReader<A, R>
where
A: Accessor<BlockingReader = R>,
R: oio::BlockingRead,
{
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.read(buf),
NeedSeekable(r) => r.read(buf),
NeedStreamable(r) => r.read(buf),
NeedBoth(r) => r.read(buf),
}
}

fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.seek(pos),
NeedSeekable(r) => r.seek(pos),
NeedStreamable(r) => r.seek(pos),
NeedBoth(r) => r.seek(pos),
}
}

fn next(&mut self) -> Option<Result<Bytes>> {
use CompleteReader::*;

match self {
AlreadyComplete(r) => r.next(),
NeedSeekable(r) => r.next(),
NeedStreamable(r) => r.next(),
NeedBoth(r) => r.next(),
}
}
}
type InnerCompleteReader<A, R> = FourWaysReader<
LazyReader<A, R>,
FileReader<A, R>,
RangeReader<A, R>,
StreamableReader<RangeReader<A, R>>,
>;

pub enum CompleteLister<A: Accessor, P> {
AlreadyComplete(P),
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/oio/read/compose_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead> oio::BlockingRead for TwoWa
}
}

/// FourWaysReader is used to implement [`Read`] based on two ways.
/// FourWaysReader is used to implement [`Read`] based on four ways.
///
/// Users can wrap two different readers together.
pub enum FourWaysReader<ONE, TWO, THREE, FOUR> {
Expand Down

0 comments on commit b9dd733

Please sign in to comment.