diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 3bc4fb521e0..3a4e5885b3b 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -56,7 +56,11 @@ from modin.logging import ClassLogger from modin.pandas.indexing import is_range_like -from modin.pandas.utils import check_both_not_none, is_full_grab_slice,apply_function_on_selected_items +from modin.pandas.utils import ( + apply_function_on_selected_items, + check_both_not_none, + is_full_grab_slice, +) from modin.utils import MODIN_UNNAMED_SERIES_LABEL @@ -188,10 +192,16 @@ def row_lengths(self): if self._row_lengths_cache is None: if len(self._partitions.T) > 0: row_parts = self._partitions.T[0] - self._row_lengths_cache = [part.length(materialize=False) for part in row_parts] - filter_condition = self._partition_mgr_cls._execution_wrapper.check_is_future - apply_function_on_selected_items(self._row_lengths_cache,filter_condition,self.materialize_func) - + self._row_lengths_cache = [ + part.length(materialize=False) for part in row_parts + ] + filter_condition = ( + self._partition_mgr_cls._execution_wrapper.check_is_future + ) + apply_function_on_selected_items( + self._row_lengths_cache, filter_condition, self.materialize_func + ) + else: self._row_lengths_cache = [] return self._row_lengths_cache @@ -220,14 +230,20 @@ def column_widths(self): list A list of column partitions widths. """ - + if self._column_widths_cache is None: - if len(self._partitions) > 0: + if len(self._partitions) > 0: col_parts = self._partitions[0] - - self._column_widths_cache = [part.width(materialize=False) for part in col_parts] - filter_condition = self._partition_mgr_cls._execution_wrapper.check_is_future - apply_function_on_selected_items(self._column_widths_cache,filter_condition,self.materialize_func) + + self._column_widths_cache = [ + part.width(materialize=False) for part in col_parts + ] + filter_condition = ( + self._partition_mgr_cls._execution_wrapper.check_is_future + ) + apply_function_on_selected_items( + self._column_widths_cache, filter_condition, self.materialize_func + ) else: self._column_widths_cache = [] return self._column_widths_cache @@ -3671,9 +3687,13 @@ def _compute_new_widths(): else: new_lengths = None break - filter_condition = self._partition_mgr_cls._execution_wrapper.check_is_future - apply_function_on_selected_items(new_lengths,filter_condition,self.materialize_func) - + filter_condition = ( + self._partition_mgr_cls._execution_wrapper.check_is_future + ) + apply_function_on_selected_items( + new_lengths, filter_condition, self.materialize_func + ) + else: if all(obj.has_materialized_columns for obj in (self, *others)): new_columns = self.columns.append([other.columns for other in others]) @@ -3698,12 +3718,13 @@ def _compute_new_widths(): else: new_widths = None break - filter_condition = self._partition_mgr_cls._execution_wrapper.check_is_future - apply_function_on_selected_items(new_widths,filter_condition,self.materialize_func) - - - - + filter_condition = ( + self._partition_mgr_cls._execution_wrapper.check_is_future + ) + apply_function_on_selected_items( + new_widths, filter_condition, self.materialize_func + ) + return self.__constructor__( new_partitions, new_index, new_columns, new_lengths, new_widths, new_dtypes ) diff --git a/modin/core/execution/dask/common/engine_wrapper.py b/modin/core/execution/dask/common/engine_wrapper.py index 027a3233621..145b9efafbf 100644 --- a/modin/core/execution/dask/common/engine_wrapper.py +++ b/modin/core/execution/dask/common/engine_wrapper.py @@ -16,8 +16,9 @@ from collections import UserDict from dask.distributed import wait -from distributed.client import default_client from distributed import Future +from distributed.client import default_client + def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover """ @@ -89,11 +90,10 @@ def deploy( for i in range(num_returns) ] return remote_task_future - + @classmethod def check_is_future(cls, item): return isinstance(item, Future) - @classmethod def materialize(cls, future): diff --git a/modin/core/execution/python/common/engine_wrapper.py b/modin/core/execution/python/common/engine_wrapper.py index a3bc071a1fd..b1e949f10ab 100644 --- a/modin/core/execution/python/common/engine_wrapper.py +++ b/modin/core/execution/python/common/engine_wrapper.py @@ -40,10 +40,11 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): args = [] if f_args is None else f_args kwargs = {} if f_kwargs is None else f_kwargs return func(*args, **kwargs) - + @classmethod def check_is_future(cls, item): return False + @classmethod def materialize(cls, obj_id): """ diff --git a/modin/core/execution/ray/common/engine_wrapper.py b/modin/core/execution/ray/common/engine_wrapper.py index 6972050323e..8c32c2724c1 100644 --- a/modin/core/execution/ray/common/engine_wrapper.py +++ b/modin/core/execution/ray/common/engine_wrapper.py @@ -73,9 +73,11 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): return _deploy_ray_func.options(num_returns=num_returns).remote( func, *args, **kwargs ) + @classmethod def check_is_future(cls, item): - return isinstance(item, ray.ObjectRef) + return isinstance(item, ray.ObjectRef) + @classmethod def materialize(cls, obj_id): """ diff --git a/modin/core/execution/unidist/common/engine_wrapper.py b/modin/core/execution/unidist/common/engine_wrapper.py index 71f5aa615d4..fea5bc6ced2 100644 --- a/modin/core/execution/unidist/common/engine_wrapper.py +++ b/modin/core/execution/unidist/common/engine_wrapper.py @@ -73,10 +73,11 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1): return _deploy_unidist_func.options(num_returns=num_returns).remote( func, *args, **kwargs ) + @classmethod def check_is_future(cls, item): return unidist.is_object_ref(item) - + @classmethod def materialize(cls, obj_id): """ diff --git a/modin/pandas/utils.py b/modin/pandas/utils.py index 699d9a5cb61..62e159ba9fe 100644 --- a/modin/pandas/utils.py +++ b/modin/pandas/utils.py @@ -429,11 +429,12 @@ def _doc_binary_op(operation, bin_op, left="Series", right="right", returns="Ser return doc_op + def apply_function_on_selected_items(input_list, filter_condition, func_apply): - breakpoint() filtered_list = [] - filterd_masks=[False]*len(input_list) - for idx,item in enumerate(input_list): + input_list_length = len(input_list) if input_list else 0 + filterd_masks = [False] * input_list_length + for idx, item in enumerate(input_list): if filter_condition(item): filterd_masks[idx] = True filtered_list.append(item) @@ -442,10 +443,8 @@ def apply_function_on_selected_items(input_list, filter_condition, func_apply): filtered_list.reverse() for i in range(len(input_list)): if filterd_masks[i]: - input_list[i]=filtered_list.pop() - - - + input_list[i] = filtered_list.pop() + _original_pandas_MultiIndex_from_frame = pandas.MultiIndex.from_frame pandas.MultiIndex.from_frame = from_modin_frame_to_mi