Skip to content

Commit

Permalink
fix byte range and iterator bug (#422)
Browse files Browse the repository at this point in the history
* fix byte range and iterator bug

* projection

* fmt
  • Loading branch information
changhiskhan authored Jan 9, 2023
1 parent 25cf897 commit bf5cb63
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
4 changes: 3 additions & 1 deletion rust/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ impl<'a> Scanner<'a> {
// Only support 1 data file (no schema evolution for now);
assert!(fragment.files.len() == 1);
let data_file = &fragment.files[0];
self.fragment_idx += 1;
let path = self.dataset.data_dir().child(data_file.path.clone());
if self.reader.is_none() {
self.reader = Some(
Expand All @@ -94,6 +93,9 @@ impl<'a> Scanner<'a> {
.await
.unwrap(),
);
self.reader
.as_mut()
.map(|reader| reader.set_projection(self.projections.clone()));
}

if let Some(reader) = &self.reader {
Expand Down
22 changes: 19 additions & 3 deletions rust/src/io/object_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,26 @@ impl<'a> ObjectReader<'a> {
}

let range = pos..min(pos + self.prefetch_size, file_size);
let buf = self.object_store.inner.get_range(&self.path, range).await?;

let buf = self
.object_store
.inner
.get_range(&self.path, range.clone())
.await?;
let msg_len = LittleEndian::read_u32(&buf) as usize;
Ok(M::decode(&buf[4..4 + msg_len])?)

if msg_len + 4 > buf.len() {
let remaining_range = range.end..min(4 + pos + msg_len, file_size);
let remaining_bytes = self
.object_store
.inner
.get_range(&self.path, remaining_range)
.await?;
let buf = [buf, remaining_bytes].concat();
assert!(buf.len() >= msg_len + 4);
Ok(M::decode(&buf[4..4 + msg_len])?)
} else {
Ok(M::decode(&buf[4..4 + msg_len])?)
}
}

pub async fn get_range(&self, range: Range<usize>) -> Result<Bytes> {
Expand Down

0 comments on commit bf5cb63

Please sign in to comment.