transforms: Refactor the compilation and evaluation of additional code fragments
into a separate mixin class

hagau committed May 28, 2024
1 parent 8c8064f commit 7dcc9bd
Showing 1 changed file with 114 additions and 82 deletions.
196 changes: 114 additions & 82 deletions transforms.py
@@ -90,6 +90,46 @@ def prepare(self):
return job_list


class ExtraCodeFunctionMixin:
r"""
A mixin class for providing the functionality to compile and evaluate a
function and an additional, optional code fragment within a separate global environment.
"""
def eval_function(self, function:Union[Callable, str], extra_code:Optional[str]) -> Callable:
r"""
Compile and evaluate the given function and an additional, optional
code fragment within a separate global environment and return the
executable function object.

Parameters
----------
function : Union[Callable, str]
The name of the function or a function object.
extra_code : Optional[str]
This can contain additional code for the transform function, such as
the definition of a function over multiple lines or split into multiple
functions for readability.
"""
# create a copy of the global environment for evaluating the extra
# code fragment so as to not pollute the global namespace itself
global_env = globals().copy()

if isinstance(extra_code, str):
# compile the code fragment
compiled_extra_code = compile(extra_code, filename='<string>', mode='exec')
# actually evaluate the code within the given namespace to allow
# access to all the defined symbols, such as helper functions that are not defined inline
eval(compiled_extra_code, global_env)

if isinstance(function, Callable):
evaluated_function = function
else:
evaluated_function = eval(function, global_env)

return evaluated_function

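To illustrate the contract of `eval_function`, here is a minimal, self-contained sketch with a condensed copy of the mixin, since the surrounding `Transform` machinery is not part of this diff; the helper name `scale_and_shift` and the example data are purely illustrative:

```python
from typing import Callable, Optional, Union
import pandas as pd

class ExtraCodeFunctionMixin:
    def eval_function(self, function: Union[Callable, str],
                      extra_code: Optional[str]) -> Callable:
        # copy globals() so the fragment cannot pollute the real namespace
        global_env = globals().copy()
        if isinstance(extra_code, str):
            eval(compile(extra_code, filename='<string>', mode='exec'), global_env)
        if isinstance(function, Callable):
            return function
        # resolve the name/expression against the extended namespace
        return eval(function, global_env)

# extra_code holds a multi-line helper; function is just its name
extra_code = """
def scale_and_shift(df):
    return df * 2 + 1
"""

f = ExtraCodeFunctionMixin().eval_function('scale_and_shift', extra_code)
print(f(pd.DataFrame({'a': [1, 2, 3]})))  # column 'a' becomes 3, 5, 7
```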

class ConcatTransform(Transform, YAMLObject):
r"""
A transform for concatenating all DataFrames from the given datasets.
@@ -279,7 +319,7 @@ def prepare(self):
return self.prepare_simple_sequential()


class FunctionTransform(Transform, YAMLObject):
class FunctionTransform(Transform, ExtraCodeFunctionMixin, YAMLObject):
r"""
A transform for applying an arbitrary function to a whole DataFrame.
@@ -291,7 +331,7 @@ class FunctionTransform(Transform, YAMLObject):
output_dataset_name: str
The name given to the output dataset.
function: Callable
function: Union[Callable[[pandas.DataFrame], pandas.DataFrame], str]
The unary function to apply to each DataFrame of the dataset.
It takes the full DataFrame as its only argument and returns a DataFrame.
@@ -304,7 +344,7 @@ class FunctionTransform(Transform, YAMLObject):
yaml_tag = u'!FunctionTransform'

def __init__(self, dataset_name:str, output_dataset_name:str
, function:Callable=None
, function:Union[Callable[[pandas.DataFrame], pandas.DataFrame], str]=None
, extra_code:Optional[str]=None
):
self.dataset_name = dataset_name
@@ -318,25 +358,6 @@ def __init__(self, dataset_name:str, output_dataset_name:str
self.function = function
self.extra_code = extra_code

def eval_function(self) -> Callable:
# create a copy of the global environment for evaluating the extra
# code fragment so as to not pollute the global namespace itself
global_env = globals().copy()

if type(self.extra_code) == str:
# compile the code fragment
extra_code = compile(self.extra_code, filename='<string>', mode='exec')
# actually evaluate the code within the given namespace to allow
# access to all the defined symbols, such as helper functions that are not defined inline
eval(extra_code, global_env)

if isinstance(self.function, Callable):
function = self.function
else:
function = eval(self.function, global_env)

return function

def process(self, data, attributes) -> pd.DataFrame:
if data is None or data.empty:
return pd.DataFrame()
@@ -345,7 +366,7 @@ def process(self, data, attributes) -> pd.DataFrame:
# extra_code in a separate global namespace.
# The compilation of the extra code has to happen in the thread/process
# of the processing worker since code objects can't be serialized.
function = self.eval_function()
function = self.eval_function(self.function, self.extra_code)

result = function(data)

@@ -367,7 +388,7 @@ def prepare(self):
return job_list

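The comment in `process` above notes that the compilation of the extra code has to happen in the worker because code objects can't be serialized. A quick, self-contained sketch of that constraint using the standard-library `pickle` (whether a particular dask serializer works around this is a separate question; the source string always transports cleanly):

```python
import pickle

source = "def f(x):\n    return x + 1"
code_obj = compile(source, filename='<string>', mode='exec')

try:
    pickle.dumps(code_obj)  # compiled code objects are not picklable
except TypeError as err:
    print(f'code object: {err}')

print(f'source string: {len(pickle.dumps(source))} bytes')  # pickles fine
```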

class ColumnFunctionTransform(Transform, YAMLObject):
class ColumnFunctionTransform(Transform, ExtraCodeFunctionMixin, YAMLObject):
r"""
A transform for applying a function to every value in a column of a DataFrame
@@ -386,49 +407,62 @@ class ColumnFunctionTransform(Transform, YAMLObject):
the name given to the output column containing the results of applying
the function
function: Callable
the unary function to apply to the values in the chosen column
function: Union[Callable[[pandas.Series], pandas.Series], str]
The unary function to apply to the values in the chosen column.
extra_code: Optional[str]
This can contain additional code for the transform function, such as
the definition of a function over multiple lines or split into multiple
functions for readability.
"""

yaml_tag = u'!ColumnFunctionTransform'

def __init__(self, dataset_name:str, output_dataset_name:str
, input_column:str, output_column:str
, function:Callable=pd.Series.mean):
, function:Union[Callable[[pandas.Series], pandas.Series], str]=None
, extra_code:Optional[str]=None
):
self.dataset_name = dataset_name
self.output_dataset_name = output_dataset_name

self.input_column = input_column
self.output_column = output_column

if not function:
msg = f'No processing function has been defined for ColumnFunctionTransform!'
loge(msg)
raise(TypeError(msg))

self.function = function
self.extra_code = extra_code

def process(self, data, attributes):
# Get the function to call and possibly compile and evaluate the code defined in
# extra_code in a separate global namespace.
# The compilation of the extra code has to happen in the thread/process
# of the processing worker since code objects can't be serialized.
function = self.eval_function(self.function, None)

def process(self, data, function, attributes):
data[self.output_column] = data[self.input_column].apply(function)
logd(f'ColumnFunctionTransform result:\n{data}')
return data

def prepare(self):
data_list = self.get_data(self.dataset_name)

if isinstance(self.function, Callable):
function = self.function
else:
function = eval(self.function)

job_list = []

for data, attributes in data_list:
job = dask.delayed(self.process)(data, function, attributes)
job = dask.delayed(self.process)(data, attributes)
job_list.append((job, attributes))

# allow other tasks to depend on the output of the delayed jobs
self.data_repo[self.output_dataset_name] = job_list

return job_list

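The core of `ColumnFunctionTransform.process` is a plain `Series.apply` over the chosen input column. A hedged sketch with invented column names (`rtt`, `rtt_ms`), not taken from any real dataset:

```python
import pandas as pd

data = pd.DataFrame({'rtt': [0.10, 0.25, 0.40]})

# what eval_function would produce from the string 'lambda v: v * 1000'
function = eval('lambda v: v * 1000')

# apply the unary function to every value of the input column
data['rtt_ms'] = data['rtt'].apply(function)
print(data)  # rtt_ms holds 100.0, 250.0, 400.0
```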
class GroupedAggregationTransform(Transform, YAMLObject):
class GroupedAggregationTransform(Transform, ExtraCodeFunctionMixin, YAMLObject):
r"""
A transform for dividing a dataset into distinct partitions with
`pandas.DataFrame.groupby
@@ -461,12 +495,14 @@ class GroupedAggregationTransform(Transform, YAMLObject):
pre_concatenate: bool
concatenate all input DataFrames before processing
extra_code: Optional[str]
this allows specifying additional code, like a more complex transform function
aggregation_function: Union[Callable[[pandas.Series], object], str]
The unary function to apply to each partition. Should expect a
`pandas.Series` as argument and return a scalar value.
aggregation_function: Callable
the unary function to apply to each partition. Should expect a
`pandas.Series` as argument and return a scalar.
extra_code: Optional[str]
This can contain additional code for the transform function, such as
the definition of a function over multiple lines or split into multiple
functions for readability.
timestamp_selector: Callable
the function to select the row in the partition data as a template for the output in case of aggregation
@@ -478,8 +514,8 @@ def __init__(self, dataset_name:str, output_dataset_name:str
, grouping_columns:List
, raw:bool=False
, pre_concatenate:bool=False
, aggregation_function:Union[Callable[[pandas.Series], object], str]=None
, extra_code:Optional[str]=None
, aggregation_function:Callable=pd.Series.mean
, timestamp_selector:Callable=pd.DataFrame.head):
self.dataset_name = dataset_name
self.output_dataset_name = output_dataset_name
@@ -488,32 +524,30 @@ def __init__(self, dataset_name:str, output_dataset_name:str
self.output_column = output_column

self.grouping_columns = grouping_columns
self.extra_code = extra_code

if not aggregation_function:
msg = f'No aggregation_function has been defined for GroupedAggregationTransform!'
loge(msg)
raise(TypeError(msg))

self.aggregation_function = aggregation_function
self.extra_code = extra_code

self.timestamp_selector = timestamp_selector

self.raw = raw
self.pre_concatenate = pre_concatenate

def aggregate_frame(self, data):
# create a copy of the global environment for evaluating the extra
# code fragment so as to not pollute the global namespace itself
global_env = globals().copy()
locals_env = locals().copy()

if (data.empty):
logw(f'GroupedAggregationTransform return is empty!')
return pd.DataFrame()

if type(self.extra_code) == str:
# compile the code fragment
self.extra_code = compile(self.extra_code, filename='<string>', mode='exec')
# actually evaluate the code within the given namespace
eval(self.extra_code, global_env, locals_env)

if type(self.aggregation_function) == str:
# evaluate the expression within the given environment
self.aggregation_function = eval(self.aggregation_function, global_env, locals_env)
# Get the function to call and possibly compile and evaluate the code defined in
# extra_code in a separate global namespace.
# The compilation of the extra code has to happen in the thread/process
# of the processing worker since code objects can't be serialized.
aggregation_function = self.eval_function(self.aggregation_function, self.extra_code)

if len(self.grouping_columns) == 1:
grouping_columns = self.grouping_columns[0]
@@ -522,7 +556,7 @@ def aggregate_frame(self, data):

result_list = []
for group_key, group_data in data.groupby(by=grouping_columns, sort=False, observed=True):
result = self.aggregation_function(group_data[self.input_column])
result = aggregation_function(group_data[self.input_column])

if self.raw:
result_list.append((group_key, result))
@@ -564,7 +598,7 @@ def prepare(self):
return jobs


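Reduced to its essentials, the per-partition loop in `aggregate_frame` groups the frame and hands the input column of each partition to the unary aggregation function. A sketch with invented column names (`node`, `delay`):

```python
import pandas as pd

data = pd.DataFrame({
    'node': ['a', 'a', 'b', 'b'],
    'delay': [1.0, 3.0, 2.0, 6.0],
})

aggregation_function = pd.Series.mean  # a typical unary aggregate

result_list = []
for group_key, group_data in data.groupby(by='node', sort=False, observed=True):
    # the function sees only the input column of the current partition
    result_list.append((group_key, aggregation_function(group_data['delay'])))

print(result_list)  # [('a', 2.0), ('b', 4.0)]
```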
class GroupedFunctionTransform(Transform, YAMLObject):
class GroupedFunctionTransform(Transform, ExtraCodeFunctionMixin, YAMLObject):
r"""
A transform for dividing a dataset into distinct partitions with
`pandas.DataFrame.groupby
@@ -599,12 +633,14 @@ class GroupedFunctionTransform(Transform, YAMLObject):
pre_concatenate: bool
concatenate all input DataFrames before processing
extra_code: Optional[str]
this allows specifying additional code, like a more complex transform function
transform_function: Union[Callable[[pandas.DataFrame], pandas.DataFrame], Callable[[pandas.DataFrame], object], str]
The unary function to apply to each partition. Should expect a
`pandas.DataFrame` as argument and return a `pandas.DataFrame` (or an arbitrary object if `raw` is true).
transform_function: Union[Callable[[pandas.DataFrame], pandas.DataFrame], Callable[[pandas.DataFrame], object]]
the unary function to apply to each partition. Should expect a
`pandas.DataFrame` as argument and return a `pandas.DataFrame`.
extra_code: Optional[str]
This can contain additional code for the transform function, such as
the definition of a function over multiple lines or split into multiple
functions for readability.
timestamp_selector: Callable
the function to select the row in the partition data as a template for the output in case of aggregation
@@ -617,8 +653,8 @@ def __init__(self, dataset_name:str, output_dataset_name:str
, raw:bool=False
, aggregate:bool=False
, pre_concatenate:bool=False
, transform_function:Union[Callable[[pandas.DataFrame], pandas.DataFrame], Callable[[pandas.DataFrame], object], str]=None
, extra_code:Optional[str]=None
, transform_function:Callable=pd.DataFrame.head
, timestamp_selector:Callable=pd.DataFrame.head):
self.dataset_name = dataset_name
self.output_dataset_name = output_dataset_name
@@ -627,8 +663,15 @@ def __init__(self, dataset_name:str, output_dataset_name:str
self.output_column = output_column

self.grouping_columns = grouping_columns
self.extra_code = extra_code

if not transform_function:
msg = f'No transform_function has been defined for GroupedFunctionTransform!'
loge(msg)
raise(TypeError(msg))

self.transform_function = transform_function
self.extra_code = extra_code

self.timestamp_selector = timestamp_selector

self.raw = raw
Expand All @@ -637,28 +680,17 @@ def __init__(self, dataset_name:str, output_dataset_name:str

def aggregate_frame(self, data):
if data.empty:
return data
# create a copy of the global environment for evaluating the extra
# code fragment so as to not pollute the global namespace itself
global_env = globals().copy()
locals_env = locals().copy()

if (data.empty):
logw(f'GroupedFunctionTransform return is empty!')
return pd.DataFrame()

logd(f'{data=}')
# logd(f'{data.hour.unique()=}')

if type(self.extra_code) == str:
# compile the code fragment
self.extra_code = compile(self.extra_code, filename='<string>', mode='exec')
# actually evaluate the code within the given namespace
eval(self.extra_code, global_env, locals_env)

if type(self.transform_function) == str:
# evaluate the expression within the given environment
self.transform_function = eval(self.transform_function, global_env, locals_env)
# Get the function to call and possibly compile and evaluate the code defined in
# extra_code in a separate global namespace.
# The compilation of the extra code has to happen in the thread/process
# of the processing worker since code objects can't be serialized.
transform_function = self.eval_function(self.transform_function, self.extra_code)

if len(self.grouping_columns) == 1:
grouping_columns = self.grouping_columns[0]
@@ -667,7 +699,7 @@ def aggregate_frame(self, data):

result_list = []
for group_key, group_data in data.groupby(by=grouping_columns, sort=False, observed=True):
result = self.transform_function(group_data)
result = transform_function(group_data)

if self.raw:
# just append the keys for the subset and the transformed DataFrame

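The analogous per-partition path in `GroupedFunctionTransform.aggregate_frame` hands the whole partition `DataFrame` to the transform function, which (unless `raw` is set) returns a `DataFrame`. Again a sketch under invented data; the lambda stands in for whatever string a YAML config might supply:

```python
import pandas as pd

data = pd.DataFrame({'node': ['a', 'a', 'b'], 'delay': [3.0, 1.0, 2.0]})

# what eval_function might produce from a function given as a string
transform_function = eval('lambda part: part.nsmallest(1, "delay")')

result_list = []
for group_key, group_data in data.groupby(by='node', sort=False, observed=True):
    result_list.append(transform_function(group_data))

print(pd.concat(result_list))  # the lowest-delay row of each partition
```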