From b7e5f8f97ef0eb95368122f29ac8915ae692f94e Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Tue, 18 Jul 2023 15:15:55 +0200 Subject: [PATCH] Fix shuffle code to work with pyarrow 13 (#8009) --- distributed/shuffle/_worker_plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/shuffle/_worker_plugin.py b/distributed/shuffle/_worker_plugin.py index 097aa2237ed..c6348673495 100644 --- a/distributed/shuffle/_worker_plugin.py +++ b/distributed/shuffle/_worker_plugin.py @@ -951,7 +951,7 @@ def split_by_worker( # bytestream such that it cannot be deserialized anymore t = pa.Table.from_pandas(df, preserve_index=True) t = t.sort_by("_worker") - codes = np.asarray(t.select(["_worker"]))[0] + codes = np.asarray(t["_worker"]) t = t.drop(["_worker"]) del df @@ -983,7 +983,7 @@ def split_by_partition(t: pa.Table, column: str) -> dict[Any, pa.Table]: partitions.sort() t = t.sort_by(column) - partition = np.asarray(t.select([column]))[0] + partition = np.asarray(t[column]) splits = np.where(partition[1:] != partition[:-1])[0] + 1 splits = np.concatenate([[0], splits])