diff --git a/transforms.py b/transforms.py
index 14f2e5a..3e36b4e 100644
--- a/transforms.py
+++ b/transforms.py
@@ -90,6 +90,46 @@ def prepare(self):
 
         return job_list
 
+
+class ExtraCodeFunctionMixin:
+    r"""
+    A mixin class providing the functionality to compile and evaluate a
+    function and an additional, optional code fragment within a separate
+    global environment.
+    """
+    def eval_function(self, function:Union[Callable, str], extra_code:Optional[str]) -> Callable:
+        r"""
+        Compile and evaluate the given function and an additional, optional
+        code fragment within a separate global environment and return the
+        executable function object.
+
+        Parameters
+        ----------
+        function : Union[Callable, str]
+            The name of the function or a function object.
+
+        extra_code : Optional[str]
+            This can contain additional code for the transform function, such as
+            the definition of a function over multiple lines or split into
+            multiple functions for readability.
+        """
+        # create a copy of the global environment for evaluating the extra
+        # code fragment so as not to pollute the global namespace itself
+        global_env = globals().copy()
+
+        if isinstance(extra_code, str):
+            # compile the code fragment
+            compiled_extra_code = compile(extra_code, filename='<string>', mode='exec')
+            # actually evaluate the code within the given namespace to allow
+            # access to all the defined symbols, such as helper functions
+            # that are not defined inline
+            eval(compiled_extra_code, global_env)
+
+        if isinstance(function, Callable):
+            evaluated_function = function
+        else:
+            evaluated_function = eval(function, global_env)
+
+        return evaluated_function
+
+
 class ConcatTransform(Transform, YAMLObject):
     r"""
     A transform for concatenating all DataFrames from the given datasets.
@@ -279,7 +319,7 @@ def prepare(self):
 
         return self.prepare_simple_sequential()
 
 
-class FunctionTransform(Transform, YAMLObject):
+class FunctionTransform(Transform, ExtraCodeFunctionMixin, YAMLObject):
     r"""
     A transform for applying an arbitrary function to a whole DataFrame.
 
@@ -291,7 +331,7 @@ class FunctionTransform(Transform, YAMLObject):
     output_dataset_name: str
         The name given to the output dataset.
 
-    function: Callable
+    function: Union[Callable[[pandas.DataFrame], pandas.DataFrame], str]
        The unary function to apply to each DataFrame of the dataset. It
        takes the full DataFrame as its only argument and returns a
        DataFrame.
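The new mixin centralizes the compile-and-eval logic that was previously duplicated in each transform class. A minimal sketch of how it resolves a function name against an extra_code fragment (the clamp helper below is made up for illustration, not part of the patch):

    import textwrap

    helper_source = textwrap.dedent('''
        def clamp(df):
            # clip every value in the DataFrame to the range [0, 1]
            return df.clip(lower=0.0, upper=1.0)
    ''')

    resolver = ExtraCodeFunctionMixin()
    clamp = resolver.eval_function('clamp', helper_source)
    # clamp is now an ordinary callable: clamp(some_dataframe)

Because eval_function evaluates everything against a copy of the module globals, helpers defined in extra_code can also reference module-level names such as pd without their own imports.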
@@ -304,7 +344,7 @@ class FunctionTransform(Transform, YAMLObject):
     yaml_tag = u'!FunctionTransform'
 
     def __init__(self, dataset_name:str, output_dataset_name:str
-                 , function:Callable=None
+                 , function:Union[Callable[[pandas.DataFrame], pandas.DataFrame], str]=None
                  , extra_code:Optional[str]=None
                  ):
         self.dataset_name = dataset_name
@@ -318,25 +358,6 @@ def __init__(self, dataset_name:str, output_dataset_name:str
         self.function = function
         self.extra_code = extra_code
 
-    def eval_function(self) -> Callable:
-        # create a copy of the global environment for evaluating the extra
-        # code fragment so as to not pollute the global namespace itself
-        global_env = globals().copy()
-
-        if type(self.extra_code) == str:
-            # compile the code fragment
-            extra_code = compile(self.extra_code, filename='<string>', mode='exec')
-            # actually evaluate the code within the given namespace to allow
-            # access to all the defined symbols, such as helper functions that are not defined inline
-            eval(extra_code, global_env)
-
-        if isinstance(self.function, Callable):
-            function = self.function
-        else:
-            function = eval(self.function, global_env)
-
-        return function
-
     def process(self, data, attributes) -> pd.DataFrame:
         if data is None or (not data is None and data.empty):
             return pd.DataFrame()
@@ -345,7 +366,7 @@ def process(self, data, attributes) -> pd.DataFrame:
         # extra_code in a separate global namespace.
         # The compilation of the extra code has to happen in the thread/process
         # of the processing worker since code objects can't be serialized.
-        function = self.eval_function()
+        function = self.eval_function(self.function, self.extra_code)
 
         result = function(data)
 
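For context, this is roughly what a call site for the reworked FunctionTransform could look like; the dataset names and the extra_code body are hypothetical:

    transform = FunctionTransform(
        dataset_name='raw_samples',
        output_dataset_name='normalized_samples',
        function='normalize',
        extra_code=(
            'def normalize(df):\n'
            '    return (df - df.min()) / (df.max() - df.min())\n'
        ),
    )

The same pair of keys should also work from a !FunctionTransform entry in a YAML recipe, since the class remains a YAMLObject.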
@@ -367,7 +388,7 @@ def prepare(self):
 
         return job_list
 
 
-class ColumnFunctionTransform(Transform, YAMLObject):
+class ColumnFunctionTransform(Transform, ExtraCodeFunctionMixin, YAMLObject):
     r"""
     A transform for applying a function to every value in a column of a DataFrame
 
@@ -386,25 +407,43 @@ class ColumnFunctionTransform(Transform, YAMLObject):
         the name given to the output column containing the results of applying
         the function
 
-    function: Callable
-        the unary function to apply to the values in the chosen column
+    function: Union[Callable[[pandas.Series], pandas.Series], str]
+        The unary function to apply to the values in the chosen column.
 
+    extra_code: Optional[str]
+        This can contain additional code for the transform function, such as
+        the definition of a function over multiple lines or split into
+        multiple functions for readability.
     """
     yaml_tag = u'!ColumnFunctionTransform'
 
     def __init__(self, dataset_name:str, output_dataset_name:str
                  , input_column:str, output_column:str
-                 , function:Callable=pd.Series.mean):
+                 , function:Union[Callable[[pandas.Series], pandas.Series], str]=None
+                 , extra_code:Optional[str]=None
+                 ):
         self.dataset_name = dataset_name
         self.output_dataset_name = output_dataset_name
         self.input_column = input_column
         self.output_column = output_column
 
+        if not function:
+            msg = 'No processing function has been defined for ColumnFunctionTransform!'
+            loge(msg)
+            raise TypeError(msg)
+
         self.function = function
+        self.extra_code = extra_code
+
+    def process(self, data, attributes):
+        # Get the function to call and possibly compile and evaluate the code defined in
+        # extra_code in a separate global namespace.
+        # The compilation of the extra code has to happen in the thread/process
+        # of the processing worker since code objects can't be serialized.
+        function = self.eval_function(self.function, self.extra_code)
 
-    def process(self, data, function, attributes):
         data[self.output_column] = data[self.input_column].apply(function)
 
         logd(f'ColumnFunctionTransform result:\n{data}')
         return data
@@ -412,15 +451,10 @@ def process(self, data, function, attributes):
     def prepare(self):
         data_list = self.get_data(self.dataset_name)
 
-        if isinstance(self.function, Callable):
-            function = self.function
-        else:
-            function = eval(self.function)
-
         job_list = []
         for data, attributes in data_list:
-            job = dask.delayed(self.process)(data, function, attributes)
+            job = dask.delayed(self.process)(data, attributes)
             job_list.append((job, attributes))
 
         # allow other tasks to depend on the output of the delayed jobs
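The repeated comment about compiling in the processing worker reflects a genuine constraint: Python code objects cannot be pickled, so a pre-compiled extra_code fragment could not be shipped to a dask worker. A quick self-contained demonstration:

    import pickle

    code_obj = compile('1 + 1', filename='<string>', mode='eval')
    try:
        pickle.dumps(code_obj)
    except TypeError as err:
        print(err)  # cannot pickle code objects

Passing the extra_code string instead and compiling it inside process() sidesteps this limitation.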
@@ -428,7 +462,7 @@ def prepare(self):
 
         return job_list
 
 
-class GroupedAggregationTransform(Transform, YAMLObject):
+class GroupedAggregationTransform(Transform, ExtraCodeFunctionMixin, YAMLObject):
     r"""
     A transform for dividing a dataset into distinct partitions with
     `pandas.DataFrame.groupby
@@ -461,12 +495,14 @@ class GroupedAggregationTransform(Transform, YAMLObject):
     pre_concatenate: bool
         concatenate all input DataFrames before processing
 
-    extra_code: Optional[str]
-        this allows specifying additional code, like a more complex transform function
+    aggregation_function: Union[Callable[[pandas.Series], object], str]
+        The unary function to apply to each partition. Should expect a
+        `pandas.Series` as argument and return a scalar value.
 
-    aggregation_function: Callable
-        the unary function to apply to a each partition. Should expect an
-        `pandas.Series` as argument and return a scalar.
+    extra_code: Optional[str]
+        This can contain additional code for the transform function, such as
+        the definition of a function over multiple lines or split into
+        multiple functions for readability.
 
     timestamp_selector: Callable
         the function to select the row in the partition data as template for
         the output in case of aggregation
@@ -478,8 +514,8 @@ def __init__(self, dataset_name:str, output_dataset_name:str
                  , grouping_columns:List
                  , raw:bool=False
                  , pre_concatenate:bool=False
+                 , aggregation_function:Union[Callable[[pandas.Series], object], str]=None
                  , extra_code:Optional[str]=None
-                 , aggregation_function:Callable=pd.Series.mean
                  , timestamp_selector:Callable=pd.DataFrame.head):
         self.dataset_name = dataset_name
         self.output_dataset_name = output_dataset_name
@@ -488,32 +524,30 @@ def __init__(self, dataset_name:str, output_dataset_name:str
         self.output_column = output_column
         self.grouping_columns = grouping_columns
 
-        self.extra_code = extra_code
+
+        if not aggregation_function:
+            msg = 'No aggregation_function has been defined for GroupedAggregationTransform!'
+            loge(msg)
+            raise TypeError(msg)
+
         self.aggregation_function = aggregation_function
+        self.extra_code = extra_code
+
         self.timestamp_selector = timestamp_selector
 
         self.raw = raw
         self.pre_concatenate = pre_concatenate
 
     def aggregate_frame(self, data):
-        # create a copy of the global environment for evaluating the extra
-        # code fragment so as to not pollute the global namespace itself
-        global_env = globals().copy()
-        locals_env = locals().copy()
-
         if (data.empty):
             logw(f'GroupedAggregationTransform return is empty!')
             return pd.DataFrame()
 
-        if type(self.extra_code) == str:
-            # compile the code fragment
-            self.extra_code = compile(self.extra_code, filename='<string>', mode='exec')
-            # actually evaluate the code within the given namespace
-            eval(self.extra_code, global_env, locals_env)
-
-        if type(self.aggregation_function) == str:
-            # evaluate the expression within the given environment
-            self.aggregation_function = eval(self.aggregation_function, global_env, locals_env)
+        # Get the function to call and possibly compile and evaluate the code defined in
+        # extra_code in a separate global namespace.
+        # The compilation of the extra code has to happen in the thread/process
+        # of the processing worker since code objects can't be serialized.
+        aggregation_function = self.eval_function(self.aggregation_function, self.extra_code)
 
         if len(self.grouping_columns) == 1:
             grouping_columns = self.grouping_columns[0]
@@ -522,7 +556,7 @@ def aggregate_frame(self, data):
 
         result_list = []
         for group_key, group_data in data.groupby(by=grouping_columns, sort=False, observed=True):
-            result = self.aggregation_function(group_data[self.input_column])
+            result = aggregation_function(group_data[self.input_column])
 
             if self.raw:
                 result_list.append((group_key, result))
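A hypothetical configuration of the reworked class, with made-up dataset and column names; the full parameter list is not visible in this diff, so keyword arguments are used throughout:

    transform = GroupedAggregationTransform(
        dataset_name='rtt_samples',
        output_dataset_name='rtt_p95',
        input_column='rtt',
        output_column='rtt_p95',
        grouping_columns=['node_id'],
        aggregation_function='p95',
        extra_code=(
            'def p95(series):\n'
            '    return series.quantile(0.95)\n'
        ),
    )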
@@ -564,7 +598,7 @@ def prepare(self):
 
         return jobs
 
 
-class GroupedFunctionTransform(Transform, YAMLObject):
+class GroupedFunctionTransform(Transform, ExtraCodeFunctionMixin, YAMLObject):
     r"""
     A transform for dividing a dataset into distinct partitions with
     `pandas.DataFrame.groupby
@@ -599,12 +633,14 @@ class GroupedFunctionTransform(Transform, YAMLObject):
     pre_concatenate: bool
         concatenate all input DataFrames before processing
 
-    extra_code: Optional[str]
-        this allows specifying additional code, like a more complex transform function
+    transform_function: Union[Callable[[pandas.DataFrame], pandas.DataFrame], Callable[[pandas.DataFrame], object], str]
+        The unary function to apply to each partition. Should expect a
+        `pandas.DataFrame` as argument and return a `pandas.DataFrame` (or
+        an arbitrary object if `raw` is true).
 
-    transform_function: Union[Callable[[pandas.DataFrame], pandas.DataFrame], Callable[[pandas.DataFrame], object]]
-        the unary function to apply to a each partition. Should expect an
-        `pandas.DataFrame` as argument and return a `pandas.DataFrame`.
+    extra_code: Optional[str]
+        This can contain additional code for the transform function, such as
+        the definition of a function over multiple lines or split into
+        multiple functions for readability.
 
     timestamp_selector: Callable
         the function to select the row in the partition data as template for
         the output in case of aggregation
@@ -617,8 +653,8 @@ def __init__(self, dataset_name:str, output_dataset_name:str
                  , raw:bool=False
                  , aggregate:bool=False
                  , pre_concatenate:bool=False
+                 , transform_function:Union[Callable[[pandas.DataFrame], pandas.DataFrame], Callable[[pandas.DataFrame], object], str]=None
                  , extra_code:Optional[str]=None
-                 , transform_function:Callable=pd.DataFrame.head
                  , timestamp_selector:Callable=pd.DataFrame.head):
         self.dataset_name = dataset_name
         self.output_dataset_name = output_dataset_name
@@ -627,8 +663,15 @@ def __init__(self, dataset_name:str, output_dataset_name:str
         self.output_column = output_column
         self.grouping_columns = grouping_columns
 
-        self.extra_code = extra_code
+
+        if not transform_function:
+            msg = 'No transform_function has been defined for GroupedFunctionTransform!'
+            loge(msg)
+            raise TypeError(msg)
+
         self.transform_function = transform_function
+        self.extra_code = extra_code
+
         self.timestamp_selector = timestamp_selector
 
         self.raw = raw
@@ -637,28 +680,17 @@ def __init__(self, dataset_name:str, output_dataset_name:str
 
     def aggregate_frame(self, data):
         if data.empty:
-            return data
-        # create a copy of the global environment for evaluating the extra
-        # code fragment so as to not pollute the global namespace itself
-        global_env = globals().copy()
-        locals_env = locals().copy()
-
-        if (data.empty):
             logw(f'GroupedFunctionTransform return is empty!')
             return pd.DataFrame()
 
         logd(f'{data=}')
         # logd(f'{data.hour.unique()=}')
 
-        if type(self.extra_code) == str:
-            # compile the code fragment
-            self.extra_code = compile(self.extra_code, filename='<string>', mode='exec')
-            # actually evaluate the code within the given namespace
-            eval(self.extra_code, global_env, locals_env)
-
-        if type(self.transform_function) == str:
-            # evaluate the expression within the given environment
-            self.transform_function = eval(self.transform_function, global_env, locals_env)
+        # Get the function to call and possibly compile and evaluate the code defined in
+        # extra_code in a separate global namespace.
+        # The compilation of the extra code has to happen in the thread/process
+        # of the processing worker since code objects can't be serialized.
+        transform_function = self.eval_function(self.transform_function, self.extra_code)
 
         if len(self.grouping_columns) == 1:
             grouping_columns = self.grouping_columns[0]
@@ -667,7 +699,7 @@ def aggregate_frame(self, data):
 
         result_list = []
         for group_key, group_data in data.groupby(by=grouping_columns, sort=False, observed=True):
-            result = self.transform_function(group_data)
+            result = transform_function(group_data)
 
             if self.raw:
                 # just append the keys for the subset and the transformed DataFrame
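One property worth noting, since eval_function evaluates the function argument against a copy of the module globals: a plain attribute expression also works as the function string, not only a name defined in extra_code. An illustrative sketch:

    resolver = ExtraCodeFunctionMixin()
    mean = resolver.eval_function('pd.Series.mean', None)
    # equivalent to passing pd.Series.mean directly as a callable

This relies on pd being imported at module level in transforms.py, which the surrounding code already assumes.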