diff --git a/python/lance/__init__.py b/python/lance/__init__.py index b22fdd27b7..5785d74844 100644 --- a/python/lance/__init__.py +++ b/python/lance/__init__.py @@ -58,7 +58,7 @@ def dataset( The uri to the lance data version: optional, int If specified, load a specific version of the dataset. - filesystem: pa.fs.FileSystem + filesystem: pyarrow.fs.FileSystem File system instance to read. Other Parameters @@ -87,27 +87,56 @@ def write_dataset( data: Union[pa.Table, pa.dataset.Dataset], base_dir: Union[str, Path], mode: str = "create", - filesystem: pa.fs.FileSystem = None, + filesystem: Optional[pa.fs.FileSystem] = None, + max_rows_per_file: int = 0, + min_rows_per_group: int = 0, + max_rows_per_group: int = 1024, **kwargs, ): """Write a dataset. Parameters ---------- - data : pa.Table or pa.dataset.Dataset + data : pyarrow.Table or pyarrow.dataset.Dataset The data to write. base_dir : str or Path The root directory to write dataset to. - mode : str, 'create' | 'append' | 'overwrite' - Write mode. - filesystem : pa.fs.FileSystem, optional + mode : str + Write mode. Accepted values are **'create' | 'append' | 'overwrite'**. + filesystem : pyarrow.fs.FileSystem, optional The filesystem to write the dataset + max_rows_per_file : int + Maximum number of rows per file. If greater than 0 then this will limit how many rows + are placed in any single file. Otherwise there will be no limit and one file will be + created in each output directory unless files need to be closed to respect max_open_files. + min_rows_per_group : int + Minimum number of rows per group. When the value is greater than 0, + the dataset writer will batch incoming data and only write the row groups + to the disk when sufficient rows have accumulated. + max_rows_per_group : int + Maximum number of rows per group. If the value is greater than 0, + then the dataset writer may split up large incoming batches into multiple row groups. + If this value is set, then min_rows_per_group should also be set. 
+ Otherwise it could end up with very small row groups. """ from pyarrow.fs import _resolve_filesystem_and_path + if 0 < max_rows_per_file < max_rows_per_group: + raise ValueError( + "Max rows per file must be larger or equal to max_rows_per_group" + ) + if isinstance(data, pa.Table): data = pa.dataset.InMemoryDataset(data) filesystem, base_dir = _resolve_filesystem_and_path(base_dir, filesystem) - _lance_dataset_write(data, base_dir, filesystem, mode) + _lance_dataset_write( + data, + base_dir, + filesystem, + mode, + max_rows_per_file, + min_rows_per_group, + max_rows_per_group, + ) diff --git a/python/lance/_lib.pyx b/python/lance/_lib.pyx index be525069ce..96cb6692f2 100644 --- a/python/lance/_lib.pyx +++ b/python/lance/_lib.pyx @@ -224,12 +224,26 @@ def _lance_dataset_write( Dataset data, object base_dir not None, FileSystem filesystem not None, - str mode not None + str mode not None, + int max_rows_per_file, + int min_rows_per_group, + int max_rows_per_group, ): """Wraps 'LanceDataset::Write'. Parameters ---------- + data : pyarrow.dataset.Dataset + Dataset to write. + base_dir : str or Path + The directory to write the dataset to. + filesystem : pyarrow.fs.FileSystem + PyArrow filesystem instance used to write the dataset. + mode : str + Write mode; can be "create", "append" or "overwrite". 
+ max_rows_per_file : int + min_rows_per_group : int + max_rows_per_group : int """ cdef: @@ -248,10 +262,15 @@ def _lance_dataset_write( c_options.filesystem = filesystem.unwrap() c_options.file_write_options = write_options.unwrap() c_options.create_dir = True - - if mode == "create": - c_mode = CLanceDataset.WriteMode.CREATE - elif mode == "append": + if max_rows_per_file > 0: + c_options.max_rows_per_file = max_rows_per_file + if min_rows_per_group > 0: + c_options.min_rows_per_group = min_rows_per_group + c_options.max_rows_per_group = max_rows_per_group + + # Default value + c_mode = CLanceDataset.WriteMode.CREATE + if mode == "append": c_mode = CLanceDataset.WriteMode.APPEND elif mode == "overwrite": c_mode = CLanceDataset.WriteMode.OVERWRITE