diff --git a/rust/src/dataset/scanner.rs b/rust/src/dataset/scanner.rs index 7b34727c8c..c598f2a618 100644 --- a/rust/src/dataset/scanner.rs +++ b/rust/src/dataset/scanner.rs @@ -82,7 +82,6 @@ impl<'a> Scanner<'a> { // Only support 1 data file (no schema evolution for now); assert!(fragment.files.len() == 1); let data_file = &fragment.files[0]; - self.fragment_idx += 1; let path = self.dataset.data_dir().child(data_file.path.clone()); if self.reader.is_none() { self.reader = Some( @@ -94,6 +93,9 @@ impl<'a> Scanner<'a> { .await .unwrap(), ); + self.reader + .as_mut() + .map(|reader| reader.set_projection(self.projections.clone())); } if let Some(reader) = &self.reader { diff --git a/rust/src/io/object_reader.rs b/rust/src/io/object_reader.rs index 575c85f23c..28541bc93f 100644 --- a/rust/src/io/object_reader.rs +++ b/rust/src/io/object_reader.rs @@ -90,10 +90,26 @@ impl<'a> ObjectReader<'a> { } let range = pos..min(pos + self.prefetch_size, file_size); - let buf = self.object_store.inner.get_range(&self.path, range).await?; - + let buf = self + .object_store + .inner + .get_range(&self.path, range.clone()) + .await?; let msg_len = LittleEndian::read_u32(&buf) as usize; - Ok(M::decode(&buf[4..4 + msg_len])?) + + if msg_len + 4 > buf.len() { + let remaining_range = range.end..min(4 + pos + msg_len, file_size); + let remaining_bytes = self + .object_store + .inner + .get_range(&self.path, remaining_range) + .await?; + let buf = [buf, remaining_bytes].concat(); + assert!(buf.len() >= msg_len + 4); + Ok(M::decode(&buf[4..4 + msg_len])?) + } else { + Ok(M::decode(&buf[4..4 + msg_len])?) + } } pub async fn get_range(&self, range: Range) -> Result {