Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#5367: Introduce new API for repartitioning Modin objects #5366

Merged
merged 21 commits into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2181,6 +2181,7 @@ def apply_full_axis(
new_index=None,
new_columns=None,
dtypes=None,
keep_partitioning=True,
):
"""
Perform a function across an entire axis.
Expand All @@ -2201,6 +2202,9 @@ def apply_full_axis(
The data types of the result. This is an optimization
because there are functions that always result in a particular data
type, and allows us to avoid (re)computing it.
keep_partitioning : boolean, default: True
The flag to keep partition boundaries for Modin Frame.
Setting it to True disables shuffling data from one partition to another.

Returns
-------
Expand All @@ -2218,6 +2222,7 @@ def apply_full_axis(
new_columns=new_columns,
dtypes=dtypes,
other=None,
keep_partitioning=keep_partitioning,
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down Expand Up @@ -2608,6 +2613,7 @@ def broadcast_apply_full_axis(
apply_indices=None,
enumerate_partitions=False,
dtypes=None,
keep_partitioning=True,
):
"""
Broadcast partitions of `other` Modin DataFrame and apply a function along full axis.
Expand Down Expand Up @@ -2635,6 +2641,9 @@ def broadcast_apply_full_axis(
Data types of the result. This is an optimization
because there are functions that always result in a particular data
type, and allows us to avoid (re)computing it.
keep_partitioning : boolean, default: True
The flag to keep partition boundaries for Modin Frame.
Setting it to True disables shuffling data from one partition to another.

Returns
-------
Expand All @@ -2659,7 +2668,7 @@ def broadcast_apply_full_axis(
apply_func=self._build_treereduce_func(axis, func),
apply_indices=apply_indices,
enumerate_partitions=enumerate_partitions,
keep_partitioning=True,
keep_partitioning=keep_partitioning,
)
# Index objects for new object creation. This is shorter than if..else
kw = self.__make_init_labels_args(new_partitions, new_index, new_columns)
Expand Down
44 changes: 41 additions & 3 deletions modin/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import pytest
import modin.utils
import json

from textwrap import dedent, indent

import pytest

from modin.error_message import ErrorMessage
import modin.utils
import modin.pandas as pd


# Note: classes below are used for purely testing purposes - they
Expand Down Expand Up @@ -287,3 +289,39 @@ def test_warns_that_defaulting_to_pandas():

with warns_that_defaulting_to_pandas():
ErrorMessage.default_to_pandas(message="Function name")


@pytest.mark.parametrize("axis", [0, 1, None])
@pytest.mark.parametrize("dtype", ["DataFrame", "Series"])
def test_repartition(axis, dtype):
    """Check the partition grid shape before and after ``modin.utils.repartition``."""
    left = pd.DataFrame({"col1": [1, 2], "col2": [5, 6]})
    right = pd.DataFrame({"col3": [9, 4]})

    # Concatenate along both axes; the sanity check below expects the
    # resulting frame to be backed by a 2x2 grid of partitions.
    wide = pd.concat([left, right], axis=1)
    stacked = pd.concat([wide, wide], axis=0)

    obj = stacked if dtype == "DataFrame" else stacked["col1"]

    # Partition grid shape expected before repartitioning.
    shapes_before = {
        "DataFrame": (2, 2),
        "Series": (2, 1),
    }
    # Partition grid shape expected after repartitioning, keyed by axis.
    shapes_after = {
        "DataFrame": {
            None: (1, 1),
            0: (1, 2),
            1: (2, 1),
        },
        "Series": {
            None: (1, 1),
            0: (1, 1),
            1: (2, 1),
        },
    }

    # check that the test makes sense
    assert obj._query_compiler._modin_frame._partitions.shape == shapes_before[dtype]

    obj = modin.utils.repartition(obj, axis=axis)

    expected_shape = shapes_after[dtype][axis]
    assert obj._query_compiler._modin_frame._partitions.shape == expected_shape
36 changes: 36 additions & 0 deletions modin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,42 @@ def try_cast_to_pandas(obj: Any, squeeze: bool = False) -> Any:
return obj


def repartition(df, axis: Optional[int] = None):  # type:ignore
    """
    Repartition a Modin object to get ideal partitions inside.

    Allows improving performance in cases where the query compiler cannot
    do so yet.

    Parameters
    ----------
    df : DataFrame or Series
        The Modin object to repartition.
    axis : {0, 1, None}, optional
        The axis along which to repartition. ``None`` (the default)
        repartitions along axis 0 and then along axis 1.

    Returns
    -------
    DataFrame or Series
        The repartitioned dataframe or series, depending on the original type.

    Raises
    ------
    NotImplementedError
        If `axis` is not 0, 1 or None, or if `df` is not a Modin
        DataFrame or Series.
    """
    if axis not in (0, 1, None):
        raise NotImplementedError(
            "repartition supports only axis=0, axis=1 or axis=None; "
            + f"got axis={axis!r}"
        )

    # Imported at call time rather than at module level
    # (presumably to avoid a circular import - TODO confirm).
    from modin.pandas import DataFrame, Series

    if not isinstance(df, (DataFrame, Series)):
        raise NotImplementedError(
            "repartition supports only Modin DataFrame and Series objects; "
            + f"got {type(df).__name__}"
        )

    axes = [0, 1] if axis is None else [axis]

    new_query_compiler = df._query_compiler
    for _ax in axes:
        # Apply an identity function across the full axis; passing
        # keep_partitioning=False lets the frame drop the existing
        # partition boundaries instead of preserving them.
        new_query_compiler = new_query_compiler.__constructor__(
            new_query_compiler._modin_frame.apply_full_axis(
                _ax, lambda part: part, keep_partitioning=False
            )
        )
    return df.__constructor__(query_compiler=new_query_compiler)


def wrap_into_list(*args: Any, skipna: bool = True) -> List[Any]:
"""
Wrap a sequence of passed values in a flattened list.
Expand Down