From cc3e442b50fd0efa23193ad9af304ed6d2d32eda Mon Sep 17 00:00:00 2001
From: Chang She <759245+changhiskhan@users.noreply.github.com>
Date: Mon, 12 Sep 2022 14:14:05 -0700
Subject: [PATCH] lance dataset that overrides Dataset.scanner and
 Dataset.head (#158)

* lance dataset that overrides Dataset.scanner and Dataset.head

* PR comments
---
 python/lance/__init__.py       |  52 +---------
 python/lance/_lib.pyx          | 180 ++++++++++++++++++++++++++++++---
 python/lance/pytorch/data.py   |  11 +-
 python/lance/tests/test_api.py |  10 ++
 python/lance/types/box.py      |   8 +-
 5 files changed, 193 insertions(+), 68 deletions(-)

diff --git a/python/lance/__init__.py b/python/lance/__init__.py
index 216a469a06..6786fd0320 100644
--- a/python/lance/__init__.py
+++ b/python/lance/__init__.py
@@ -24,7 +24,7 @@
 
 __version__ = lance.version.__version__
 
-from lance.lib import BuildScanner, LanceFileFormat, WriteTable
+from lance.lib import BuildScanner, LanceFileFormat, WriteTable, _wrap_dataset
 from lance.types import register_extension_types
 
 if platform.system() == "Linux":
@@ -34,9 +34,7 @@
 
 __all__ = ["dataset", "write_table", "scanner", "LanceFileFormat", "__version__"]
 
 
-def dataset(
-    uri: str,
-) -> ds.FileSystemDataset:
+def dataset(uri: str, **kwargs) -> ds.FileSystemDataset:
     """
     Create an Arrow Dataset from the given lance uri.
 
@@ -46,50 +44,8 @@ def dataset(
         The uri to the lance data
     """
     fmt = LanceFileFormat()
-    return ds.dataset(uri, format=fmt)
-
-
-def scanner(
-    data: Union[str, Path, ds.Dataset],
-    columns: Optional[List[str]] = None,
-    filter: Optional[pc.Expression] = None,
-    batch_size: Optional[int] = None,
-    limit: Optional[int] = None,
-    offset: int = 0,
-) -> ds.Scanner:
-    """Build a PyArrow Dataset scanner.
-
-    It extends PyArrow Scanner with limit pushdown.
-
-    Parameters
-    ----------
-    data: uri, path or pyarrow dataset
-        The Dataset
-    columns: List[str], optional
-        Specify the columns to read.
-    filter: pc.Expression, optional
-        Apply filter to the scanner.
-    batch_size: int
-        The maximum number of records to scan for each batch.
-    limit: int
-        Limit the number of records to return in total.
-    offset: int
-        The offset to read the data from.
-
-    See Also
-    --------
-    https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner
-    """
-    if isinstance(data, (str, Path)):
-        data = dataset(str(data))
-    return BuildScanner(
-        data,
-        columns=columns,
-        filter=filter,
-        batch_size=batch_size,
-        limit=limit,
-        offset=offset,
-    )
+    dataset = ds.dataset(uri, format=fmt, **kwargs)
+    return _wrap_dataset(dataset)
 
 
 def write_table(table: pa.Table, destination: Union[str, Path], batch_size: int = 1024):
diff --git a/python/lance/_lib.pyx b/python/lance/_lib.pyx
index d376d1315c..9013c99cc8 100644
--- a/python/lance/_lib.pyx
+++ b/python/lance/_lib.pyx
@@ -17,26 +17,38 @@
 from pyarrow._dataset cimport (
     CScanner,
     Dataset,
     FileFormat,
+    FileFragment,
     FileWriteOptions,
+    Partitioning,
 )
 
-from pyarrow._dataset import Scanner
+from pyarrow._dataset import Scanner, _forbid_instantiation
 
 from pyarrow._compute cimport Expression, _bind
+from pyarrow._fs cimport FileSystem
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport COutputStream, CTable
-from pyarrow.includes.libarrow_dataset cimport CFileFormat
+from pyarrow.includes.libarrow_dataset cimport (
+    CFileFormat,
+    CFileFragment,
+    CFileSystem,
+    CFileSystemDataset,
+    CFragment,
+)
 from pyarrow.lib cimport (
     CExpression,
     GetResultValue,
     RecordBatchReader,
+    Schema,
     check_status,
     get_writer,
     pyarrow_unwrap_table,
 )
-from pyarrow.lib import tobytes
+from pyarrow.lib import frombytes, tobytes
+
+cdef Expression _true = Expression._scalar(True)
 
 cdef extern from "<optional>" namespace "std" nogil:
     # Backport https://github.com/cython/cython/blob/master/Cython/Includes/libcpp/optional.pxd
 
@@ -76,9 +88,9 @@ cdef extern from "lance/arrow/file_lance.h" namespace "lance" nogil:
 
 cdef extern from "lance/arrow/writer.h" namespace "lance::arrow" nogil:
     CStatus CWriteTable "::lance::arrow::WriteTable"(
-        const CTable& table,
-        shared_ptr[COutputStream] sink,
-        CLanceFileWriteOptions options)
+            const CTable& table,
+            shared_ptr[COutputStream] sink,
+            CLanceFileWriteOptions options)
 
 
 cdef extern from "lance/arrow/scanner.h" namespace "lance::arrow" nogil:
@@ -91,12 +103,12 @@
         CResult[shared_ptr[CScanner]] Finish()
 
 def BuildScanner(
-    dataset: Dataset,
-    columns: Optional[list[str]] = None,
-    filter: Optional[Expression] = None,
-    batch_size: Optional[int] = None,
-    limit: Optional[int] = None,
-    offset: int = 0,
+        dataset: Dataset,
+        columns: Optional[list[str]] = None,
+        filter: Optional[Expression] = None,
+        batch_size: Optional[int] = None,
+        limit: Optional[int] = None,
+        offset: int = 0,
 ):
     cdef shared_ptr[CDataset] cdataset = dataset.unwrap()
     cdef shared_ptr[LScannerBuilder] builder = shared_ptr[LScannerBuilder](
@@ -155,3 +167,147 @@ def WriteTable(table: Table,
     options.batch_size = batch_size
     with nogil:
         check_status(CWriteTable(deref(arrow_table), out, options))
+
+cdef class FileSystemDataset(Dataset):
+    """
+    A Dataset of Lance fragments.
+
+    A LanceDataset is composed of one or more FileFragments.
+
+    Parameters
+    ----------
+    fragments : list[FileFragment]
+        List of fragments to consume.
+    schema : Schema
+        The top-level schema of the Dataset.
+    filesystem : FileSystem
+        FileSystem of the fragments.
+    root_partition : Expression, optional
+        The top-level partition of the Dataset.
+ """ + + cdef: + CFileSystemDataset * filesystem_dataset + + def __init__(self): + _forbid_instantiation(self.__class__) + + @property + def filesystem(self): + return FileSystem.wrap(self.filesystem_dataset.filesystem()) + + @property + def partitioning(self): + """ + The partitioning of the Dataset source, if discovered. + + If the FileSystemDataset is created using the ``dataset()`` factory + function with a partitioning specified, this will return the + finalized Partitioning object from the dataset discovery. In all + other cases, this returns None. + """ + c_partitioning = self.filesystem_dataset.partitioning() + if c_partitioning.get() == nullptr: + return None + try: + return Partitioning.wrap(c_partitioning) + except TypeError: + # e.g. type_name "default" + return None + + cdef void init(self, const shared_ptr[CDataset]& sp): + Dataset.init(self, sp) + self.filesystem_dataset = sp.get() + + @staticmethod + cdef wrap(const shared_ptr[CDataset]& sp): + cdef Dataset ds = FileSystemDataset.__new__(FileSystemDataset) + ds.init(sp) + return ds + + def __reduce__(self): + return FileSystemDataset, ( + list(self.get_fragments()), + self.schema, + self.format, + self.filesystem, + self.partition_expression + ) + + @classmethod + def from_paths(cls, paths, schema=None, format=None, + filesystem=None, partitions=None, root_partition=None): + """A Dataset created from a list of paths on a particular filesystem. + + Parameters + ---------- + paths : list of str + List of file paths to create the fragments from. + schema : Schema + The top-level schema of the DataDataset. + format : FileFormat + File format to create fragments from, currently only + ParquetFileFormat, IpcFileFormat, and CsvFileFormat are supported. + filesystem : FileSystem + The filesystem which files are from. + partitions : list[Expression], optional + Attach additional partition information for the file paths. + root_partition : Expression, optional + The top-level partition of the DataDataset. + """ + cdef: + FileFragment fragment + + if root_partition is None: + root_partition = _true + + for arg, class_, name in [ + (schema, Schema, 'schema'), + (format, FileFormat, 'format'), + (filesystem, FileSystem, 'filesystem'), + (root_partition, Expression, 'root_partition') + ]: + if not isinstance(arg, class_): + raise TypeError( + "Argument '{0}' has incorrect type (expected {1}, " + "got {2})".format(name, class_.__name__, type(arg)) + ) + + partitions = partitions or [_true] * len(paths) + + if len(paths) != len(partitions): + raise ValueError( + 'The number of files resulting from paths_or_selector ' + 'must be equal to the number of partitions.' 
+ ) + + fragments = [ + format.make_fragment(path, filesystem, partitions[i]) + for i, path in enumerate(paths) + ] + return FileSystemDataset(fragments, schema, format, + filesystem, root_partition) + + @property + def files(self): + """List of the files""" + cdef vector[c_string] files = self.filesystem_dataset.files() + return [frombytes(f) for f in files] + + @property + def format(self): + """The FileFormat of this source.""" + cdef FileFormat format = LanceFileFormat.__new__(LanceFileFormat) + format.init(self.filesystem_dataset.format()) + return format + + def head(self, n: int, offset: int = 0) -> Table: + scanner = self.scanner(limit=n, offset=offset) + return scanner.to_table() + + def scanner(self, *args, **kwargs): + return BuildScanner(self, *args, **kwargs) + +def _wrap_dataset(Dataset dataset not None): + cdef shared_ptr[CDataset] copy = dataset.unwrap() + return FileSystemDataset.wrap(move(copy)) diff --git a/python/lance/pytorch/data.py b/python/lance/pytorch/data.py index aaa303f806..2db6c5920a 100644 --- a/python/lance/pytorch/data.py +++ b/python/lance/pytorch/data.py @@ -18,8 +18,8 @@ import numpy as np import pyarrow as pa import pyarrow.compute as pc -import pyarrow.fs import pyarrow.dataset +import pyarrow.fs try: import torch @@ -91,11 +91,12 @@ def __iter__(self): """Yield dataset""" self._setup_dataset() for file_uri in self._files: - ds = pyarrow.dataset.dataset( - file_uri, filesystem=self._fs, format=lance.LanceFileFormat() + ds = lance.dataset( + file_uri, + filesystem=self._fs, ) - scan = lance.scanner( - ds, columns=self.columns, batch_size=self.batch_size, filter=self.filter + scan = ds.scanner( + columns=self.columns, batch_size=self.batch_size, filter=self.filter ) for batch in scan.to_reader(): yield [to_numpy(arr) for arr in batch.columns] diff --git a/python/lance/tests/test_api.py b/python/lance/tests/test_api.py index 1a4cc10db2..b3c6538125 100644 --- a/python/lance/tests/test_api.py +++ b/python/lance/tests/test_api.py @@ -36,6 +36,16 @@ def test_simple_round_trips(tmp_path: Path): assert table == actual +def test_head(tmp_path: Path): + table = pa.Table.from_pandas( + pd.DataFrame({"label": [123, 456, 789], "values": [22, 33, 2.24]}) + ) + write_table(table, tmp_path / "test.lance") + ds = dataset(str(tmp_path / "test.lance")) + actual = ds.head(2) + assert table[:2] == actual + + def test_write_categorical_values(tmp_path: Path): df = pd.DataFrame({"label": ["cat", "cat", "dog", "person"]}) df["label"] = df["label"].astype("category") diff --git a/python/lance/types/box.py b/python/lance/types/box.py index 5dc7ffeb34..68029767b5 100644 --- a/python/lance/types/box.py +++ b/python/lance/types/box.py @@ -86,9 +86,11 @@ def flatten(self): return pc.list_flatten(self.storage) def to_numpy(self, zero_copy_only=True): - return (self.flatten() - .to_numpy(zero_copy_only=zero_copy_only) - .reshape((len(self), 4))) + return ( + self.flatten() + .to_numpy(zero_copy_only=zero_copy_only) + .reshape((len(self), 4)) + ) @property def xmin(self) -> np.ndarray: