From 3594f6640721c6d122050b09b0b69b7835850c5f Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 1 May 2023 16:34:40 -0700
Subject: [PATCH 1/2] generalize fix

Signed-off-by: Eric Liang
---
 python/ray/data/_internal/arrow_block.py      | 16 ++++++++++++--
 .../ray/data/_internal/planner/map_batches.py | 12 -----------
 python/ray/data/tests/test_strict_mode.py     | 21 ++++++++++++++++++-
 3 files changed, 34 insertions(+), 15 deletions(-)

diff --git a/python/ray/data/_internal/arrow_block.py b/python/ray/data/_internal/arrow_block.py
index cb97eb0d67a4..839712bda053 100644
--- a/python/ray/data/_internal/arrow_block.py
+++ b/python/ray/data/_internal/arrow_block.py
@@ -153,7 +153,7 @@ def from_bytes(cls, data: bytes) -> "ArrowBlockAccessor":
 
     @staticmethod
     def numpy_to_block(
-        batch: Union[np.ndarray, Dict[str, np.ndarray]],
+        batch: Union[np.ndarray, Dict[str, np.ndarray], Dict[str, list]],
         passthrough_arrow_not_implemented_errors: bool = False,
     ) -> "pyarrow.Table":
         import pyarrow as pa
@@ -163,7 +163,7 @@ def numpy_to_block(
         if isinstance(batch, np.ndarray):
             batch = {TENSOR_COLUMN_NAME: batch}
         elif not isinstance(batch, collections.abc.Mapping) or any(
-            not isinstance(col, np.ndarray) for col in batch.values()
+            not isinstance(col, (list, np.ndarray)) for col in batch.values()
         ):
             raise ValueError(
                 "Batch must be an ndarray or dictionary of ndarrays when converting "
@@ -172,6 +172,18 @@ def numpy_to_block(
             )
         new_batch = {}
         for col_name, col in batch.items():
+            if isinstance(col, list):
+                # Try to convert list values into a numpy array via
+                # np.array(), so users don't need to manually cast.
+                # NOTE: we don't cast generic iterables, since types like
+                # `str` are also Iterable.
+                try:
+                    col = np.array(col)
+                except Exception:
+                    raise ValueError(
+                        "Failed to convert column values to numpy array: "
+                        f"({_truncated_repr(col)})."
+                    )
             # Use Arrow's native *List types for 1-dimensional ndarrays.
             if col.dtype.type is np.object_ or col.ndim > 1:
                 try:
diff --git a/python/ray/data/_internal/planner/map_batches.py b/python/ray/data/_internal/planner/map_batches.py
index 27597c550bdb..d1a8c09896bf 100644
--- a/python/ray/data/_internal/planner/map_batches.py
+++ b/python/ray/data/_internal/planner/map_batches.py
@@ -62,18 +62,6 @@ def validate_batch(batch: Block) -> None:
                             f"{type(value)}. To fix this issue, convert "
                             f"the {type(value)} to a `np.ndarray`."
                         )
-                    if isinstance(value, list):
-                        # Try to convert list values into an numpy array via
-                        # np.array(), so users don't need to manually cast.
-                        # NOTE: we don't cast generic iterables, since types like
-                        # `str` are also Iterable.
-                        try:
-                            batch[key] = np.array(value)
-                        except Exception:
-                            raise ValueError(
-                                "Failed to convert column values to numpy array: "
-                                f"({_truncated_repr(value)})."
-                            )
 
         def process_next_batch(batch: DataBatch) -> Iterator[Block]:
             # Apply UDF.
diff --git a/python/ray/data/tests/test_strict_mode.py b/python/ray/data/tests/test_strict_mode.py
index bbbf3c6b1b23..5b9b898f6426 100644
--- a/python/ray/data/tests/test_strict_mode.py
+++ b/python/ray/data/tests/test_strict_mode.py
@@ -86,7 +86,9 @@ def test_strict_convert_map_output(ray_start_regular_shared, enable_strict_mode)
 
     with pytest.raises(ValueError):
         # Strings not converted into array.
-        ray.data.range(1).map_batches(lambda x: {"id": "string"}).materialize()
+        ray.data.range(1).map_batches(
+            lambda x: {"id": "string"}, max_retries=0
+        ).materialize()
 
     class UserObj:
         def __eq__(self, other):
@@ -100,6 +102,23 @@ def __eq__(self, other):
     assert ds.take_batch()["id"].tolist() == [0, 1, 2, UserObj()]
 
 
+def test_strict_convert_map_groups(ray_start_regular_shared, enable_strict_mode):
+    ds = ray.data.read_csv("example://iris.csv")
+
+    def process_group(group):
+        variety = group["variety"][0]
+        count = len(group["variety"])
+
+        # Test implicit list->array conversion here.
+        return {
+            "variety": [variety],
+            "count": [count],
+        }
+
+    ds = ds.groupby("variety").map_groups(process_group)
+    ds.show()
+
+
 def test_strict_default_batch_format(ray_start_regular_shared, enable_strict_mode):
     ds = ray.data.range(1)
 

From cab11bc39cb4b14eef0bf25eeb894769cc7d2b49 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 1 May 2023 18:02:25 -0700
Subject: [PATCH 2/2] change more test

Signed-off-by: Eric Liang
---
 python/ray/data/tests/test_strict_mode.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/python/ray/data/tests/test_strict_mode.py b/python/ray/data/tests/test_strict_mode.py
index 5b9b898f6426..30cc5966ee30 100644
--- a/python/ray/data/tests/test_strict_mode.py
+++ b/python/ray/data/tests/test_strict_mode.py
@@ -53,8 +53,8 @@ def test_strict_map_output(ray_start_regular_shared, enable_strict_mode):
 
     with pytest.raises(StrictModeError):
         ds.map_batches(lambda x: np.array([0]), max_retries=0).materialize()
-    ds.map_batches(lambda x: {"id": np.array([0])}).materialize()
-    ds.map_batches(lambda x: UserDict({"id": np.array([0])})).materialize()
+    ds.map_batches(lambda x: {"id": [0]}).materialize()
+    ds.map_batches(lambda x: UserDict({"id": [0]})).materialize()
 
     with pytest.raises(StrictModeError):
         ds.map(lambda x: np.ones(10), max_retries=0).materialize()
@@ -71,8 +71,8 @@ def test_strict_map_output(ray_start_regular_shared, enable_strict_mode):
         ds.map_batches(lambda x: object(), max_retries=0).materialize()
     with pytest.raises(ValueError):
         ds.map_batches(lambda x: {"x": object()}, max_retries=0).materialize()
-    ds.map_batches(lambda x: {"x": np.array([object()])}).materialize()
-    ds.map_batches(lambda x: UserDict({"x": np.array([object()])})).materialize()
+    ds.map_batches(lambda x: {"x": [object()]}).materialize()
+    ds.map_batches(lambda x: UserDict({"x": [object()]})).materialize()
 
     with pytest.raises(StrictModeError):
         ds.map(lambda x: object(), max_retries=0).materialize()
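
For reference, a minimal usage sketch of the implicit list -> ndarray conversion
these patches exercise (it only assumes the strict-mode APIs already used in
test_strict_mode.py and is not part of the patch itself):

    import ray

    ds = ray.data.range(1)

    # A dict of plain Python lists is now a valid map_batches output: each
    # list column is converted to a numpy array when the Arrow block is built,
    # so no manual np.array() cast is needed in the UDF.
    out = ds.map_batches(lambda batch: {"id": [0]}).take_batch()
    assert out["id"].tolist() == [0]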