Skip to content

Commit

Permalink
wip: allow scanning data in out of order (#874)
Browse files Browse the repository at this point in the history
* wip: allow scanning data in various orders

* fix lifetime issues

* adjust the API to get existing tests to pass

* test: test the unordered code path

* feat: expose unordered_scan in Python

* fix: keep debug

* test: add test to enforce scan ordering

* refactor: add fragment readahead parameter and rename to scan_in_order

* fix: enforce batch_readahead across fragments
  • Loading branch information
wjones127 authored May 18, 2023
1 parent 6af670a commit 8902a74
Show file tree
Hide file tree
Showing 6 changed files with 328 additions and 59 deletions.
47 changes: 47 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def scanner(
offset: Optional[int] = None,
nearest: Optional[dict] = None,
batch_readahead: Optional[int] = None,
fragment_readahead: Optional[int] = None,
scan_in_order: bool = True,
) -> LanceScanner:
"""Return a Scanner that can support various pushdowns.
Expand Down Expand Up @@ -108,6 +110,12 @@ def scanner(
}`
batch_readahead: int, optional
The number of batches to read ahead.
fragment_readahead: int, optional
The number of fragments to read ahead.
scan_in_order: bool, default True
Whether to read the fragments and batches in order. If false,
throughput may be higher, but batches will be returned out of order
and memory use might increase.
Notes
-----
Expand Down Expand Up @@ -138,6 +146,7 @@ def scanner(
.offset(offset)
.nearest(**(nearest or {}))
.batch_readahead(batch_readahead)
.fragment_readahead(fragment_readahead)
.to_scanner()
)

Expand All @@ -156,6 +165,8 @@ def to_table(
offset: Optional[int] = None,
nearest: Optional[dict] = None,
batch_readahead: Optional[int] = None,
fragment_readahead: Optional[int] = None,
scan_in_order: bool = True,
) -> pa.Table:
"""Read the data into memory as a pyarrow Table.
Expand Down Expand Up @@ -183,6 +194,14 @@ def to_table(
"nprobes": 1,
"refine_factor": 1
}
batch_readahead: int, optional
The number of batches to read ahead.
fragment_readahead: int, optional
The number of fragments to read ahead.
scan_in_order: bool, default True
Whether to read the fragments and batches in order. If false,
throughput may be higher, but batches will be returned out of order
and memory use might increase.
Notes
-----
Expand All @@ -197,6 +216,8 @@ def to_table(
offset=offset,
nearest=nearest,
batch_readahead=batch_readahead,
fragment_readahead=fragment_readahead,
scan_in_order=scan_in_order,
).to_table()

@property
Expand Down Expand Up @@ -235,6 +256,8 @@ def to_batches(
offset: Optional[int] = None,
nearest: Optional[dict] = None,
batch_readahead: Optional[int] = None,
fragment_readahead: Optional[int] = None,
scan_in_order: bool = True,
**kwargs,
) -> Iterator[pa.RecordBatch]:
"""Read the dataset as materialized record batches.
Expand All @@ -255,6 +278,8 @@ def to_batches(
offset=offset,
nearest=nearest,
batch_readahead=batch_readahead,
fragment_readahead=fragment_readahead,
scan_in_order=scan_in_order,
).to_batches()

def take(self, indices, **kwargs):
Expand Down Expand Up @@ -512,12 +537,31 @@ def __init__(self, ds: LanceDataset):
self._columns = None
self._nearest = None
self._batch_readahead = None
self._fragment_readahead = None
self._scan_in_order = True

def batch_readahead(self, nbatches: Optional[int] = None) -> ScannerBuilder:
if nbatches is not None and int(nbatches) < 0:
raise ValueError("batch_readahead must be non-negative")
self._batch_readahead = nbatches
return self

def fragment_readahead(self, nfragments: Optional[int] = None) -> ScannerBuilder:
if nfragments is not None and int(nfragments) < 0:
raise ValueError("fragment_readahead must be non-negative")
self._fragment_readahead = nfragments
return self

def scan_in_order(self, scan_in_order: bool = True) -> ScannerBuilder:
"""
Whether to scan the dataset in order of fragments and batches.
If set to False, the scanner may read fragments concurrently and yield
batches out of order. This may improve performance since it allows more
concurrency in the scan, but can also use more memory.
"""
self._scan_in_order = scan_in_order
return self

def limit(self, n: int = 0) -> ScannerBuilder:
if int(n) < 0:
Expand Down Expand Up @@ -589,6 +633,8 @@ def to_scanner(self) -> LanceScanner:
self._offset,
self._nearest,
self._batch_readahead,
self._fragment_readahead,
self._scan_in_order,
)
return LanceScanner(scanner, self.ds)

Expand Down Expand Up @@ -641,6 +687,7 @@ def from_fragments(fragments: Iterable[LanceFragment], **kwargs):
-------
LanceScanner
"""
# TODO: can we take kwargs here for the scanner builder?
# TODO: peek instead of list
if not isinstance(fragments, list):
fragments = list(iter(fragments))
Expand Down
4 changes: 4 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ def test_to_batches(tmp_path: Path):
batches = dataset.to_batches(batch_readahead=20)
assert pa.Table.from_batches(batches) == table

unordered_batches = dataset.to_batches(batch_readahead=20, scan_in_order=False)
sorted_batches = pa.Table.from_batches(unordered_batches).sort_by("a")
assert sorted_batches == table


def test_pickle(tmp_path: Path):
table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
Expand Down
9 changes: 9 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ impl Dataset {
offset: Option<i64>,
nearest: Option<&PyDict>,
batch_readahead: Option<usize>,
fragment_readahead: Option<usize>,
scan_in_order: Option<bool>,
) -> PyResult<Scanner> {
let mut scanner: LanceScanner = self_.ds.scan();
if let Some(c) = columns {
Expand All @@ -156,6 +158,13 @@ impl Dataset {
if let Some(batch_readahead) = batch_readahead {
scanner.batch_readahead(batch_readahead);
}

if let Some(fragment_readahead) = fragment_readahead {
scanner.fragment_readahead(fragment_readahead);
}

scanner.scan_in_order(scan_in_order.unwrap_or(true));

if let Some(nearest) = nearest {
let column = nearest
.get_item("column")
Expand Down
Loading

0 comments on commit 8902a74

Please sign in to comment.