Skip to content

Commit

Permalink
fix: cast null arrays to the appropriate type when coercing to a table (
Browse files Browse the repository at this point in the history
#3362)

@eddyxu @westonpace @Jay-ju Don't ask me why :) just try testing `None`
with `LanceDatasink`.

See this comment:
#3308 (comment)
Ref issue: #3308

---------

Co-authored-by: Weston Pace <[email protected]>
  • Loading branch information
andrijazz and westonpace authored Jan 10, 2025
1 parent 69d4610 commit 2142594
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
24 changes: 6 additions & 18 deletions python/python/lance/ray/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,8 @@ def _pd_to_arrow(
tbl.schema = tbl.schema.remove_metadata()
return tbl
elif isinstance(df, pa.Table):
fields = df.schema.names
new_columns = []
new_fields = []
for field in fields:
col = df[field]
new_field = pa.field(field, col.type)
if (
pa.types.is_null(col.type)
and schema.field_by_name(field).type == pa.string()
):
new_field = pa.field(field, pa.string())
col = pa.compute.if_else(pa.compute.is_null(col), NONE_ARROW_STR, col)
new_columns.append(col)
new_fields.append(new_field)
new_schema = pa.schema(fields=new_fields)
new_table = pa.Table.from_arrays(new_columns, schema=new_schema)
return new_table
if schema is not None:
return df.cast(schema)
return df


Expand Down Expand Up @@ -439,6 +424,7 @@ def write_lance(
output_uri: str,
*,
schema: Optional[pa.Schema] = None,
mode: Literal["create", "append", "overwrite"] = "create",
transform: Optional[
Callable[[pa.Table], Union[pa.Table, Generator[None, pa.Table, None]]]
] = None,
Expand Down Expand Up @@ -485,7 +471,9 @@ def write_lance(
),
batch_size=max_rows_per_file,
).write_datasink(
LanceCommitter(output_uri, schema=schema, storage_options=storage_options)
LanceCommitter(
output_uri, schema=schema, mode=mode, storage_options=storage_options
)
)


Expand Down
23 changes: 23 additions & 0 deletions python/python/tests/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,26 @@ def f(row):
assert len(pylist) == 10
for item in pylist:
assert item is None


@pytest.mark.filterwarnings("ignore::DeprecationWarning")
def test_ray_write_lance_none_str_datasink(tmp_path: Path):
def f(row):
return {
"id": row["id"],
"str": None,
}

schema = pa.schema([pa.field("id", pa.int64()), pa.field("str", pa.string())])

sink = LanceDatasink(tmp_path, schema=schema)
(ray.data.range(10).map(f).write_datasink(sink))
ds = lance.dataset(tmp_path)
ds.count_rows() == 10
assert ds.schema == schema

tbl = ds.to_table()
pylist = tbl["str"].to_pylist()
assert len(pylist) == 10
for item in pylist:
assert item is None

0 comments on commit 2142594

Please sign in to comment.