diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 82c58eaafed..9f3b64909b1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -570,6 +570,7 @@ jobs: sudo docker pull postgres sudo docker run --name some-postgres -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=Strong.Pwd-123 -e POSTGRES_DB=postgres -d -p 2345:5432 postgres - run: MODIN_BENCHMARK_MODE=True mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_benchmark_mode.py + - run: mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_repartition.py - run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_binary.py - run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_default.py - run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_indexing.py @@ -653,6 +654,7 @@ jobs: sudo docker pull postgres sudo docker run --name some-postgres -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=Strong.Pwd-123 -e POSTGRES_DB=postgres -d -p 2345:5432 postgres - run: MODIN_BENCHMARK_MODE=True pytest modin/pandas/test/internals/test_benchmark_mode.py + - run: pytest modin/pandas/test/internals/test_repartition.py - run: pytest -n 2 modin/experimental/xgboost/test/test_default.py - run: pytest -n 2 modin/experimental/xgboost/test/test_xgboost.py if: matrix.engine == 'ray' diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index 97d6f6999ec..d3d2bbda378 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -197,6 +197,7 @@ jobs: sudo docker pull postgres sudo docker run --name some-postgres -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=Strong.Pwd-123 -e POSTGRES_DB=postgres -d -p 2345:5432 postgres - run: MODIN_BENCHMARK_MODE=True mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_benchmark_mode.py + - run: mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_repartition.py - run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_binary.py - run: mpiexec -n 1 python -m pytest 
modin/pandas/test/dataframe/test_default.py - run: mpiexec -n 1 python -m pytest modin/pandas/test/dataframe/test_indexing.py @@ -278,6 +279,8 @@ jobs: run: | sudo docker pull postgres sudo docker run --name some-postgres -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=Strong.Pwd-123 -e POSTGRES_DB=postgres -d -p 2345:5432 postgres + - run: MODIN_BENCHMARK_MODE=True pytest modin/pandas/test/internals/test_benchmark_mode.py + - run: pytest modin/pandas/test/internals/test_repartition.py - run: pytest -n 2 modin/experimental/xgboost/test/test_default.py - run: pytest -n 2 modin/experimental/xgboost/test/test_xgboost.py if: matrix.engine == 'ray' diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index cdb8d3eac2c..8c2daa61e4f 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -2181,6 +2181,7 @@ def apply_full_axis( new_index=None, new_columns=None, dtypes=None, + keep_partitioning=True, ): """ Perform a function across an entire axis. @@ -2201,6 +2202,9 @@ def apply_full_axis( The data types of the result. This is an optimization because there are functions that always result in a particular data type, and allows us to avoid (re)computing it. + keep_partitioning : boolean, default: True + The flag to keep partition boundaries for Modin Frame. + Setting it to True disables shuffling data from one partition to another. Returns ------- @@ -2218,6 +2222,7 @@ def apply_full_axis( new_columns=new_columns, dtypes=dtypes, other=None, + keep_partitioning=keep_partitioning, ) @lazy_metadata_decorator(apply_axis="both") @@ -2608,6 +2613,7 @@ def broadcast_apply_full_axis( apply_indices=None, enumerate_partitions=False, dtypes=None, + keep_partitioning=True, ): """ Broadcast partitions of `other` Modin DataFrame and apply a function along full axis. @@ -2635,6 +2641,9 @@ def broadcast_apply_full_axis( Data types of the result. 
This is an optimization because there are functions that always result in a particular data type, and allows us to avoid (re)computing it. + keep_partitioning : boolean, default: True + The flag to keep partition boundaries for Modin Frame. + Setting it to True disables shuffling data from one partition to another. Returns ------- @@ -2659,7 +2668,7 @@ def broadcast_apply_full_axis( apply_func=self._build_treereduce_func(axis, func), apply_indices=apply_indices, enumerate_partitions=enumerate_partitions, - keep_partitioning=True, + keep_partitioning=keep_partitioning, ) # Index objects for new object creation. This is shorter than if..else kw = self.__make_init_labels_args(new_partitions, new_index, new_columns) diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index 12553b2e4a1..b6659526c8d 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -34,6 +34,7 @@ from . import doc_utils from modin.logging import ClassLogger from modin.utils import MODIN_UNNAMED_SERIES_LABEL +from modin.config import StorageFormat from pandas.core.dtypes.common import is_scalar import pandas.core.resample @@ -4766,4 +4767,38 @@ def compare(self, other, align_axis, keep_shape, keep_equal, result_names): result_names=result_names, ) + def repartition(self, axis=None): + """ + Repartitioning QueryCompiler objects to get ideal partitions inside. + + Allows to improve performance where the query compiler can't improve + yet by doing implicit repartitioning. + + Parameters + ---------- + axis : {0, 1, None}, optional + The axis along which the repartitioning occurs. + `None` is used for repartitioning along both axes. + + Returns + ------- + BaseQueryCompiler + The repartitioned BaseQueryCompiler. + """ + if StorageFormat.get() == "Hdk": + # Hdk uses only one partition, it makes + # no sense for it to repartition the dataframe. 
+ return self + + axes = [0, 1] if axis is None else [axis] + + new_query_compiler = self + for _ax in axes: + new_query_compiler = new_query_compiler.__constructor__( + new_query_compiler._modin_frame.apply_full_axis( + _ax, lambda df: df, keep_partitioning=False + ) + ) + return new_query_compiler + # End of DataFrame methods diff --git a/modin/pandas/base.py b/modin/pandas/base.py index f96fe35f49c..e111f21d6df 100644 --- a/modin/pandas/base.py +++ b/modin/pandas/base.py @@ -90,6 +90,7 @@ "_default_to_pandas", "_query_compiler", "_to_pandas", + "_repartition", "_build_repr_df", "_reduce_dimension", "__repr__", @@ -3856,6 +3857,33 @@ def values(self): # noqa: RT01, D200 """ return self.to_numpy() + def _repartition(self, axis: Optional[int] = None): + """ + Repartitioning Modin objects to get ideal partitions inside. + + Allows to improve performance where the query compiler can't improve + yet by doing implicit repartitioning. + + Parameters + ---------- + axis : {0, 1, None}, optional + The axis along which the repartitioning occurs. + `None` is used for repartitioning along both axes. + + Returns + ------- + DataFrame or Series + The repartitioned dataframe or series, depending on the original type. + """ + allowed_axis_values = (0, 1, None) + if axis not in allowed_axis_values: + raise ValueError( + f"Passed `axis` parameter: {axis}, but should be one of {allowed_axis_values}" + ) + return self.__constructor__( + query_compiler=self._query_compiler.repartition(axis=axis) + ) + @disable_logging def __getattribute__(self, item): """ diff --git a/modin/pandas/series.py b/modin/pandas/series.py index c6a48a6d16c..e93a91e0b59 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -2512,6 +2512,20 @@ def _getitem(self, key): return self._reduce_dimension(result) return self.__constructor__(query_compiler=result) + def _repartition(self): + """ + Repartitioning Series to get ideal partitions inside. 
+ + Allows to improve performance where the query compiler can't improve + yet by doing implicit repartitioning. + + Returns + ------- + Series + The repartitioned Series. + """ + return super()._repartition(axis=0) + # Persistance support methods - BEGIN @classmethod def _inflate_light(cls, query_compiler, name): diff --git a/modin/pandas/test/internals/test_repartition.py b/modin/pandas/test/internals/test_repartition.py new file mode 100644 index 00000000000..389541a9d0d --- /dev/null +++ b/modin/pandas/test/internals/test_repartition.py @@ -0,0 +1,60 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. 
import pytest

import modin.pandas as pd
from modin.config import NPartitions

# Fix the configured partition count for this module.
# NOTE(review): presumably this keeps the partition-grid shapes asserted below
# independent of the machine's CPU count -- confirm against the NPartitions docs.
NPartitions.put(4)


@pytest.mark.parametrize("axis", [0, 1, None])
@pytest.mark.parametrize("dtype", ["DataFrame", "Series"])
def test_repartition(axis, dtype):
    """
    Check the partition-grid shape produced by ``_repartition``.

    Parameters
    ----------
    axis : {0, 1, None}
        Axis passed to ``DataFrame._repartition``. ``Series._repartition`` is
        called without arguments, so only the ``axis=0`` case is exercised for it.
    dtype : {"DataFrame", "Series"}
        Kind of Modin object to build and repartition.
    """
    if dtype == "Series" and axis in (1, None):
        # Report the combination as skipped instead of silently passing.
        pytest.skip("`axis` makes no sense for Series")

    # Two concats (columns first, then rows) yield a frame that is split into
    # several partitions; the shape is verified right below.
    left = pd.DataFrame({"col1": [1, 2], "col2": [5, 6]})
    right = pd.DataFrame({"col3": [9, 4]})
    frame = pd.concat([left, right], axis=1)
    frame = pd.concat([frame, frame], axis=0)

    is_frame = dtype == "DataFrame"
    obj = frame if is_frame else frame["col1"]

    # Partition-grid shape before repartitioning.
    source_shapes = {
        "DataFrame": (2, 2),
        "Series": (2, 1),
    }
    # Partition-grid shape after repartitioning, keyed by the requested axis.
    # Series has a single entry because only axis=0 reaches this point.
    expected_shapes = {
        "DataFrame": {None: (1, 1), 0: (1, 2), 1: (2, 1)},
        "Series": {0: (1, 1)},
    }

    # check that the test makes sense
    assert obj._query_compiler._modin_frame._partitions.shape == source_shapes[dtype]

    obj = obj._repartition(axis=axis) if is_frame else obj._repartition()

    assert (
        obj._query_compiler._modin_frame._partitions.shape
        == expected_shapes[dtype][axis]
    )