[FEAT] Add translation of IOConfig to PyArrow filesystem arguments #1592

Merged Nov 13, 2023 · 18 commits · showing changes from 14 commits
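This PR threads a new io_config parameter from DataFrame.write_parquet and DataFrame.write_csv down through the physical plan to the table writers, and translates the IOConfig into the arguments that PyArrow's filesystems accept. As a rough sketch of the translation idea only — the helper name and exact field mapping below are assumptions, not the PR's actual code — an S3Config could be mapped onto pyarrow.fs.S3FileSystem like this:

# Sketch: translate Daft S3Config fields into pyarrow.fs.S3FileSystem arguments.
import pyarrow.fs as pafs

def s3_config_to_pyarrow_fs(s3_config) -> pafs.S3FileSystem:
    return pafs.S3FileSystem(
        region=s3_config.region_name,
        endpoint_override=s3_config.endpoint_url,
        access_key=s3_config.key_id,      # Daft's key_id is the AWS access key ID
        secret_key=s3_config.access_key,  # Daft's access_key is the secret key
        session_token=s3_config.session_token,
        anonymous=s3_config.anonymous,
        # pyarrow takes timeouts in seconds; S3Config stores milliseconds.
        connect_timeout=s3_config.connect_timeout_ms / 1000.0,
        request_timeout=s3_config.read_timeout_ms / 1000.0,
    )

The readable attributes added to S3Config and GCSConfig in the stub below are what make a translation like this possible from the Python side.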
22 changes: 20 additions & 2 deletions daft/daft.pyi
@@ -281,6 +281,21 @@ class S3Config:
    I/O configuration for accessing an S3-compatible system.
    """

+    region_name: str | None
+    endpoint_url: str | None
+    key_id: str | None
+    session_token: str | None
+    access_key: str | None
+    max_connections: int
+    retry_initial_backoff_ms: int
+    connect_timeout_ms: int
+    read_timeout_ms: int
+    num_tries: int
+    retry_mode: str | None
+    anonymous: bool
+    verify_ssl: bool
+    check_hostname_ssl: bool
+
    def __init__(
        self,
        region_name: str | None = None,
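These stub entries document that the config fields are now readable attributes on S3Config, which is exactly what a filesystem-translation layer needs. A quick illustration, assuming the class is importable from the daft.daft module this stub describes:

from daft.daft import S3Config

cfg = S3Config(region_name="us-west-2", anonymous=True)
# Fields can now be read back, per the attribute declarations above.
assert cfg.region_name == "us-west-2"
assert cfg.anonymous is True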
@@ -313,6 +328,9 @@ class GCSConfig:
    I/O configuration for accessing Google Cloud Storage.
    """

+    project_id: str | None
+    anonymous: bool
+
    def __init__(self, project_id: str | None = None, anonymous: bool | None = None): ...

class IOConfig:
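GCSConfig gets the same treatment. Per-backend configs are then bundled into an IOConfig; a small sketch, assuming IOConfig's constructor takes per-backend keywords as in Daft's documentation (the project name is illustrative):

from daft.daft import GCSConfig, IOConfig

# Anonymous access to a public GCS bucket.
io_config = IOConfig(gcs=GCSConfig(project_id="my-project", anonymous=True))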
@@ -347,10 +365,9 @@ class PythonStorageConfig:
    Storage configuration for the legacy Python I/O layer.
    """

-    fs: fsspec.AbstractFileSystem
    io_config: IOConfig

-    def __init__(self, fs: fsspec.AbstractFileSystem | None = None, io_config: IOConfig | None = None): ...
+    def __init__(self, io_config: IOConfig | None = None): ...

class StorageConfig:
    """
@@ -882,6 +899,7 @@ class LogicalPlanBuilder:
        file_format: FileFormat,
        partition_cols: list[PyExpr] | None = None,
        compression: str | None = None,
+        io_config: IOConfig | None = None,
    ) -> LogicalPlanBuilder: ...
    def schema(self) -> PySchema: ...
    def optimize(self) -> LogicalPlanBuilder: ...
12 changes: 10 additions & 2 deletions daft/dataframe/dataframe.py
@@ -24,7 +24,7 @@
from daft.api_annotations import DataframePublicAPI
from daft.context import get_context
from daft.convert import InputListType
-from daft.daft import FileFormat, JoinType, PartitionScheme, ResourceRequest
+from daft.daft import FileFormat, IOConfig, JoinType, PartitionScheme, ResourceRequest
from daft.dataframe.preview import DataFramePreview
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
@@ -316,6 +316,7 @@ def write_parquet(
        root_dir: Union[str, pathlib.Path],
        compression: str = "snappy",
        partition_cols: Optional[List[ColumnInputType]] = None,
+        io_config: Optional[IOConfig] = None,
    ) -> "DataFrame":
        """Writes the DataFrame as parquet files, returning a new DataFrame with paths to the files that were written

@@ -330,6 +331,7 @@ def write_parquet(
            root_dir (str): root file path to write parquet files to.
            compression (str, optional): compression algorithm. Defaults to "snappy".
            partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Currently only supports Column Expressions with any calls. Defaults to None.
+            io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.

        Returns:
            DataFrame: The filenames that were written out as strings.
@@ -350,6 +352,7 @@ def write_parquet(
            partition_cols=cols,
            file_format=FileFormat.Parquet,
            compression=compression,
+            io_config=io_config,
        )
        # Block and write, then retrieve data and return a new disconnected DataFrame
        write_df = DataFrame(builder)
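With the parameter threaded through the logical plan builder, a write to object storage can carry its own credentials rather than relying on globally configured ones. A hypothetical usage (the bucket name is illustrative):

import daft
from daft.daft import IOConfig, S3Config

df = daft.from_pydict({"a": [1, 2, 3]})
io_config = IOConfig(s3=S3Config(region_name="us-west-2"))

# The io_config travels with this specific write.
df.write_parquet("s3://my-bucket/out/", io_config=io_config)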
@@ -359,7 +362,10 @@ def write_parquet(

    @DataframePublicAPI
    def write_csv(
-        self, root_dir: Union[str, pathlib.Path], partition_cols: Optional[List[ColumnInputType]] = None
+        self,
+        root_dir: Union[str, pathlib.Path],
+        partition_cols: Optional[List[ColumnInputType]] = None,
+        io_config: Optional[IOConfig] = None,
    ) -> "DataFrame":
        """Writes the DataFrame as CSV files, returning a new DataFrame with paths to the files that were written

@@ -374,6 +380,7 @@ def write_csv(
            root_dir (str): root file path to write CSV files to.
            partition_cols (Optional[List[ColumnInputType]], optional): How to subpartition each partition further. Currently only supports Column Expressions with any calls. Defaults to None.
+            io_config (Optional[IOConfig], optional): configurations to use when interacting with remote storage.

        Returns:
            DataFrame: The filenames that were written out as strings.
@@ -390,6 +397,7 @@ def write_csv(
            root_dir=root_dir,
            partition_cols=cols,
            file_format=FileFormat.Csv,
+            io_config=io_config,
        )

        # Block and write, then retrieve data and return a new disconnected DataFrame
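The CSV path follows the same pattern; continuing the write_parquet example above (path illustrative):

df.write_csv("s3://my-bucket/csv-out/", io_config=io_config)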
4 changes: 4 additions & 0 deletions daft/execution/execution_step.py
@@ -15,6 +15,7 @@
    CsvSourceConfig,
    FileFormat,
    FileFormatConfig,
+    IOConfig,
    JoinType,
    JsonSourceConfig,
    ParquetSourceConfig,
@@ -429,6 +430,7 @@ class WriteFile(SingleOutputInstruction):
    root_dir: str | pathlib.Path
    compression: str | None
    partition_cols: ExpressionsProjection | None
+    io_config: IOConfig | None

    def run(self, inputs: list[Table]) -> list[Table]:
        return self._write_file(inputs)
@@ -456,13 +458,15 @@ def _handle_file_write(self, input: Table) -> Table:
                path=self.root_dir,
                compression=self.compression,
                partition_cols=self.partition_cols,
+                io_config=self.io_config,
            )
        elif self.file_format == FileFormat.Csv:
            file_names = table_io.write_csv(
                input,
                path=self.root_dir,
                compression=self.compression,
                partition_cols=self.partition_cols,
+                io_config=self.io_config,
            )
        else:
            raise ValueError(
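WriteFile now forwards io_config into table_io.write_parquet and table_io.write_csv. Those helpers are not shown in this diff; as a hedged sketch of what a writer can do with the config once it arrives (the function body and names below are assumptions, not Daft's actual table_io code):

import pyarrow.parquet as papq

def write_parquet_sketch(arrow_table, path, compression=None, io_config=None):
    # Resolve a pyarrow filesystem from the config when writing to S3;
    # otherwise let pyarrow infer a local filesystem from the path.
    fs, relpath = None, path
    if io_config is not None and path.startswith("s3://"):
        fs = s3_config_to_pyarrow_fs(io_config.s3)  # sketch from earlier
        relpath = path.removeprefix("s3://")  # S3FileSystem paths omit the scheme
    papq.write_table(arrow_table, relpath, compression=compression, filesystem=fs)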
3 changes: 3 additions & 0 deletions daft/execution/physical_plan.py
@@ -22,6 +22,7 @@
from daft.daft import (
    FileFormat,
    FileFormatConfig,
+    IOConfig,
    JoinType,
    ResourceRequest,
    StorageConfig,
@@ -137,6 +138,7 @@ def file_write(
    root_dir: str | pathlib.Path,
    compression: str | None,
    partition_cols: ExpressionsProjection | None,
+    io_config: IOConfig | None,
) -> InProgressPhysicalPlan[PartitionT]:
    """Write the results of `child_plan` into files described by `write_info`."""

@@ -148,6 +150,7 @@ def file_write(
                root_dir=root_dir,
                compression=compression,
                partition_cols=partition_cols,
+                io_config=io_config,
            ),
        )
        if isinstance(step, PartitionTaskBuilder)
10 changes: 9 additions & 1 deletion daft/execution/rust_physical_plan_shim.py
@@ -6,6 +6,7 @@
from daft.daft import (
    FileFormat,
    FileFormatConfig,
+    IOConfig,
    JoinType,
    PyExpr,
    PySchema,
@@ -214,11 +215,18 @@ def write_file(
    root_dir: str,
    compression: str | None,
    partition_cols: list[PyExpr] | None,
+    io_config: IOConfig | None,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
    if partition_cols is not None:
        expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in partition_cols])
    else:
        expr_projection = None
    return physical_plan.file_write(
-        input, file_format, Schema._from_pyschema(schema), root_dir, compression, expr_projection
+        input,
+        file_format,
+        Schema._from_pyschema(schema),
+        root_dir,
+        compression,
+        expr_projection,
+        io_config,
    )