Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#5367: Introduce new API for repartitioning Modin objects #5366

Merged
merged 21 commits into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ jobs:
sudo docker pull postgres
sudo docker run --name some-postgres -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=Strong.Pwd-123 -e POSTGRES_DB=postgres -d -p 2345:5432 postgres
- run: MODIN_BENCHMARK_MODE=True mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_benchmark_mode.py
- run: mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_repartition.py
- run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_binary.py
- run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_default.py
- run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_indexing.py
Expand Down Expand Up @@ -653,6 +654,7 @@ jobs:
sudo docker pull postgres
sudo docker run --name some-postgres -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=Strong.Pwd-123 -e POSTGRES_DB=postgres -d -p 2345:5432 postgres
- run: MODIN_BENCHMARK_MODE=True pytest modin/pandas/test/internals/test_benchmark_mode.py
- run: pytest modin/pandas/test/internals/test_repartition.py
- run: pytest -n 2 modin/experimental/xgboost/test/test_default.py
- run: pytest -n 2 modin/experimental/xgboost/test/test_xgboost.py
if: matrix.engine == 'ray'
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ jobs:
sudo docker pull postgres
sudo docker run --name some-postgres -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=Strong.Pwd-123 -e POSTGRES_DB=postgres -d -p 2345:5432 postgres
- run: MODIN_BENCHMARK_MODE=True mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_benchmark_mode.py
- run: mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_repartition.py
- run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_binary.py
- run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_default.py
- run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_indexing.py
Expand Down Expand Up @@ -278,6 +279,8 @@ jobs:
run: |
sudo docker pull postgres
sudo docker run --name some-postgres -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=Strong.Pwd-123 -e POSTGRES_DB=postgres -d -p 2345:5432 postgres
- run: MODIN_BENCHMARK_MODE=True pytest modin/pandas/test/internals/test_benchmark_mode.py
- run: pytest modin/pandas/test/internals/test_repartition.py
Comment on lines +282 to +283
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has not been tested before when pushing, which affects the stability of codecov results.

- run: pytest -n 2 modin/experimental/xgboost/test/test_default.py
- run: pytest -n 2 modin/experimental/xgboost/test/test_xgboost.py
if: matrix.engine == 'ray'
Expand Down
11 changes: 10 additions & 1 deletion modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2181,6 +2181,7 @@ def apply_full_axis(
new_index=None,
new_columns=None,
dtypes=None,
keep_partitioning=True,
):
"""
Perform a function across an entire axis.
Expand All @@ -2201,6 +2202,9 @@ def apply_full_axis(
The data types of the result. This is an optimization
because there are functions that always result in a particular data
type, and allows us to avoid (re)computing it.
keep_partitioning : boolean, default: True
The flag to keep partition boundaries for Modin Frame.
Setting it to True disables shuffling data from one partition to another.

Returns
-------
Expand All @@ -2218,6 +2222,7 @@ def apply_full_axis(
new_columns=new_columns,
dtypes=dtypes,
other=None,
keep_partitioning=keep_partitioning,
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down Expand Up @@ -2608,6 +2613,7 @@ def broadcast_apply_full_axis(
apply_indices=None,
enumerate_partitions=False,
dtypes=None,
keep_partitioning=True,
):
"""
Broadcast partitions of `other` Modin DataFrame and apply a function along full axis.
Expand Down Expand Up @@ -2635,6 +2641,9 @@ def broadcast_apply_full_axis(
Data types of the result. This is an optimization
because there are functions that always result in a particular data
type, and allows us to avoid (re)computing it.
keep_partitioning : boolean, default: True
The flag to keep partition boundaries for Modin Frame.
Setting it to True disables shuffling data from one partition to another.

Returns
-------
Expand All @@ -2659,7 +2668,7 @@ def broadcast_apply_full_axis(
apply_func=self._build_treereduce_func(axis, func),
apply_indices=apply_indices,
enumerate_partitions=enumerate_partitions,
keep_partitioning=True,
keep_partitioning=keep_partitioning,
)
# Index objects for new object creation. This is shorter than if..else
kw = self.__make_init_labels_args(new_partitions, new_index, new_columns)
Expand Down
33 changes: 33 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from . import doc_utils
from modin.logging import ClassLogger
from modin.utils import MODIN_UNNAMED_SERIES_LABEL
from modin.config import StorageFormat

from pandas.core.dtypes.common import is_scalar
import pandas.core.resample
Expand Down Expand Up @@ -4766,4 +4767,36 @@ def compare(self, other, align_axis, keep_shape, keep_equal, result_names):
result_names=result_names,
)

def repartition(self, axis):
"""
Repartitioning QueryCompiler objects to get ideal partitions inside.

Allows to improve performance where the query compiler can't improve
yet by doing implicit repartitioning.

Parameters
----------
axis : int, optional

Returns
-------
BaseQueryCompiler
The repartitioned BaseQueryCompiler.
"""
if StorageFormat.get() == "Hdk":
# Hdk uses only one partition, it makes
# no sense for it to repartition the dataframe.
return self

list_axis = [0, 1] if axis is None else [axis]

new_query_compiler = self
for _ax in list_axis:
new_query_compiler = new_query_compiler.__constructor__(
new_query_compiler._modin_frame.apply_full_axis(
_ax, lambda df: df, keep_partitioning=False
)
)
return new_query_compiler

# End of DataFrame methods
26 changes: 26 additions & 0 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
"_default_to_pandas",
"_query_compiler",
"_to_pandas",
"_repartition",
"_build_repr_df",
"_reduce_dimension",
"__repr__",
Expand Down Expand Up @@ -3856,6 +3857,31 @@ def values(self): # noqa: RT01, D200
"""
return self.to_numpy()

def _repartition(self, axis: Optional[int] = None):
"""
Repartitioning Modin objects to get ideal partitions inside.

Allows to improve performance where the query compiler can't improve
yet by doing implicit repartitioning.

Parameters
----------
axis : {0, 1}, optional

Returns
-------
DataFrame or Series
The repartitioned dataframe or series, depending on the original type.
"""
allowed_axis_values = (0, 1, None)
if axis not in allowed_axis_values:
raise ValueError(
f"passed `axis` parameter: {axis}, but should be one of {allowed_axis_values}"
)
return self.__constructor__(
query_compiler=self._query_compiler.repartition(axis=axis)
)

@disable_logging
def __getattribute__(self, item):
"""
Expand Down
14 changes: 14 additions & 0 deletions modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2512,6 +2512,20 @@ def _getitem(self, key):
return self._reduce_dimension(result)
return self.__constructor__(query_compiler=result)

def _repartition(self):
"""
Repartitioning Series to get ideal partitions inside.

Allows to improve performance where the query compiler can't improve
yet by doing implicit repartitioning.

Returns
-------
Series
The repartitioned Series.
"""
return super()._repartition(axis=0)

# Persistance support methods - BEGIN
@classmethod
def _inflate_light(cls, query_compiler, name):
Expand Down
57 changes: 57 additions & 0 deletions modin/pandas/test/internals/test_repartition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import pytest

import modin.pandas as pd


@pytest.mark.parametrize("axis", [0, 1, None])
@pytest.mark.parametrize("dtype", ["DataFrame", "Series"])
def test_repartition(axis, dtype):
if axis in (1, None) and dtype == "Series":
# no sense for Series
return

df = pd.DataFrame({"col1": [1, 2], "col2": [5, 6]})
df2 = pd.DataFrame({"col3": [9, 4]})

df = pd.concat([df, df2], axis=1)
df = pd.concat([df, df], axis=0)

obj = df if dtype == "DataFrame" else df["col1"]

source_shapes = {
"DataFrame": (2, 2),
"Series": (2, 1),
}
# check that the test makes sense
assert obj._query_compiler._modin_frame._partitions.shape == source_shapes[dtype]

kwargs = {"axis": axis} if dtype == "DataFrame" else {}
obj = obj._repartition(**kwargs)

if dtype == "DataFrame":
results = {
None: (1, 1),
0: (1, 2),
1: (2, 1),
}
else:
results = {
None: (1, 1),
0: (1, 1),
1: (2, 1),
}

assert obj._query_compiler._modin_frame._partitions.shape == results[axis]