From bdc9fc59dcdde02b5c973c3eb2cda3573f5a1d17 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Sun, 5 Jan 2025 18:50:49 +0800 Subject: [PATCH 1/3] fix: scan out of range --- rust/lance-io/src/lib.rs | 12 +++++ rust/lance/src/dataset/fragment.rs | 72 +++++++++++++++++++++++++++--- 2 files changed, 78 insertions(+), 6 deletions(-) diff --git a/rust/lance-io/src/lib.rs b/rust/lance-io/src/lib.rs index 8e7a7694d8..c6fc73bc81 100644 --- a/rust/lance-io/src/lib.rs +++ b/rust/lance-io/src/lib.rs @@ -200,6 +200,18 @@ impl ReadBatchParams { )), } } + + pub fn to_offsets_total(&self, total: u32) -> Result> { + match self { + Self::Indices(indices) => Ok(indices.clone()), + Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter( + r.start as u32..r.end as u32, + ))), + Self::RangeFull => Ok(UInt32Array::from(Vec::from_iter(0 as u32..total))), + Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))), + Self::RangeFrom(r) => Ok(UInt32Array::from(Vec::from_iter(r.start as u32..total))), + } + } } #[cfg(test)] diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index a3aa9af1c7..ec87bcedf9 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1981,12 +1981,7 @@ impl FragmentReader { let merged = if self.with_row_addr as usize + self.with_row_id as usize == self.output_schema.fields.len() { - let selected_rows = params - .slice(0, total_num_rows as usize) - .unwrap() - .to_offsets() - .unwrap() - .len(); + let selected_rows = params.to_offsets_total(total_num_rows).unwrap().len(); let tasks = (0..selected_rows) .step_by(batch_size as usize) .map(move |offset| { @@ -2389,6 +2384,71 @@ mod tests { } } + #[tokio::test] + async fn test_rowid_rowaddr_only() { + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + // Creates 400 rows in 10 fragments + let mut dataset = create_dataset(test_uri, LanceFileVersion::Legacy).await; + // Delete last 20 rows in first fragment + dataset.delete("i >= 20").await.unwrap(); + // Last fragment has 20 rows but 40 addressable rows + let fragment = &dataset.get_fragments()[0]; + assert_eq!(fragment.metadata.num_rows().unwrap(), 20); + + // Test with take_range (all rows addressable) + for (with_row_id, with_row_address) in [(false, true), (true, false), (true, true)] { + let reader = fragment + .open( + &fragment.schema().project::<&str>(&[]).unwrap(), + FragReadConfig::default() + .with_row_id(with_row_id) + .with_row_address(with_row_address), + None, + ) + .await + .unwrap(); + for valid_range in [0..40, 20..40] { + reader + .take_range(valid_range, 100) + .unwrap() + .buffered(1) + .try_collect::>() + .await + .unwrap(); + } + for invalid_range in [0..41, 41..42] { + assert!(reader.take_range(invalid_range, 100).is_err()); + } + } + + // Test with read_range (only non-deleted rows addressable) + for (with_row_id, with_row_address) in [(false, true), (true, false), (true, true)] { + let reader = fragment + .open( + &fragment.schema().project::<&str>(&[]).unwrap(), + FragReadConfig::default() + .with_row_id(with_row_id) + .with_row_address(with_row_address), + None, + ) + .await + .unwrap(); + for valid_range in [0..20, 0..10, 10..20] { + reader + .read_range(valid_range, 100) + .unwrap() + .buffered(1) + .try_collect::>() + .await + .unwrap(); + } + for invalid_range in [0..21, 21..22] { + assert!(reader.read_range(invalid_range, 100).is_err()); + } + } + } + #[rstest] #[tokio::test] async fn test_fragment_take_range_deletions( From cdbc3e7a2612007aefb9e13215337065261cf4ec Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Sun, 5 Jan 2025 18:57:41 +0800 Subject: [PATCH 2/3] format --- rust/lance-io/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-io/src/lib.rs b/rust/lance-io/src/lib.rs index c6fc73bc81..68a19742c5 100644 --- a/rust/lance-io/src/lib.rs +++ b/rust/lance-io/src/lib.rs @@ -207,7 +207,7 @@ impl ReadBatchParams { Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter( r.start as u32..r.end as u32, ))), - Self::RangeFull => Ok(UInt32Array::from(Vec::from_iter(0 as u32..total))), + Self::RangeFull => Ok(UInt32Array::from(Vec::from_iter(0_u32..total))), Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))), Self::RangeFrom(r) => Ok(UInt32Array::from(Vec::from_iter(r.start as u32..total))), } From 78391a22de13c879d3f71468e8d32df3e1ecc1ff Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Tue, 7 Jan 2025 07:57:22 +0800 Subject: [PATCH 3/3] update --- rust/lance-io/src/lib.rs | 14 ++++++-------- rust/lance/src/dataset/fragment.rs | 2 +- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/rust/lance-io/src/lib.rs b/rust/lance-io/src/lib.rs index 68a19742c5..b61c820f88 100644 --- a/rust/lance-io/src/lib.rs +++ b/rust/lance-io/src/lib.rs @@ -201,15 +201,13 @@ impl ReadBatchParams { } } - pub fn to_offsets_total(&self, total: u32) -> Result> { + pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray { match self { - Self::Indices(indices) => Ok(indices.clone()), - Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter( - r.start as u32..r.end as u32, - ))), - Self::RangeFull => Ok(UInt32Array::from(Vec::from_iter(0_u32..total))), - Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))), - Self::RangeFrom(r) => Ok(UInt32Array::from(Vec::from_iter(r.start as u32..total))), + Self::Indices(indices) => indices.clone(), + Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32), + Self::RangeFull => UInt32Array::from_iter_values(0_u32..total), + Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32), + Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total), } } } diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index ec87bcedf9..71f590498d 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1981,7 +1981,7 @@ impl FragmentReader { let merged = if self.with_row_addr as usize + self.with_row_id as usize == self.output_schema.fields.len() { - let selected_rows = params.to_offsets_total(total_num_rows).unwrap().len(); + let selected_rows = params.to_offsets_total(total_num_rows).len(); let tasks = (0..selected_rows) .step_by(batch_size as usize) .map(move |offset| {