Skip to content

Commit

Permalink
Merge branch 'ft-1170-semantics-option-petri-playout' into 'integration'
Browse files Browse the repository at this point in the history
Semantics option for Petri playout

See merge request pm4py/pm4py-core!458
  • Loading branch information
fit-sebastiaan-van-zelst committed Sep 3, 2021
2 parents 3b63254 + 1d9fb66 commit c03b618
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 185 deletions.
23 changes: 14 additions & 9 deletions pm4py/algo/simulation/playout/petri_net/variants/basic_playout.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@
from copy import copy
from enum import Enum
from random import choice
from typing import Optional, Dict, Any, Union

from pm4py.objects import petri_net
from pm4py.objects.log import obj as log_instance
from pm4py.objects.petri_net import semantics
from pm4py.objects.petri_net.obj import PetriNet
from pm4py.objects.log.obj import EventLog
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.util import constants
from pm4py.util import exec_utils
from pm4py.util import xes_constants

from pm4py.objects.petri_net.obj import PetriNet, Marking
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.objects.log.obj import EventLog, EventStream


class Parameters(Enum):
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
Expand All @@ -22,12 +20,13 @@ class Parameters(Enum):
RETURN_VISITED_ELEMENTS = "return_visited_elements"
NO_TRACES = "noTraces"
MAX_TRACE_LENGTH = "maxTraceLength"
PETRI_SEMANTICS = "petri_semantics"


def apply_playout(net, initial_marking, no_traces=100, max_trace_length=100,
case_id_key=xes_constants.DEFAULT_TRACEID_KEY,
activity_key=xes_constants.DEFAULT_NAME_KEY, timestamp_key=xes_constants.DEFAULT_TIMESTAMP_KEY,
final_marking=None, return_visited_elements=False):
final_marking=None, return_visited_elements=False, semantics=petri_net.semantics.ClassicSemantics()):
"""
Do the playout of a Petrinet generating a log
Expand All @@ -49,6 +48,8 @@ def apply_playout(net, initial_marking, no_traces=100, max_trace_length=100,
Event attribute that corresponds to the timestamp
final_marking
If provided, the final marking of the Petri net
semantics
Semantics of the Petri net to be used (default: petri_net.semantics.ClassicSemantics())
"""
# assigns to each event an increased timestamp from 1970
curr_timestamp = 10000000
Expand Down Expand Up @@ -101,7 +102,8 @@ def apply_playout(net, initial_marking, no_traces=100, max_trace_length=100,
return log


def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> EventLog:
def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None,
parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> EventLog:
"""
Do the playout of a Petrinet generating a log
Expand All @@ -117,6 +119,7 @@ def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None
Parameters of the algorithm:
Parameters.NO_TRACES -> Number of traces of the log to generate
Parameters.MAX_TRACE_LENGTH -> Maximum trace length
Parameters.PETRI_SEMANTICS -> Petri net semantics to be used (default: petri_nets.semantics.ClassicSemantics())
"""
if parameters is None:
parameters = {}
Expand All @@ -127,7 +130,9 @@ def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None
no_traces = exec_utils.get_param_value(Parameters.NO_TRACES, parameters, 1000)
max_trace_length = exec_utils.get_param_value(Parameters.MAX_TRACE_LENGTH, parameters, 1000)
return_visited_elements = exec_utils.get_param_value(Parameters.RETURN_VISITED_ELEMENTS, parameters, False)
semantics = exec_utils.get_param_value(Parameters.PETRI_SEMANTICS, parameters, petri_net.semantics.ClassicSemantics())

return apply_playout(net, initial_marking, max_trace_length=max_trace_length, no_traces=no_traces,
case_id_key=case_id_key, activity_key=activity_key, timestamp_key=timestamp_key,
final_marking=final_marking, return_visited_elements=return_visited_elements)
final_marking=final_marking, return_visited_elements=return_visited_elements,
semantics=semantics)
20 changes: 11 additions & 9 deletions pm4py/algo/simulation/playout/petri_net/variants/extensive.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import datetime
import sys
from collections import Counter
from enum import Enum
from typing import Optional, Dict, Any, Union

from pm4py.objects import petri_net
from pm4py.objects.log import obj as log_instance
from pm4py.objects.petri_net import semantics
from pm4py.objects.petri_net.obj import PetriNet
from pm4py.objects.log.obj import EventLog
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.util import constants
from pm4py.util import exec_utils
from pm4py.util import xes_constants
import sys

from pm4py.objects.petri_net.obj import PetriNet, Marking
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.objects.log.obj import EventLog, EventStream


class Parameters(Enum):
Expand All @@ -22,14 +20,16 @@ class Parameters(Enum):
MAX_TRACE_LENGTH = "maxTraceLength"
RETURN_ELEMENTS = "return_elements"
MAX_MARKING_OCC = "max_marking_occ"
PETRI_SEMANTICS = "petri_semantics"


POSITION_MARKING = 0
POSITION_TRACE = 1
POSITION_ELEMENTS = 2


def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> EventLog:
def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None,
parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> EventLog:
"""
Do the playout of a Petrinet generating a log (extensive search; stop at the maximum
trace length specified
Expand All @@ -45,6 +45,7 @@ def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None
parameters
Parameters of the algorithm:
Parameters.MAX_TRACE_LENGTH -> Maximum trace length
Parameters.PETRI_SEMANTICS -> Petri net semantics
"""
if parameters is None:
parameters = {}
Expand All @@ -56,6 +57,7 @@ def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None
max_trace_length = exec_utils.get_param_value(Parameters.MAX_TRACE_LENGTH, parameters, 10)
return_elements = exec_utils.get_param_value(Parameters.RETURN_ELEMENTS, parameters, False)
max_marking_occ = exec_utils.get_param_value(Parameters.MAX_MARKING_OCC, parameters, sys.maxsize)
semantics = exec_utils.get_param_value(Parameters.PETRI_SEMANTICS, parameters, petri_net.semantics.ClassicSemantics())

# assigns to each event an increased timestamp from 1970
curr_timestamp = 10000000
Expand Down Expand Up @@ -91,7 +93,7 @@ def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None
if counter_elements[m] > max_marking_occ:
continue

new_m = semantics.weak_execute(t, m)
new_m = semantics.weak_execute(t, net, m)
if t.label is not None:
new_trace = trace + (t.label,)
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
import datetime
from copy import copy
from enum import Enum
from typing import Optional, Dict, Any, Union

from pm4py.algo.simulation.montecarlo.utils import replay
from pm4py.objects import petri_net
from pm4py.objects.log import obj as log_instance
from pm4py.objects.petri_net import semantics
from pm4py.objects.log.obj import EventLog
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.objects.petri_net.utils import final_marking as final_marking_discovery
from pm4py.objects.petri_net.obj import PetriNet
from pm4py.objects.stochastic_petri import utils as stochastic_utils
from pm4py.algo.simulation.montecarlo.utils import replay
from pm4py.util import constants
from pm4py.util import exec_utils
from pm4py.util import xes_constants

from pm4py.objects.petri_net.obj import PetriNet, Marking
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.objects.log.obj import EventLog, EventStream


class Parameters(Enum):
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
Expand All @@ -26,12 +24,14 @@ class Parameters(Enum):
MAX_TRACE_LENGTH = "maxTraceLength"
LOG = "log"
STOCHASTIC_MAP = "stochastic_map"
PETRI_SEMANTICS = "petri_semantics"


def apply_playout(net, initial_marking, no_traces=100, max_trace_length=100,
case_id_key=xes_constants.DEFAULT_TRACEID_KEY,
activity_key=xes_constants.DEFAULT_NAME_KEY, timestamp_key=xes_constants.DEFAULT_TIMESTAMP_KEY,
final_marking=None, smap=None, log=None, return_visited_elements=False):
final_marking=None, smap=None, log=None, return_visited_elements=False,
semantics=petri_net.semantics.ClassicSemantics()):
"""
Do the playout of a Petrinet generating a log
Expand All @@ -57,6 +57,8 @@ def apply_playout(net, initial_marking, no_traces=100, max_trace_length=100,
Stochastic map
log
Log
semantics
Semantics of the Petri net to be used (default: petri_net.semantics.ClassicSemantics())
"""
if final_marking is None:
# infer the final marking from the net
Expand Down Expand Up @@ -121,7 +123,8 @@ def apply_playout(net, initial_marking, no_traces=100, max_trace_length=100,
return log


def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> EventLog:
def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None,
parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> EventLog:
"""
Do the playout of a Petrinet generating a log
Expand All @@ -137,6 +140,7 @@ def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None
Parameters of the algorithm:
Parameters.NO_TRACES -> Number of traces of the log to generate
Parameters.MAX_TRACE_LENGTH -> Maximum trace length
Parameters.PETRI_SEMANTICS -> Petri net semantics to be used (default: petri_nets.semantics.ClassicSemantics())
"""
if parameters is None:
parameters = {}
Expand All @@ -149,7 +153,10 @@ def apply(net: PetriNet, initial_marking: Marking, final_marking: Marking = None
smap = exec_utils.get_param_value(Parameters.STOCHASTIC_MAP, parameters, None)
log = exec_utils.get_param_value(Parameters.LOG, parameters, None)
return_visited_elements = exec_utils.get_param_value(Parameters.RETURN_VISITED_ELEMENTS, parameters, False)
semantics = exec_utils.get_param_value(Parameters.PETRI_SEMANTICS, parameters, petri_net.semantics.ClassicSemantics())

return apply_playout(net, initial_marking, max_trace_length=max_trace_length, no_traces=no_traces,
case_id_key=case_id_key, activity_key=activity_key, timestamp_key=timestamp_key,
final_marking=final_marking, smap=smap, log=log, return_visited_elements=return_visited_elements)
final_marking=final_marking, smap=smap, log=log,
return_visited_elements=return_visited_elements,
semantics=semantics)
135 changes: 93 additions & 42 deletions pm4py/objects/petri_net/data_petri_nets/semantics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,80 @@
import copy
from pm4py.objects.petri_net import properties as petri_properties
from pm4py.objects.petri_net.sem_interface import Semantics


class DataPetriNetSemantics(Semantics):
def is_enabled(self, t, pn, m, **kwargs):
"""
Verifies whether a given transition is enabled in a given Petri net and marking
Parameters
----------
:param t: transition to check
:param pn: Petri net
:param m: marking to check
:param e: associated event (optional, as keyword argument)
Returns
-------
:return: true if enabled, false otherwise
"""
e = kwargs["e"] if "e" in kwargs else {}
return is_enabled(t, pn, m, e)

def execute(self, t, pn, m, **kwargs):
"""
Executes a given transition in a given Petri net, the given data marking and the associated event
Parameters
----------
:param t: transition to execute
:param pn: Petri net
:param m: marking to use
:param e: associated event (optional, as keyword argument)
Returns
-------
:return: newly reached marking if :param t: is enabled, None otherwise
"""
e = kwargs["e"] if "e" in kwargs else {}
return execute(t, pn, m, e)

def weak_execute(self, t, pn, m, **kwargs):
"""
Executes a given transition in a given Petri net, the given data marking and the associated event,
even if not fully enabled
Parameters
----------
:param t: transition to execute
:param pn: Petri net
:param m: marking to use
:param e: associated event (optional, as keyword argument)
Returns
-------
:return: newly reached marking
"""
e = kwargs["e"] if "e" in kwargs else {}
return weak_execute(t, m, e)

def enabled_transitions(self, pn, m, **kwargs):
"""
Returns a set of enabled transitions in a Petri net, the given data marking and the associated event
Parameters
----------
:param pn: Petri net
:param m: marking of the pn
:param e: associated event (optional, as keyword argument)
Returns
-------
:return: set of enabled transitions
"""
e = kwargs["e"] if "e" in kwargs else {}
return enabled_transitions(pn, m, e)


def evaluate_guard(guard, read_variables, data):
Expand Down Expand Up @@ -30,22 +105,12 @@ def evaluate_guard(guard, read_variables, data):
return False


def is_enabled(t, pn, m, e):
"""
Verifies whether a given transition is enabled in a given Petri net and marking
Parameters
----------
:param t: transition to check
:param pn: Petri net
:param m: marking to check
:param e: associated event
Returns
-------
:return: true if enabled, false otherwise
"""

# 29/08/2021: the following methods have been incapsulated in the DataPetriNetSemantics class.
# the long term idea is to remove them. However, first we need to adapt the existing code to the new
# structure. Moreover, for performance reason, it is better to leave the code here, without having
# to instantiate a DataPetriNetSemantics object.
def is_enabled(t, pn, m, e):
if t not in pn.transitions:
return False
else:
Expand All @@ -64,21 +129,6 @@ def is_enabled(t, pn, m, e):


def execute(t, pn, m, e):
"""
Executes a given transition in a given Petri net, the given data marking and the associated event
Parameters
----------
:param t: transition to execute
:param pn: Petri net
:param m: marking to use
:param e: associated event
Returns
-------
:return: newly reached marking if :param t: is enabled, None otherwise
"""

if not is_enabled(t, pn, m, e):
return None

Expand All @@ -96,20 +146,21 @@ def execute(t, pn, m, e):
return m_out


def enabled_transitions(pn, m, e):
"""
Returns a set of enabled transitions in a Petri net, the given data marking and the associated event
def weak_execute(t, m, e):
m_out = copy.copy(m)
for a in t.in_arcs:
m_out[a.source] -= a.weight
if m_out[a.source] <= 0:
del m_out[a.source]
for a in t.out_arcs:
m_out[a.target] += a.weight

Parameters
----------
:param pn: Petri net
:param m: marking of the pn
:param e: associated event
m_out.data_dict.update(e)

Returns
-------
:return: set of enabled transitions
"""
return m_out


def enabled_transitions(pn, m, e):
enabled = set()
for t in pn.transitions:
if is_enabled(t, pn, m, e):
Expand Down
Loading

0 comments on commit c03b618

Please sign in to comment.