Skip to content

Commit

Permalink
black
Browse files Browse the repository at this point in the history
  • Loading branch information
arunjose696 committed Dec 26, 2023
1 parent 28a4f1d commit 9e5efcc
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 33 deletions.
61 changes: 41 additions & 20 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@

from modin.logging import ClassLogger
from modin.pandas.indexing import is_range_like
from modin.pandas.utils import check_both_not_none, is_full_grab_slice,apply_function_on_selected_items
from modin.pandas.utils import (
apply_function_on_selected_items,
check_both_not_none,
is_full_grab_slice,
)
from modin.utils import MODIN_UNNAMED_SERIES_LABEL


Expand Down Expand Up @@ -188,10 +192,16 @@ def row_lengths(self):
if self._row_lengths_cache is None:
if len(self._partitions.T) > 0:
row_parts = self._partitions.T[0]
self._row_lengths_cache = [part.length(materialize=False) for part in row_parts]
filter_condition = self._partition_mgr_cls._execution_wrapper.check_is_future
apply_function_on_selected_items(self._row_lengths_cache,filter_condition,self.materialize_func)

self._row_lengths_cache = [
part.length(materialize=False) for part in row_parts
]
filter_condition = (
self._partition_mgr_cls._execution_wrapper.check_is_future
)
apply_function_on_selected_items(
self._row_lengths_cache, filter_condition, self.materialize_func
)

else:
self._row_lengths_cache = []
return self._row_lengths_cache
Expand Down Expand Up @@ -220,14 +230,20 @@ def column_widths(self):
list
A list of column partitions widths.
"""

if self._column_widths_cache is None:
if len(self._partitions) > 0:
if len(self._partitions) > 0:
col_parts = self._partitions[0]

self._column_widths_cache = [part.width(materialize=False) for part in col_parts]
filter_condition = self._partition_mgr_cls._execution_wrapper.check_is_future
apply_function_on_selected_items(self._column_widths_cache,filter_condition,self.materialize_func)

self._column_widths_cache = [
part.width(materialize=False) for part in col_parts
]
filter_condition = (
self._partition_mgr_cls._execution_wrapper.check_is_future
)
apply_function_on_selected_items(
self._column_widths_cache, filter_condition, self.materialize_func
)
else:
self._column_widths_cache = []
return self._column_widths_cache
Expand Down Expand Up @@ -3671,9 +3687,13 @@ def _compute_new_widths():
else:
new_lengths = None
break
filter_condition = self._partition_mgr_cls._execution_wrapper.check_is_future
apply_function_on_selected_items(new_lengths,filter_condition,self.materialize_func)

filter_condition = (
self._partition_mgr_cls._execution_wrapper.check_is_future
)
apply_function_on_selected_items(
new_lengths, filter_condition, self.materialize_func
)

else:
if all(obj.has_materialized_columns for obj in (self, *others)):
new_columns = self.columns.append([other.columns for other in others])
Expand All @@ -3698,12 +3718,13 @@ def _compute_new_widths():
else:
new_widths = None
break
filter_condition = self._partition_mgr_cls._execution_wrapper.check_is_future
apply_function_on_selected_items(new_widths,filter_condition,self.materialize_func)




filter_condition = (
self._partition_mgr_cls._execution_wrapper.check_is_future
)
apply_function_on_selected_items(
new_widths, filter_condition, self.materialize_func
)

return self.__constructor__(
new_partitions, new_index, new_columns, new_lengths, new_widths, new_dtypes
)
Expand Down
6 changes: 3 additions & 3 deletions modin/core/execution/dask/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
from collections import UserDict

from dask.distributed import wait
from distributed.client import default_client
from distributed import Future
from distributed.client import default_client


def _deploy_dask_func(func, *args, **kwargs): # pragma: no cover
"""
Expand Down Expand Up @@ -89,11 +90,10 @@ def deploy(
for i in range(num_returns)
]
return remote_task_future

# Tell whether ``item`` is a Dask future, i.e. an instance of the ``Future``
# class imported from ``distributed`` at the top of this module.
# Returns the bool produced by ``isinstance``; ``cls`` is not used.
@classmethod
def check_is_future(cls, item):
return isinstance(item, Future)


@classmethod
def materialize(cls, future):
Expand Down
3 changes: 2 additions & 1 deletion modin/core/execution/python/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
args = [] if f_args is None else f_args
kwargs = {} if f_kwargs is None else f_kwargs
return func(*args, **kwargs)

# Report whether ``item`` is a future for this engine wrapper.
# The answer is unconditionally False: neither ``cls`` nor ``item`` is inspected.
@classmethod
def check_is_future(cls, item):
return False

@classmethod
def materialize(cls, obj_id):
"""
Expand Down
4 changes: 3 additions & 1 deletion modin/core/execution/ray/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
return _deploy_ray_func.options(num_returns=num_returns).remote(
func, *args, **kwargs
)

# Tell whether ``item`` is a Ray object reference, i.e. an instance of
# ``ray.ObjectRef``. Returns the bool produced by ``isinstance``.
@classmethod
def check_is_future(cls, item):
return isinstance(item, ray.ObjectRef)
# NOTE(review): the statement below repeats the previous return and can never
# run; it looks like the old/new pair of a whitespace-only diff -- confirm.
return isinstance(item, ray.ObjectRef)

@classmethod
def materialize(cls, obj_id):
"""
Expand Down
3 changes: 2 additions & 1 deletion modin/core/execution/unidist/common/engine_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ def deploy(cls, func, f_args=None, f_kwargs=None, num_returns=1):
return _deploy_unidist_func.options(num_returns=num_returns).remote(
func, *args, **kwargs
)

# Tell whether ``item`` is a unidist object reference by delegating to
# ``unidist.is_object_ref`` and returning its result unchanged.
@classmethod
def check_is_future(cls, item):
return unidist.is_object_ref(item)

@classmethod
def materialize(cls, obj_id):
"""
Expand Down
13 changes: 6 additions & 7 deletions modin/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,12 @@ def _doc_binary_op(operation, bin_op, left="Series", right="right", returns="Ser

return doc_op


# Visible first half of the helper: walk ``input_list``, collect every item for
# which ``filter_condition(item)`` is truthy into ``filtered_list``, and record
# the positions of those items in the boolean mask ``filterd_masks``.
# NOTE(review): how ``func_apply`` is used is not visible in this view (lines
# between the two diff hunks are hidden) -- confirm against the full file.
def apply_function_on_selected_items(input_list, filter_condition, func_apply):
# NOTE(review): leftover debugging hook -- ``breakpoint()`` enters the debugger
# on every call of this helper; it should be removed before merging.
breakpoint()
filtered_list = []
# NOTE(review): the next two lines appear to be the pre-formatting versions of
# the mask initialisation and loop header that follow them (old/new lines of
# the diff rendered together) -- confirm.
filterd_masks=[False]*len(input_list)
for idx,item in enumerate(input_list):
# A falsy ``input_list`` (None or empty) yields a zero-length mask.
# NOTE(review): this guard does not protect the loop below --
# ``enumerate(None)`` still raises TypeError, so a None input fails anyway.
input_list_length = len(input_list) if input_list else 0
filterd_masks = [False] * input_list_length
for idx, item in enumerate(input_list):
if filter_condition(item):
# Mark the position so the item can be located by index later.
filterd_masks[idx] = True
filtered_list.append(item)
Expand All @@ -442,10 +443,8 @@ def apply_function_on_selected_items(input_list, filter_condition, func_apply):
filtered_list.reverse()
for i in range(len(input_list)):
if filterd_masks[i]:
input_list[i]=filtered_list.pop()



input_list[i] = filtered_list.pop()


_original_pandas_MultiIndex_from_frame = pandas.MultiIndex.from_frame
pandas.MultiIndex.from_frame = from_modin_frame_to_mi

0 comments on commit 9e5efcc

Please sign in to comment.