From 18e1ce5f2202bcd1ea30faca052fe4a5b62363e3 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 22 May 2023 13:45:46 -0700 Subject: [PATCH 1/3] feat: allow setting offset without limit --- python/python/lance/dataset.py | 18 +++++++++--------- python/python/tests/test_dataset.py | 16 ++++++++++++++++ python/src/dataset.rs | 10 +++++----- python/src/fragment.rs | 10 +++++----- rust/src/dataset/scanner.rs | 8 ++++---- 5 files changed, 39 insertions(+), 23 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 39d4f22404..16bd4075a2 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -76,7 +76,7 @@ def scanner( self, columns: Optional[list[str]] = None, filter: Optional[Union[str, pa.compute.Expression]] = None, - limit: int = 0, + limit: Optional[int] = None, offset: Optional[int] = None, nearest: Optional[dict] = None, batch_readahead: Optional[int] = None, @@ -96,8 +96,8 @@ def scanner( Currently only >, <, >=, <=, ==, !=, |, & are supported. is_null, is_valid, ~, and others are not yet supported. Specifying these will result in an expression parsing error - limit: int, default 0 - Fetch up to this many rows. All rows if 0 or unspecified. + limit: int, default None + Fetch up to this many rows. All rows if None or unspecified. offset: int, default None Fetch starting with this row. 0 if None or unspecified. nearest: dict, default None @@ -168,7 +168,7 @@ def to_table( self, columns: Optional[list[str]] = None, filter: Optional[Union[str, pa.compute.Expression]] = None, - limit: int = 0, + limit: Optional[int] = None, offset: Optional[int] = None, nearest: Optional[dict] = None, batch_readahead: Optional[int] = None, @@ -187,8 +187,8 @@ def to_table( Currently only >, <, >=, <=, ==, !=, |, & are supported. is_null, is_valid, ~, and others are not yet supported. Specifying these will result in an expression parsing error - limit: int, default 0 - Fetch up to this many rows. All rows if 0 or unspecified. + limit: int, default None + Fetch up to this many rows. All rows if None or unspecified. offset: int, default None Fetch starting with this row. 0 if None or unspecified. nearest: dict, default None @@ -259,7 +259,7 @@ def to_batches( self, columns: Optional[list[str]] = None, filter: Optional[Union[str, pa.compute.Expression]] = None, - limit: int = 0, + limit: Optional[int]= None, offset: Optional[int] = None, nearest: Optional[dict] = None, batch_readahead: Optional[int] = None, @@ -571,8 +571,8 @@ def scan_in_order(self, scan_in_order: bool = True) -> ScannerBuilder: self._scan_in_order = scan_in_order return self - def limit(self, n: int = 0) -> ScannerBuilder: - if int(n) < 0: + def limit(self, n: Optional[int] = None) -> ScannerBuilder: + if n is not None and int(n) < 0: raise ValueError("Limit must be non-negative") self._limit = n return self diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 29a2e8cb2c..88a5dfd68d 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -141,6 +141,22 @@ def test_filter(tmp_path: Path): assert actual_tab == pa.Table.from_pydict({"a": range(51, 100)}) +def test_limit_offset(tmp_path: Path): + table = pa.Table.from_pydict({"a": range(100), "b": range(100)}) + base_dir = tmp_path / "test" + lance.write_dataset(table, base_dir) + dataset = lance.dataset(base_dir) + + # test just limit + assert dataset.to_table(limit=10) == table.slice(0, 10) + + # test just offset + assert dataset.to_table(offset=10) == table.slice(10, 100) + + # test both + assert dataset.to_table(offset=10, limit=10) == table.slice(10, 10) + + def test_relative_paths(tmp_path: Path): # relative paths get coerced to the full absolute path current_dir = os.getcwd() diff --git a/python/src/dataset.rs b/python/src/dataset.rs index d29adc9574..4b730abb29 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -152,11 +152,11 @@ impl Dataset { .filter(f.as_str()) .map_err(|err| PyValueError::new_err(err.to_string()))?; } - if let Some(limit) = limit { - scanner - .limit(limit, offset) - .map_err(|err| PyValueError::new_err(err.to_string()))?; - } + + scanner + .limit(limit, offset) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + if let Some(batch_readahead) = batch_readahead { scanner.batch_readahead(batch_readahead); } diff --git a/python/src/fragment.rs b/python/src/fragment.rs index 8a044318b1..a390481136 100644 --- a/python/src/fragment.rs +++ b/python/src/fragment.rs @@ -143,11 +143,11 @@ impl FileFragment { .filter(&f) .map_err(|err| PyValueError::new_err(err.to_string()))?; } - if let Some(l) = limit { - scanner - .limit(l, offset) - .map_err(|err| PyValueError::new_err(err.to_string()))?; - } + + scanner + .limit(limit, offset) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + let scn = Arc::new(scanner); Ok(Scanner::new(scn, rt)) } diff --git a/rust/src/dataset/scanner.rs b/rust/src/dataset/scanner.rs index a32c1836f5..4b226af74e 100644 --- a/rust/src/dataset/scanner.rs +++ b/rust/src/dataset/scanner.rs @@ -216,8 +216,8 @@ impl Scanner { } /// Set limit and offset. - pub fn limit(&mut self, limit: i64, offset: Option) -> Result<&mut Self> { - if limit < 0 { + pub fn limit(&mut self, limit: Option, offset: Option) -> Result<&mut Self> { + if limit.unwrap_or(0) < 0 { return Err(Error::IO("Limit must be non-negative".to_string())); } if let Some(off) = offset { @@ -225,7 +225,7 @@ impl Scanner { return Err(Error::IO("Offset must be non-negative".to_string())); } } - self.limit = Some(limit); + self.limit = limit; self.offset = offset; Ok(self) } @@ -769,7 +769,7 @@ mod test { let dataset = Dataset::open(path).await.unwrap(); let mut scanner = dataset.scan(); - scanner.limit(2, Some(19)).unwrap(); + scanner.limit(Some(2), Some(19)).unwrap(); let actual_batches: Vec = scanner .try_into_stream() .await From aef080613cbd34e2ac3b684d45e93a4d7859fa7b Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 22 May 2023 13:50:24 -0700 Subject: [PATCH 2/3] fix bin --- rust/src/bin/lq.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/bin/lq.rs b/rust/src/bin/lq.rs index 9d4becb532..4610aa36df 100644 --- a/rust/src/bin/lq.rs +++ b/rust/src/bin/lq.rs @@ -118,7 +118,7 @@ async fn main() -> Result<()> { Commands::Query { uri, n } => { let dataset = Dataset::open(uri).await.unwrap(); let mut scanner = dataset.scan(); - scanner.limit(*n, None).unwrap(); + scanner.limit(Some(*n), None).unwrap(); let stream = scanner.try_into_stream().await.unwrap(); let batch: Vec = stream.take(1).try_collect::>().await.unwrap(); From 6b0badf851e403a3123174543307bec51cb9625f Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 22 May 2023 13:54:16 -0700 Subject: [PATCH 3/3] pr feedback --- rust/src/dataset/scanner.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rust/src/dataset/scanner.rs b/rust/src/dataset/scanner.rs index 4b226af74e..8cf10b9d82 100644 --- a/rust/src/dataset/scanner.rs +++ b/rust/src/dataset/scanner.rs @@ -216,8 +216,13 @@ impl Scanner { } /// Set limit and offset. + /// + /// If offset is set, the first offset rows will be skipped. If limit is set, + /// only the provided number of rows will be returned. These can be set + /// independently. For example, setting offset to 10 and limit to None will + /// skip the first 10 rows and return the rest of the rows in the dataset. pub fn limit(&mut self, limit: Option, offset: Option) -> Result<&mut Self> { - if limit.unwrap_or(0) < 0 { + if limit.unwrap_or_default() < 0 { return Err(Error::IO("Limit must be non-negative".to_string())); } if let Some(off) = offset {