Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow setting offset without limit #892

Merged
merged 3 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def scanner(
self,
columns: Optional[list[str]] = None,
filter: Optional[Union[str, pa.compute.Expression]] = None,
limit: int = 0,
limit: Optional[int] = None,
offset: Optional[int] = None,
nearest: Optional[dict] = None,
batch_readahead: Optional[int] = None,
Expand All @@ -96,8 +96,8 @@ def scanner(
Currently only >, <, >=, <=, ==, !=, |, & are supported.
is_null, is_valid, ~, and others are not yet supported.
Specifying these will result in an expression parsing error
limit: int, default 0
Fetch up to this many rows. All rows if 0 or unspecified.
limit: int, default None
Fetch up to this many rows. All rows if None or unspecified.
offset: int, default None
Fetch starting with this row. 0 if None or unspecified.
nearest: dict, default None
Expand Down Expand Up @@ -168,7 +168,7 @@ def to_table(
self,
columns: Optional[list[str]] = None,
filter: Optional[Union[str, pa.compute.Expression]] = None,
limit: int = 0,
limit: Optional[int] = None,
offset: Optional[int] = None,
nearest: Optional[dict] = None,
batch_readahead: Optional[int] = None,
Expand All @@ -187,8 +187,8 @@ def to_table(
Currently only >, <, >=, <=, ==, !=, |, & are supported.
is_null, is_valid, ~, and others are not yet supported.
Specifying these will result in an expression parsing error
limit: int, default 0
Fetch up to this many rows. All rows if 0 or unspecified.
limit: int, default None
Fetch up to this many rows. All rows if None or unspecified.
offset: int, default None
Fetch starting with this row. 0 if None or unspecified.
nearest: dict, default None
Expand Down Expand Up @@ -259,7 +259,7 @@ def to_batches(
self,
columns: Optional[list[str]] = None,
filter: Optional[Union[str, pa.compute.Expression]] = None,
limit: int = 0,
limit: Optional[int]= None,
offset: Optional[int] = None,
nearest: Optional[dict] = None,
batch_readahead: Optional[int] = None,
Expand Down Expand Up @@ -571,8 +571,8 @@ def scan_in_order(self, scan_in_order: bool = True) -> ScannerBuilder:
self._scan_in_order = scan_in_order
return self

def limit(self, n: int = 0) -> ScannerBuilder:
if int(n) < 0:
def limit(self, n: Optional[int] = None) -> ScannerBuilder:
if n is not None and int(n) < 0:
raise ValueError("Limit must be non-negative")
self._limit = n
return self
Expand Down
16 changes: 16 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ def test_filter(tmp_path: Path):
assert actual_tab == pa.Table.from_pydict({"a": range(51, 100)})


def test_limit_offset(tmp_path: Path):
table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
base_dir = tmp_path / "test"
lance.write_dataset(table, base_dir)
dataset = lance.dataset(base_dir)

# test just limit
assert dataset.to_table(limit=10) == table.slice(0, 10)

# test just offset
assert dataset.to_table(offset=10) == table.slice(10, 100)

# test both
assert dataset.to_table(offset=10, limit=10) == table.slice(10, 10)


def test_relative_paths(tmp_path: Path):
# relative paths get coerced to the full absolute path
current_dir = os.getcwd()
Expand Down
10 changes: 5 additions & 5 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,11 @@ impl Dataset {
.filter(f.as_str())
.map_err(|err| PyValueError::new_err(err.to_string()))?;
}
if let Some(limit) = limit {
scanner
.limit(limit, offset)
.map_err(|err| PyValueError::new_err(err.to_string()))?;
}

scanner
.limit(limit, offset)
.map_err(|err| PyValueError::new_err(err.to_string()))?;

if let Some(batch_readahead) = batch_readahead {
scanner.batch_readahead(batch_readahead);
}
Expand Down
10 changes: 5 additions & 5 deletions python/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ impl FileFragment {
.filter(&f)
.map_err(|err| PyValueError::new_err(err.to_string()))?;
}
if let Some(l) = limit {
scanner
.limit(l, offset)
.map_err(|err| PyValueError::new_err(err.to_string()))?;
}

scanner
.limit(limit, offset)
.map_err(|err| PyValueError::new_err(err.to_string()))?;

let scn = Arc::new(scanner);
Ok(Scanner::new(scn, rt))
}
Expand Down
2 changes: 1 addition & 1 deletion rust/src/bin/lq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn main() -> Result<()> {
Commands::Query { uri, n } => {
let dataset = Dataset::open(uri).await.unwrap();
let mut scanner = dataset.scan();
scanner.limit(*n, None).unwrap();
scanner.limit(Some(*n), None).unwrap();
let stream = scanner.try_into_stream().await.unwrap();
let batch: Vec<RecordBatch> = stream.take(1).try_collect::<Vec<_>>().await.unwrap();

Expand Down
13 changes: 9 additions & 4 deletions rust/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,21 @@ impl Scanner {
}

/// Set limit and offset.
pub fn limit(&mut self, limit: i64, offset: Option<i64>) -> Result<&mut Self> {
if limit < 0 {
///
/// If offset is set, the first offset rows will be skipped. If limit is set,
/// only the provided number of rows will be returned. These can be set
/// independently. For example, setting offset to 10 and limit to None will
/// skip the first 10 rows and return the rest of the rows in the dataset.
pub fn limit(&mut self, limit: Option<i64>, offset: Option<i64>) -> Result<&mut Self> {
if limit.unwrap_or_default() < 0 {
return Err(Error::IO("Limit must be non-negative".to_string()));
}
if let Some(off) = offset {
if off < 0 {
return Err(Error::IO("Offset must be non-negative".to_string()));
}
}
self.limit = Some(limit);
self.limit = limit;
self.offset = offset;
Ok(self)
}
Expand Down Expand Up @@ -769,7 +774,7 @@ mod test {

let dataset = Dataset::open(path).await.unwrap();
let mut scanner = dataset.scan();
scanner.limit(2, Some(19)).unwrap();
scanner.limit(Some(2), Some(19)).unwrap();
wjones127 marked this conversation as resolved.
Show resolved Hide resolved
let actual_batches: Vec<RecordBatch> = scanner
.try_into_stream()
.await
Expand Down