Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Support group size and rows_per_file customization via write_dataset API #281

Merged
merged 2 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions python/lance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def dataset(
The uri to the lance data
version: optional, int
If specified, load a specific version of the dataset.
filesystem: pa.fs.FileSystem
filesystem: pyarrow.fs.FileSystem
File system instance to read.

Other Parameters
Expand Down Expand Up @@ -87,21 +87,37 @@ def write_dataset(
data: Union[pa.Table, pa.dataset.Dataset],
base_dir: Union[str, Path],
mode: str = "create",
filesystem: pa.fs.FileSystem = None,
filesystem: Optional[pa.fs.FileSystem] = None,
max_rows_per_file: int = 0,
min_rows_per_group: int = 0,
max_rows_per_group: int = 1024,
**kwargs,
):
"""Write a dataset.

Parameters
----------
data : pa.Table or pa.dataset.Dataset
data : pyarrow.Table or pyarrow.dataset.Dataset
The data to write.
base_dir : str or Path
The root directory to write dataset to.
mode : str, 'create' | 'append' | 'overwrite'
Write mode.
filesystem : pa.fs.FileSystem, optional
mode : str
Write mode, accepts values are, **'create' | 'append' | 'overwrite'**
filesystem : pyarrow.fs.FileSystem, optional
The filesystem to write the dataset
max_rows_per_file : int
Maximum number of rows per file. If greater than 0 then this will limit how many rows
are placed in any single file. Otherwise there will be no limit and one file will be
created in each output directory unless files need to be closed to respect max_open_files.
min_rows_per_group : int
Minimum number of rows per group. When the value is greater than 0,
the dataset writer will batch incoming data and only write the row groups
to the disk when sufficient rows have accumulated.
max_rows_per_group : int
Maximum number of rows per group. If the value is greater than 0,
then the dataset writer may split up large incoming batches into multiple row groups.
If this value is set, then min_rows_per_group should also be set.
Otherwise it could end up with very small row groups.
"""
from pyarrow.fs import _resolve_filesystem_and_path

Expand All @@ -110,4 +126,12 @@ def write_dataset(

filesystem, base_dir = _resolve_filesystem_and_path(base_dir, filesystem)

_lance_dataset_write(data, base_dir, filesystem, mode)
_lance_dataset_write(
data,
base_dir,
filesystem,
mode,
max_rows_per_file,
min_rows_per_group,
max_rows_per_group,
)
29 changes: 24 additions & 5 deletions python/lance/_lib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,26 @@ def _lance_dataset_write(
Dataset data,
object base_dir not None,
FileSystem filesystem not None,
str mode not None
str mode not None,
int max_rows_per_file,
int min_rows_per_group,
int max_rows_per_group,
):
"""Wraps 'LanceDataset::Write'.

Parameters
----------
data : pyarrow.dataset.Dataset
Dataset to write.
base_dir : str or Path
The directory to write dataset to.
filesystem : pyarrow.fs.FileSystem
Pyarrow File System instance to write dataset.
mode : str
Write mode, can be "create", "append" or "overwrite".
max_rows_per_file : int
min_rows_per_group : int
max_rows_per_group : int
"""

cdef:
Expand All @@ -248,10 +262,15 @@ def _lance_dataset_write(
c_options.filesystem = filesystem.unwrap()
c_options.file_write_options = write_options.unwrap()
c_options.create_dir = True

if mode == "create":
c_mode = CLanceDataset.WriteMode.CREATE
elif mode == "append":
if max_rows_per_file > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a simple check the max_rows_per_file >= max_rows_per_group?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm

c_options.max_rows_per_file = max_rows_per_file
if min_rows_per_group > 0:
c_options.min_rows_per_group = min_rows_per_group
c_options.max_rows_per_group = max_rows_per_group

# Default value
c_mode = CLanceDataset.WriteMode.CREATE
if mode == "append":
c_mode = CLanceDataset.WriteMode.APPEND
elif mode == "overwrite":
c_mode = CLanceDataset.WriteMode.OVERWRITE
Expand Down