lance dataset that overrides Dataset.scanner and Dataset.head (#158)
* lance dataset that overrides Dataset.scanner and Dataset.head

* PR comments
changhiskhan authored Sep 12, 2022
1 parent 0758195 commit cc3e442
Showing 5 changed files with 193 additions and 68 deletions.
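In short: `lance.dataset()` now returns a Lance-aware dataset whose `scanner()` and `head()` methods push `limit`/`offset` down into the scan, replacing the old free-standing `lance.scanner()` helper. A minimal sketch of the new call pattern (the dataset path is a placeholder):

import lance

ds = lance.dataset("/tmp/example.lance")  # placeholder path

# head() builds a limit/offset-pushdown scan under the hood
first_two = ds.head(2)

# scanner() accepts the same arguments BuildScanner takes
table = ds.scanner(columns=["label"], limit=10, offset=5).to_table()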
52 changes: 4 additions & 48 deletions python/lance/__init__.py
@@ -24,7 +24,7 @@
 
 __version__ = lance.version.__version__
 
-from lance.lib import BuildScanner, LanceFileFormat, WriteTable
+from lance.lib import BuildScanner, LanceFileFormat, WriteTable, _wrap_dataset
 from lance.types import register_extension_types
 
 if platform.system() == "Linux":
@@ -34,9 +34,7 @@
 __all__ = ["dataset", "write_table", "scanner", "LanceFileFormat", "__version__"]
 
 
-def dataset(
-    uri: str,
-) -> ds.FileSystemDataset:
+def dataset(uri: str, **kwargs) -> ds.FileSystemDataset:
     """
     Create an Arrow Dataset from the given lance uri.
@@ -46,50 +44,8 @@ def dataset(
         The uri to the lance data
     """
     fmt = LanceFileFormat()
-    return ds.dataset(uri, format=fmt)
-
-
-def scanner(
-    data: Union[str, Path, ds.Dataset],
-    columns: Optional[List[str]] = None,
-    filter: Optional[pc.Expression] = None,
-    batch_size: Optional[int] = None,
-    limit: Optional[int] = None,
-    offset: int = 0,
-) -> ds.Scanner:
-    """Build a PyArrow Dataset scanner.
-
-    It extends PyArrow Scanner with limit pushdown.
-
-    Parameters
-    ----------
-    data: uri, path or pyarrow dataset
-        The Dataset
-    columns: List[str], optional
-        Specify the columns to read.
-    filter: pc.Expression, optional
-        Apply filter to the scanner.
-    batch_size: int
-        The maximum number of records to scan for each batch.
-    limit: int
-        Limit the number of records to return in total.
-    offset: int
-        The offset to read the data from.
-
-    See Also
-    --------
-    https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner
-    """
-    if isinstance(data, (str, Path)):
-        data = dataset(str(data))
-    return BuildScanner(
-        data,
-        columns=columns,
-        filter=filter,
-        batch_size=batch_size,
-        limit=limit,
-        offset=offset,
-    )
+    dataset = ds.dataset(uri, format=fmt, **kwargs)
+    return _wrap_dataset(dataset)
 
 
 def write_table(table: pa.Table, destination: Union[str, Path], batch_size: int = 1024):
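Because `dataset()` now forwards `**kwargs` to `pyarrow.dataset.dataset()`, callers can pass through pyarrow options such as `filesystem=` — this is what the PyTorch loader change below relies on. A hedged sketch (the filesystem choice and path are illustrative):

import pyarrow.fs
import lance

# any keyword pyarrow.dataset.dataset() understands can be forwarded;
# the local filesystem and path here are illustrative
fs = pyarrow.fs.LocalFileSystem()
ds = lance.dataset("/data/train.lance", filesystem=fs)
print(ds.files)  # the wrapped FileSystemDataset exposes its file list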
180 changes: 168 additions & 12 deletions python/lance/_lib.pyx
@@ -17,26 +17,38 @@ from pyarrow._dataset cimport (
     CScanner,
     Dataset,
     FileFormat,
+    FileFragment,
     FileWriteOptions,
     Partitioning,
 )
 
-from pyarrow._dataset import Scanner
+from pyarrow._dataset import Scanner, _forbid_instantiation
 
 from pyarrow._compute cimport Expression, _bind
 from pyarrow._fs cimport FileSystem
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport COutputStream, CTable
-from pyarrow.includes.libarrow_dataset cimport CFileFormat
+from pyarrow.includes.libarrow_dataset cimport (
+    CFileFormat,
+    CFileFragment,
+    CFileSystem,
+    CFileSystemDataset,
+    CFragment,
+)
 from pyarrow.lib cimport (
+    CExpression,
     GetResultValue,
     RecordBatchReader,
     Schema,
     check_status,
     get_writer,
     pyarrow_unwrap_table,
 )
 
-from pyarrow.lib import tobytes
+from pyarrow.lib import frombytes, tobytes
+
+
+cdef Expression _true = Expression._scalar(True)
 
 cdef extern from "<optional>" namespace "std" nogil:
     # Backport https://github.com/cython/cython/blob/master/Cython/Includes/libcpp/optional.pxd
@@ -76,9 +88,9 @@ cdef extern from "lance/arrow/file_lance.h" namespace "lance" nogil:
 
 cdef extern from "lance/arrow/writer.h" namespace "lance::arrow" nogil:
     CStatus CWriteTable "::lance::arrow::WriteTable"(
-        const CTable& table,
-        shared_ptr[COutputStream] sink,
-        CLanceFileWriteOptions options)
+            const CTable& table,
+            shared_ptr[COutputStream] sink,
+            CLanceFileWriteOptions options)
 
 
 cdef extern from "lance/arrow/scanner.h" namespace "lance::arrow" nogil:
@@ -91,12 +103,12 @@ cdef extern from "lance/arrow/scanner.h" namespace "lance::arrow" nogil:
         CResult[shared_ptr[CScanner]] Finish()
 
 def BuildScanner(
-    dataset: Dataset,
-    columns: Optional[list[str]] = None,
-    filter: Optional[Expression] = None,
-    batch_size: Optional[int] = None,
-    limit: Optional[int] = None,
-    offset: int = 0,
+        dataset: Dataset,
+        columns: Optional[list[str]] = None,
+        filter: Optional[Expression] = None,
+        batch_size: Optional[int] = None,
+        limit: Optional[int] = None,
+        offset: int = 0,
 ):
     cdef shared_ptr[CDataset] cdataset = dataset.unwrap()
     cdef shared_ptr[LScannerBuilder] builder = shared_ptr[LScannerBuilder](
@@ -155,3 +167,147 @@ def WriteTable(table: Table,
     options.batch_size = batch_size
     with nogil:
         check_status(CWriteTable(deref(arrow_table), out, options))
+
+
+cdef class FileSystemDataset(Dataset):
+    """
+    A Dataset of Lance fragments.
+
+    A LanceDataset is composed of one or more FileFragment.
+
+    Parameters
+    ----------
+    fragments : list[Fragment]
+        List of fragments to consume.
+    schema : Schema
+        The top-level schema of the Dataset.
+    filesystem : FileSystem
+        FileSystem of the fragments.
+    root_partition : Expression, optional
+        The top-level partition of the Dataset.
+    """
+
+    cdef:
+        CFileSystemDataset* filesystem_dataset
+
+    def __init__(self):
+        _forbid_instantiation(self.__class__)
+
+    @property
+    def filesystem(self):
+        return FileSystem.wrap(self.filesystem_dataset.filesystem())
+
+    @property
+    def partitioning(self):
+        """
+        The partitioning of the Dataset source, if discovered.
+
+        If the FileSystemDataset is created using the ``dataset()`` factory
+        function with a partitioning specified, this will return the
+        finalized Partitioning object from the dataset discovery. In all
+        other cases, this returns None.
+        """
+        c_partitioning = self.filesystem_dataset.partitioning()
+        if c_partitioning.get() == nullptr:
+            return None
+        try:
+            return Partitioning.wrap(c_partitioning)
+        except TypeError:
+            # e.g. type_name "default"
+            return None
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self.filesystem_dataset = <CFileSystemDataset*> sp.get()
+
+    @staticmethod
+    cdef wrap(const shared_ptr[CDataset]& sp):
+        cdef Dataset ds = FileSystemDataset.__new__(FileSystemDataset)
+        ds.init(sp)
+        return ds
+
+    def __reduce__(self):
+        return FileSystemDataset, (
+            list(self.get_fragments()),
+            self.schema,
+            self.format,
+            self.filesystem,
+            self.partition_expression
+        )
+
+    @classmethod
+    def from_paths(cls, paths, schema=None, format=None,
+                   filesystem=None, partitions=None, root_partition=None):
+        """A Dataset created from a list of paths on a particular filesystem.
+
+        Parameters
+        ----------
+        paths : list of str
+            List of file paths to create the fragments from.
+        schema : Schema
+            The top-level schema of the Dataset.
+        format : FileFormat
+            File format to create fragments from, currently only
+            ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported.
+        filesystem : FileSystem
+            The filesystem which files are from.
+        partitions : list[Expression], optional
+            Attach additional partition information for the file paths.
+        root_partition : Expression, optional
+            The top-level partition of the Dataset.
+        """
+        cdef:
+            FileFragment fragment
+
+        if root_partition is None:
+            root_partition = _true
+
+        for arg, class_, name in [
+            (schema, Schema, 'schema'),
+            (format, FileFormat, 'format'),
+            (filesystem, FileSystem, 'filesystem'),
+            (root_partition, Expression, 'root_partition')
+        ]:
+            if not isinstance(arg, class_):
+                raise TypeError(
+                    "Argument '{0}' has incorrect type (expected {1}, "
+                    "got {2})".format(name, class_.__name__, type(arg))
+                )
+
+        partitions = partitions or [_true] * len(paths)
+
+        if len(paths) != len(partitions):
+            raise ValueError(
+                'The number of files resulting from paths_or_selector '
+                'must be equal to the number of partitions.'
+            )
+
+        fragments = [
+            format.make_fragment(path, filesystem, partitions[i])
+            for i, path in enumerate(paths)
+        ]
+        return FileSystemDataset(fragments, schema, format,
+                                 filesystem, root_partition)
+
+    @property
+    def files(self):
+        """List of the files"""
+        cdef vector[c_string] files = self.filesystem_dataset.files()
+        return [frombytes(f) for f in files]
+
+    @property
+    def format(self):
+        """The FileFormat of this source."""
+        cdef FileFormat format = LanceFileFormat.__new__(LanceFileFormat)
+        format.init(self.filesystem_dataset.format())
+        return format
+
+    def head(self, n: int, offset: int = 0) -> Table:
+        scanner = self.scanner(limit=n, offset=offset)
+        return scanner.to_table()
+
+    def scanner(self, *args, **kwargs):
+        return BuildScanner(self, *args, **kwargs)
+
+
+def _wrap_dataset(Dataset dataset not None):
+    cdef shared_ptr[CDataset] copy = dataset.unwrap()
+    return FileSystemDataset.wrap(move(copy))
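Design note: `_wrap_dataset` takes the `shared_ptr[CDataset]` out of the stock pyarrow dataset and re-wraps it in this subclass, so `scanner()` and `head()` dispatch to `BuildScanner` (and its limit/offset pushdown) without copying any data. A sketch of the scanner arguments, matching `BuildScanner`'s signature (the dataset path is a placeholder):

import pyarrow.compute as pc
import lance

ds = lance.dataset("/tmp/example.lance")  # placeholder path
scan = ds.scanner(
    columns=["label"],
    filter=pc.field("label") > 100,  # filters are pyarrow compute expressions
    batch_size=512,
    limit=1000,
)
for batch in scan.to_reader():
    ...  # process pyarrow RecordBatches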
11 changes: 6 additions & 5 deletions python/lance/pytorch/data.py
@@ -18,8 +18,8 @@
 import numpy as np
 import pyarrow as pa
 import pyarrow.compute as pc
-import pyarrow.fs
 import pyarrow.dataset
+import pyarrow.fs
 
 try:
     import torch
@@ -91,11 +91,12 @@ def __iter__(self):
         """Yield dataset"""
         self._setup_dataset()
         for file_uri in self._files:
-            ds = pyarrow.dataset.dataset(
-                file_uri, filesystem=self._fs, format=lance.LanceFileFormat()
+            ds = lance.dataset(
+                file_uri,
+                filesystem=self._fs,
             )
-            scan = lance.scanner(
-                ds, columns=self.columns, batch_size=self.batch_size, filter=self.filter
+            scan = ds.scanner(
+                columns=self.columns, batch_size=self.batch_size, filter=self.filter
            )
             for batch in scan.to_reader():
                 yield [to_numpy(arr) for arr in batch.columns]
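The loop above now goes through the Lance dataset's own `scanner()`. Outside the loader class, the same pattern looks like this sketch (the path and column name are placeholders):

import lance

scan = lance.dataset("/tmp/example.lance").scanner(
    columns=["label"], batch_size=1024
)
for batch in scan.to_reader():
    # mirror the loader's per-column numpy conversion
    arrays = [col.to_numpy(zero_copy_only=False) for col in batch.columns]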
10 changes: 10 additions & 0 deletions python/lance/tests/test_api.py
@@ -36,6 +36,16 @@ def test_simple_round_trips(tmp_path: Path):
     assert table == actual
 
 
+def test_head(tmp_path: Path):
+    table = pa.Table.from_pandas(
+        pd.DataFrame({"label": [123, 456, 789], "values": [22, 33, 2.24]})
+    )
+    write_table(table, tmp_path / "test.lance")
+    ds = dataset(str(tmp_path / "test.lance"))
+    actual = ds.head(2)
+    assert table[:2] == actual
+
+
 def test_write_categorical_values(tmp_path: Path):
     df = pd.DataFrame({"label": ["cat", "cat", "dog", "person"]})
     df["label"] = df["label"].astype("category")
8 changes: 5 additions & 3 deletions python/lance/types/box.py
@@ -86,9 +86,11 @@ def flatten(self):
         return pc.list_flatten(self.storage)
 
     def to_numpy(self, zero_copy_only=True):
-        return (self.flatten()
-                .to_numpy(zero_copy_only=zero_copy_only)
-                .reshape((len(self), 4)))
+        return (
+            self.flatten()
+            .to_numpy(zero_copy_only=zero_copy_only)
+            .reshape((len(self), 4))
+        )
 
     @property
     def xmin(self) -> np.ndarray:
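For context, `to_numpy` here flattens the underlying fixed-size-list storage and reshapes it to `(N, 4)`, one row per box, which is what makes per-coordinate properties like `xmin` a simple column slice. A NumPy-only sketch of that shape contract (the `[xmin, ymin, xmax, ymax]` coordinate order is an assumption for illustration):

import numpy as np

# illustrative flat buffer for two boxes; coordinate order assumed
flat = np.array([0.0, 0.0, 2.0, 2.0,   # box 0
                 1.0, 1.0, 3.0, 4.0])  # box 1
boxes = flat.reshape((len(flat) // 4, 4))  # same reshape as to_numpy()
xmin = boxes[:, 0]  # a per-box column slice, like the xmin property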
