Skip to content

Commit

Permalink
fix: scan out of range (#3339)
Browse files Browse the repository at this point in the history
If we scan only `_rowid` or `_rowaddr` without any other columns, the scan
panics with the following error:

```
thread 'tokio-runtime-worker' panicked at rust/lance/src/dataset/fragment.rs:1986:18:
called `Result::unwrap()` on an `Err` value: InvalidInput { source: "Cannot slice from 0 with length 333275 given a selection of size 10", location: Location { file: "rust/lance-io/src/lib.rs", line: 151, column: 27 } }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread '<unnamed>' panicked at rust/lance/src/io/exec/scan.rs:236:40:
called `Result::unwrap()` on an `Err` value: JoinError::Panic(Id(13), "called `Result::unwrap()` on an `Err` value: InvalidInput { source: \"Cannot slice from 0 with length 333275 given a selection of size 10\", location: Location { file: \"rust/lance-io/src/lib.rs\", line: 151, column: 27 } }", ...)
thread '<unnamed>' panicked at core/src/panicking.rs:221:5:
```
  • Loading branch information
chenkovsky authored Jan 10, 2025
1 parent f478c46 commit 29db3bb
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 6 deletions.
10 changes: 10 additions & 0 deletions rust/lance-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ impl ReadBatchParams {
)),
}
}

/// Materialize this selection as an array of `u32` row offsets.
///
/// `total` is used as the exclusive upper bound for the open-ended variants
/// (`RangeFull` and `RangeFrom`). The other variants do not consult it:
/// `Indices` returns a clone of the stored array, while `Range` and
/// `RangeTo` use their own end bound. Nothing is clamped or validated
/// against `total` here.
///
/// Range bounds are converted with `as u32`, which is an unchecked cast.
pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
    // Reduce every range-like variant to a half-open `[start, end)` pair so
    // the offsets array is built in exactly one place.
    let (start, end) = match self {
        // Explicit indices are already offsets; hand back a copy as-is.
        Self::Indices(indices) => return indices.clone(),
        Self::Range(r) => (r.start as u32, r.end as u32),
        Self::RangeFull => (0_u32, total),
        Self::RangeTo(r) => (0_u32, r.end as u32),
        Self::RangeFrom(r) => (r.start as u32, total),
    };
    UInt32Array::from_iter_values(start..end)
}
}

#[cfg(test)]
Expand Down
72 changes: 66 additions & 6 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1981,12 +1981,7 @@ impl FragmentReader {
let merged = if self.with_row_addr as usize + self.with_row_id as usize
== self.output_schema.fields.len()
{
let selected_rows = params
.slice(0, total_num_rows as usize)
.unwrap()
.to_offsets()
.unwrap()
.len();
let selected_rows = params.to_offsets_total(total_num_rows).len();
let tasks = (0..selected_rows)
.step_by(batch_size as usize)
.map(move |offset| {
Expand Down Expand Up @@ -2389,6 +2384,71 @@ mod tests {
}
}

/// Regression test: reading a fragment while projecting *only* `_rowid`
/// and/or `_rowaddr` (no data columns) must succeed for in-bounds ranges and
/// return an error, rather than panic, for out-of-bounds ranges.
#[tokio::test]
async fn test_rowid_rowaddr_only() {
    let test_dir = tempdir().unwrap();
    let test_uri = test_dir.path().to_str().unwrap();
    // Creates 400 rows in 10 fragments (40 rows per fragment).
    let mut dataset = create_dataset(test_uri, LanceFileVersion::Legacy).await;
    // Delete every row with i >= 20, i.e. the last 20 rows of the first fragment.
    dataset.delete("i >= 20").await.unwrap();
    // The first fragment now has 20 live rows but still 40 addressable rows.
    let fragment = &dataset.get_fragments()[0];
    assert_eq!(fragment.metadata.num_rows().unwrap(), 20);

    // Every way of requesting at least one of the two metadata columns,
    // as (with_row_id, with_row_address).
    let flag_combos = [(false, true), (true, false), (true, true)];
    // Empty projection: no data columns are read at all.
    let empty_projection = fragment.schema().project::<&str>(&[]).unwrap();

    // take_range: all 40 physical rows are addressable, deleted or not.
    for (with_row_id, with_row_address) in flag_combos {
        let read_config = FragReadConfig::default()
            .with_row_id(with_row_id)
            .with_row_address(with_row_address);
        let reader = fragment
            .open(&empty_projection, read_config, None)
            .await
            .unwrap();

        for valid_range in [0..40, 20..40] {
            let tasks = reader.take_range(valid_range, 100).unwrap();
            let _batches: Vec<_> = tasks.buffered(1).try_collect().await.unwrap();
        }
        for invalid_range in [0..41, 41..42] {
            assert!(reader.take_range(invalid_range, 100).is_err());
        }
    }

    // read_range: only the 20 non-deleted rows are addressable.
    for (with_row_id, with_row_address) in flag_combos {
        let read_config = FragReadConfig::default()
            .with_row_id(with_row_id)
            .with_row_address(with_row_address);
        let reader = fragment
            .open(&empty_projection, read_config, None)
            .await
            .unwrap();

        for valid_range in [0..20, 0..10, 10..20] {
            let tasks = reader.read_range(valid_range, 100).unwrap();
            let _batches: Vec<_> = tasks.buffered(1).try_collect().await.unwrap();
        }
        for invalid_range in [0..21, 21..22] {
            assert!(reader.read_range(invalid_range, 100).is_err());
        }
    }
}

#[rstest]
#[tokio::test]
async fn test_fragment_take_range_deletions(
Expand Down

0 comments on commit 29db3bb

Please sign in to comment.