From 81fa8017fbd25352fd87961bc4dcc1406c4e0dda Mon Sep 17 00:00:00 2001 From: Alessandro Berti Date: Thu, 22 Feb 2024 07:08:00 +0100 Subject: [PATCH] feat(pm4py): refactoring POWL object --- pm4py/objects/powl/obj.py | 83 +++++++++++++++------------------------ 1 file changed, 32 insertions(+), 51 deletions(-) diff --git a/pm4py/objects/powl/obj.py b/pm4py/objects/powl/obj.py index 13eab3755..9f78eb0e8 100644 --- a/pm4py/objects/powl/obj.py +++ b/pm4py/objects/powl/obj.py @@ -2,9 +2,10 @@ from pm4py.objects.powl.constants import STRICT_PARTIAL_ORDER_LABEL from pm4py.objects.process_tree.obj import ProcessTree, Operator from typing import List as TList, Optional, Union +from abc import ABC, abstractmethod -class POWL(ProcessTree): +class POWL(ProcessTree, ABC): def __str__(self) -> str: return self.__repr__() @@ -27,65 +28,25 @@ def validate_partial_orders(self): for child in self.children: child.validate_partial_orders() - def validate_partial_orders_with_missing_transitive_edges(self): - if isinstance(self, StrictPartialOrder): - if not self.order.is_irreflexive(): - raise Exception("The irreflexivity of the partial order is violated!") - if not self.order.is_transitive(): - self.order.add_transitive_edges() - if not self.order.is_irreflexive(): - raise Exception("The transitive closure of the provided relation violates irreflexivity!") - if hasattr(self, 'children'): - for child in self.children: - child.validate_partial_orders_with_missing_transitive_edges() - - def validate_unique_transitions(self) -> TList[Union["Transition", "SilentTransition"]]: - def _find_duplicates(lst): - counts = {} - duplicates = [] - for item in lst: - if item in counts: - counts[item] += 1 - if counts[item] == 2: - duplicates.append(item) - else: - counts[item] = 1 - return duplicates - - def _collect_leaves(node: "POWL"): - if isinstance(node, Transition) or isinstance(node, SilentTransition): - return [node] - - elif hasattr(node, 'children'): - leaves = [] - for child in node.children: - leaves = leaves + _collect_leaves(child) - return leaves - else: - raise Exception( - "Unknown model type! The following model is not a transition and has no children: " + str(node)) - - transitions = _collect_leaves(self) - duplicate_transitions = _find_duplicates(transitions) - if len(duplicate_transitions) > 0: - raise Exception("Each of the following transitions occurs in multiple submodels: " - + str([t.label if t.label else "silent transition" for t in duplicate_transitions])) - return transitions - @staticmethod def model_description() -> str: descr = """A partially ordered workflow language (POWL) is a partially ordered graph representation of a process, extended with control-flow operators for modeling choice and loop structures. There are four types of POWL models: -- an activity (identified by its label, i.e., 'M' identifies the activity M). Silent activities (i.e., with empty labels) are also supported. +- an activity (identified by its label, i.e., 'M' identifies the activity M). Silent activities with empty labels (tau labels) are also supported. - a choice of other POWL models (an exclusive choice between the sub-models A and B is identified by X ( A, B ) ) - a loop node between two POWL models (a loop between the sub-models A and B is identified by * ( A, B ) and tells that you execute A, then you either exit the loop or execute B and then A again, this is repeated until you exit the loop). -- a partial order over a set of POWL models. A partial order is a binary relation that is irreflexive, transitive, and asymmetric. A partial order sets an execution order between the sub-models (i.e., the target node cannot be executed before the source node is completed). +- a partial order over a set of POWL models. A partial order is a binary relation that is irreflexive, transitive, and asymmetric. A partial order sets an execution order between the sub-models (i.e., the target node cannot be executed before the source node is completed). Unconnected nodes in a partial order are considered to be concurrent. An example is PO=(nodes={ NODE1, NODE2 }, order={ }) +where NODE1 and NODE2 are independent and can be executed in parallel. Another example is PO=(nodes={ NODE1, NODE2 }, order={ NODE1-->NODE2 }) where NODE2 can only be executed after NODE1 is completed. + +A more advanced example: PO=(nodes={ NODE1, NODE2, NODE3, X ( NODE4, NODE5 ) }, order={ NODE1-->NODE2, NODE1-->X ( NODE4, NODE5 ), NODE2-->X ( NODE4, NODE5 ) }), in this case, NODE2 can be executed only after NODE1 is completed, while the choice between NODE4 and NODE5 needs to wait until both NODE1 and NODE2 are finalized. + -You can specify a POWL model as follows: -PO=(nodes={ NODE1, NODE2, NODE3, X ( NODE4, NODE5 ) }, order={ NODE1-->NODE2, NODE1-->X ( NODE4, NODE5 ), NODE2-->X ( NODE4, NODE5 ) }) -in this case, NODE2 can be executed only after NODE1 is completed, while the choice between NODE4 and NODE5 needs to wait until both NODE1 and NODE2 are finalized. """ return descr + @abstractmethod + def copy(self): + pass + class Transition(POWL): transition_id: int = 0 @@ -96,6 +57,9 @@ def __init__(self, label: Optional[str] = None) -> None: self._identifier = Transition.transition_id Transition.transition_id = Transition.transition_id + 1 + def copy(self): + return Transition(self._label) + def __eq__(self, other: object) -> bool: if isinstance(other, Transition): return self._label == other._label and self._identifier == other._identifier @@ -125,6 +89,9 @@ class SilentTransition(Transition): def __init__(self) -> None: super().__init__(label=None) + def copy(self): + return SilentTransition() + class FrequentTransition(Transition): def __init__(self, label, min_freq: Union[str, int], max_freq: Union[str, int]) -> None: @@ -150,6 +117,16 @@ def __init__(self, nodes: TList[POWL]) -> None: self._set_order(nodes) self.additional_information = None + def copy(self): + copied_nodes = {n:n.copy() for n in self.order.nodes} + res = StrictPartialOrder(list(copied_nodes.values())) + for n1 in self.order.nodes: + for n2 in self.order.nodes: + if self.order.is_edge(n1, n2): + res.add_edge(copied_nodes[n1], copied_nodes[n2]) + return res + + def _set_order(self, nodes: TList[POWL]) -> None: self.order = BinaryRelation(nodes) @@ -310,6 +287,10 @@ def __init__(self, operator: Operator, children: TList[POWL]) -> None: self.operator = operator self.children = children + def copy(self): + copied_nodes = [n.copy() for n in self.children] + return OperatorPOWL(self.operator, copied_nodes) + def __lt__(self, other: object) -> bool: if isinstance(other, OperatorPOWL): return self.__repr__() < other.__repr__()