diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 12a9aa44c996..40c095689048 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -18,21 +18,22 @@ use std::cmp; use std::fmt::Debug; use std::fmt::Formatter; -use std::io; use std::sync::Arc; use std::task::ready; use std::task::Context; use std::task::Poll; use async_trait::async_trait; -use bytes::Bytes; +use crate::raw::oio::BufferReader; use crate::raw::oio::FileReader; use crate::raw::oio::FlatLister; +use crate::raw::oio::FourWaysReader; use crate::raw::oio::LazyReader; use crate::raw::oio::PrefixLister; use crate::raw::oio::RangeReader; use crate::raw::oio::StreamableReader; +use crate::raw::oio::TwoWaysReader; use crate::raw::*; use crate::*; @@ -293,28 +294,35 @@ impl CompleteAccessor { let seekable = capability.read_can_seek; let streamable = capability.read_can_next; + let buffer_cap = args.buffer(); - match (seekable, streamable) { + let r = match (seekable, streamable) { (true, true) => { let r = LazyReader::new(self.inner.clone(), path, args); - Ok((RpRead::new(), CompleteReader::AlreadyComplete(r))) + InnerCompleteReader::One(r) } (true, false) => { let r = FileReader::new(self.inner.clone(), path, args); - - Ok((RpRead::new(), CompleteReader::NeedStreamable(r))) + InnerCompleteReader::Two(r) } _ => { let r = RangeReader::new(self.inner.clone(), path, args); if streamable { - Ok((RpRead::new(), CompleteReader::NeedSeekable(r))) + InnerCompleteReader::Three(r) } else { let r = oio::into_streamable_read(r, 256 * 1024); - Ok((RpRead::new(), CompleteReader::NeedBoth(r))) + InnerCompleteReader::Four(r) } } - } + }; + + let r = match buffer_cap { + None => CompleteReader::One(r), + Some(cap) => CompleteReader::Two(BufferReader::new(r, cap)), + }; + + Ok((RpRead::new(), r)) } fn complete_blocking_read( @@ -329,27 +337,35 @@ impl CompleteAccessor { let seekable = capability.read_can_seek; let streamable = capability.read_can_next; + let buffer_cap = args.buffer(); - match (seekable, streamable) { + let r = match (seekable, streamable) { (true, true) => { let r = LazyReader::new(self.inner.clone(), path, args); - Ok((RpRead::new(), CompleteReader::AlreadyComplete(r))) + InnerCompleteReader::One(r) } (true, false) => { let r = FileReader::new(self.inner.clone(), path, args); - Ok((RpRead::new(), CompleteReader::NeedStreamable(r))) + InnerCompleteReader::Two(r) } _ => { let r = RangeReader::new(self.inner.clone(), path, args); if streamable { - Ok((RpRead::new(), CompleteReader::NeedSeekable(r))) + InnerCompleteReader::Three(r) } else { let r = oio::into_streamable_read(r, 256 * 1024); - Ok((RpRead::new(), CompleteReader::NeedBoth(r))) + InnerCompleteReader::Four(r) } } - } + }; + + let r = match buffer_cap { + None => CompleteReader::One(r), + Some(cap) => CompleteReader::Two(BufferReader::new(r, cap)), + }; + + Ok((RpRead::new(), r)) } async fn complete_list( @@ -659,91 +675,15 @@ impl LayeredAccessor for CompleteAccessor { } } -pub enum CompleteReader { - AlreadyComplete(LazyReader), - NeedSeekable(RangeReader), - NeedStreamable(FileReader), - NeedBoth(StreamableReader>), -} - -impl oio::Read for CompleteReader -where - A: Accessor, - R: oio::Read, -{ - #[inline] - fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - use CompleteReader::*; - - match self { - AlreadyComplete(r) => r.poll_read(cx, buf), - NeedSeekable(r) => r.poll_read(cx, buf), - NeedStreamable(r) => r.poll_read(cx, buf), - NeedBoth(r) => r.poll_read(cx, buf), - } - } - - fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll> { - use CompleteReader::*; - - match self { - AlreadyComplete(r) => r.poll_seek(cx, pos), - NeedSeekable(r) => r.poll_seek(cx, pos), - NeedStreamable(r) => r.poll_seek(cx, pos), - NeedBoth(r) => r.poll_seek(cx, pos), - } - } +pub type CompleteReader = + TwoWaysReader, BufferReader>>; - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - use CompleteReader::*; - - match self { - AlreadyComplete(r) => r.poll_next(cx), - NeedSeekable(r) => r.poll_next(cx), - NeedStreamable(r) => r.poll_next(cx), - NeedBoth(r) => r.poll_next(cx), - } - } -} - -impl oio::BlockingRead for CompleteReader -where - A: Accessor, - R: oio::BlockingRead, -{ - fn read(&mut self, buf: &mut [u8]) -> Result { - use CompleteReader::*; - - match self { - AlreadyComplete(r) => r.read(buf), - NeedSeekable(r) => r.read(buf), - NeedStreamable(r) => r.read(buf), - NeedBoth(r) => r.read(buf), - } - } - - fn seek(&mut self, pos: io::SeekFrom) -> Result { - use CompleteReader::*; - - match self { - AlreadyComplete(r) => r.seek(pos), - NeedSeekable(r) => r.seek(pos), - NeedStreamable(r) => r.seek(pos), - NeedBoth(r) => r.seek(pos), - } - } - - fn next(&mut self) -> Option> { - use CompleteReader::*; - - match self { - AlreadyComplete(r) => r.next(), - NeedSeekable(r) => r.next(), - NeedStreamable(r) => r.next(), - NeedBoth(r) => r.next(), - } - } -} +type InnerCompleteReader = FourWaysReader< + LazyReader, + FileReader, + RangeReader, + StreamableReader>, +>; pub enum CompleteLister { AlreadyComplete(P), diff --git a/core/src/raw/oio/read/compose_read.rs b/core/src/raw/oio/read/compose_read.rs index 7b4722c50b3f..b398c6fee849 100644 --- a/core/src/raw/oio/read/compose_read.rs +++ b/core/src/raw/oio/read/compose_read.rs @@ -78,7 +78,7 @@ impl oio::BlockingRead for TwoWa } } -/// FourWaysReader is used to implement [`Read`] based on two ways. +/// FourWaysReader is used to implement [`Read`] based on four ways. /// /// Users can wrap two different readers together. pub enum FourWaysReader {