-
Notifications
You must be signed in to change notification settings - Fork 508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(core/types): Implement concurrent read for blocking read #4545
base: main
Are you sure you want to change the base?
Conversation
@Xuanwo Plz take a review for this design. |
let (interval_size, mut intervals) = end | ||
.map(|end| { | ||
// let interval_size = (end - start + concurrent - 1) / concurrent; | ||
let interval_size = (end - start) / concurrent; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interval_size should be decided by chunk
.
274fa3d
to
5a597d3
Compare
d7640e3
to
9720e2b
Compare
/// BlockingReader is designed to read data from given path in an blocking | ||
/// manner. | ||
pub struct BlockingReader { | ||
pub(crate) inner: oio::BlockingReader, | ||
pub(crate) inner: Arc<dyn oio::BlockingRead>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change oio::BlockingReader
to Arc<dyn oio::BlockingRead>
, maybe worth a seperate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you like to submit this PR first?
let mut bufs = Vec::with_capacity(self.concurrent); | ||
let interval_size = self.chunk.unwrap_or(4 * 1024 * 1024) as u64; | ||
|
||
let intervals: Vec<(u64, u64)> = (0..self.concurrent as u64) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't calcluate ranges at once.
We can build a RangeIterator
that only generate the next range to read. BufferIterator
stores RangeIterator
, every time users call next
, BufferIterator
fetchs a new range and than read as a new buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't calcluate ranges at once.
How can this be paralleled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can this be paralleled?
After re-visit the rayon API docs, I found that we can't call next
on a ParallelIterator
directly. So we can change the RangeIterator
to return a ParallelIterator
instead.
Every time users call next
on BufferIterator
, we build a new ParallelIterator
, extend into buffer and yield from it.
The whole workflow looks like:
RangeIterator::next() -> Range<u64>
struct BufferIterator {
it: RangeIterator,
buf: Vec<Buffer>
}
impl Iterator for BufferIterator {
type Item = Result<Buffer>;
fn next(&mut self) -> Option<Self::Item> {
// fecth from buffer first.
// Build a new ParallelIterator and collect it.
// Return None if ranges have been all consumed.
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Xuanwo Plz review.
9720e2b
to
d500961
Compare
/// BlockingReader is designed to read data from given path in an blocking | ||
/// manner. | ||
pub struct BlockingReader { | ||
pub(crate) inner: oio::BlockingReader, | ||
pub(crate) inner: Arc<dyn oio::BlockingRead>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you like to submit this PR first?
@@ -74,24 +84,26 @@ impl BlockingReader { | |||
} | |||
} | |||
|
|||
let iter = BufferIterator::new( | |||
self.inner.clone(), | |||
self.options.chunk(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can accpet an OpReader
directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you like to submit this PR first?
Do you mean submit a new PR to change oio::BlockingReader to Arc first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean submit a new PR to change oio::BlockingReader to Arc first?
Yes
offset += n as u64; | ||
if Some(offset) == end { | ||
return Ok(bufs.into_iter().flatten().collect()); | ||
for buffer in iter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can use collect?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How to return err early when using collect?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How to return err early when using collect?
collect will exit once error happened. See impl<A, E, V> FromIterator<Result<A, E>> for Result<V, E>
:
Takes each element in the Iterator: if it is an Err, no further elements are taken, and the Err is returned. Should no Err occur, a container with the values of each Result is returned.
return Ok(read as _); | ||
let mut total_len = 0; | ||
for buffer in iter { | ||
match buffer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please try use if-let-else:
let mut read = 0;
loop {
let Some(bs) = iter.try_next()? else {
return Ok(read);
};
read += bs.len();
buf.put(bs);
}
} | ||
|
||
/// Convert reader into [`StdReader`] which implements [`futures::AsyncRead`], | ||
/// [`futures::AsyncSeek`] and [`futures::AsyncBufRead`]. | ||
#[inline] | ||
pub fn into_std_read(self, range: Range<u64>) -> StdReader { | ||
// TODO: the capacity should be decided by services. | ||
StdReader::new(self.inner, range) | ||
StdReader::new(self.inner.clone(), range) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should pass OpReader?
|
||
let mut bufs = Vec::with_capacity(self.concurrent); | ||
|
||
let intervals: Vec<Range<u64>> = (0..self.concurrent) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should put self.finished
inside the RangeIterator
should it can know when to stop returning new ranges.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should put
self.finished
inside theRangeIterator
should it can know when to stop returning new ranges.
When end
is None, We only know end when after read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When
end
is None, We only know end when after read.
That's why we need to store finished
in RangeIterator
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean read
also put into RangeIterator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Xuanwo I submit a new one, is this what you want?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close but not what I want. Can I push changes to this PR directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close but not what I want. Can I push changes to this PR directly?
Of course, thank for your patient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Xuanwo Do you still remember this?
661ba73
to
40322ce
Compare
40322ce
to
cb882aa
Compare
No description provided.