Skip to content

Commit

Permalink
Merge branch 'ft-1472-ocel-streaming-pumping-all-flatts' into 'integration'
Browse files Browse the repository at this point in the history

FT 1472 OCEL streaming - pump the event to all flattened cases

See merge request process-mining/pm4py/pm4py-core!558
  • Loading branch information
fit-sebastiaan-van-zelst committed Dec 14, 2021
2 parents d5537eb + 68ed865 commit 146f49c
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 0 deletions.
39 changes: 39 additions & 0 deletions examples/ocel_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pm4py
import os
from pm4py.streaming.stream import live_event_stream
from pm4py.streaming.util import event_stream_printer
from pm4py.streaming.conversion import ocel_flatts_distributor
from pm4py.objects.ocel.util import ocel_iterator


def execute_script():
    """
    Example: replay an object-centric event log (OCEL) over "flattened" live event streams.

    The OCEL is read from the test data folder (the path is relative to the
    current working directory), and every event is pushed to one live event
    stream per registered object type ("order" and "element"). Each stream has
    a printer registered, so the flattened events are printed as they arrive.
    """
    ocel = pm4py.read_ocel(os.path.join("..", "tests", "input_data", "ocel", "example_log.jsonocel"))
    # We want to use the traditional streaming algorithms also on object-centric event logs.
    # For this purpose, we first create two different event streams, one for the "order"
    # object type and one for the "element" object type.
    order_stream = live_event_stream.LiveEventStream()
    element_stream = live_event_stream.LiveEventStream()
    # Then, we register an algorithm on each of them: a simple printer of the received events.
    order_stream_printer = event_stream_printer.EventStreamPrinter()
    element_stream_printer = event_stream_printer.EventStreamPrinter()
    order_stream.register(order_stream_printer)
    element_stream.register(element_stream_printer)
    # Then, we create the distributor object.
    # It keeps track of which event streams are registered for which object type.
    flatts_distributor = ocel_flatts_distributor.OcelFlattsDistributor()
    flatts_distributor.register("order", order_stream)
    flatts_distributor.register("element", element_stream)
    order_stream.start()
    element_stream.start()
    try:
        # We iterate over the events of the OCEL ...
        for ev in ocel_iterator.apply(ocel):
            # ... and each OCEL event is sent to all the "flattened" event streams.
            # Since each of those streams has a printer registered, what we get is
            # a print of all the events that reach them.
            flatts_distributor.append(ev)
    finally:
        # Stop the streams even if the iteration or the distribution fails,
        # so that started streams are never left running.
        order_stream.stop()
        element_stream.stop()


if __name__ == "__main__":
    execute_script()
39 changes: 39 additions & 0 deletions pm4py/objects/ocel/util/ocel_iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from pm4py.objects.ocel.obj import OCEL
from typing import Optional, Dict, Any
from enum import Enum
from pm4py.util import exec_utils
from pm4py.objects.ocel import constants as ocel_constants
import pandas as pd


class Parameters(Enum):
    """Keys of the ``parameters`` dictionary read by ``apply`` in this module."""
    # Prefix of the object types in the OCEL; ``apply`` forwards the
    # resolved value to ``ocel.get_extended_table``.
    OCEL_TYPE_PREFIX = ocel_constants.PARAM_OBJECT_TYPE_PREFIX_EXTENDED


def apply(ocel: OCEL, parameters: Optional[Dict[Any, Any]] = None):
    """
    Lazily iterates over the events of an object-centric event log.

    Parameters
    ----------------
    ocel
        Object-centric event log
    parameters
        Parameters of the method, including:
        - Parameters.OCEL_TYPE_PREFIX => the prefix of the object types in the OCEL (default: ocel:type)

    Returns
    ----------------
    yielded event
        One dictionary per row of the extended table of the OCEL, yielded one
        by one. Attributes whose value is missing (according to ``pd.isna``)
        are left out; list values are always kept.
    """
    params = {} if parameters is None else parameters

    type_prefix = exec_utils.get_param_value(
        Parameters.OCEL_TYPE_PREFIX,
        params,
        ocel_constants.DEFAULT_OBJECT_TYPE_PREFIX_EXTENDED,
    )
    extended = ocel.get_extended_table(type_prefix)

    for _, row in extended.iterrows():
        event = {}
        for attribute, value in dict(row).items():
            # Lists are tested first: pd.isna on a list works element-wise
            # and would not give a single truth value.
            if isinstance(value, list) or not pd.isna(value):
                event[attribute] = value
        yield event
90 changes: 90 additions & 0 deletions pm4py/streaming/conversion/ocel_flatts_distributor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from typing import Optional, Dict, Any
from enum import Enum
from pm4py.util import exec_utils, constants, xes_constants
from pm4py.objects.ocel import constants as ocel_constants
from copy import copy
from pm4py.streaming.stream.live_event_stream import LiveEventStream


class Parameters(Enum):
    """Keys of the ``parameters`` dictionary read by ``OcelFlattsDistributor.__init__``."""
    # Keys written into the flattened events that are sent to the listeners.
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    # Attribute names looked up in the incoming OCEL event.
    OCEL_ACTIVITY_KEY = ocel_constants.PARAM_EVENT_ACTIVITY
    OCEL_TIMESTAMP_KEY = ocel_constants.PARAM_EVENT_TIMESTAMP
    # Prefix that marks the object-type attributes of the incoming OCEL event.
    OCEL_TYPE_PREFIX = ocel_constants.PARAM_OBJECT_TYPE_PREFIX_EXTENDED


class OcelFlattsDistributor(object):
    """
    Distributes OCEL events among the event streams registered for the
    single object types, sending to each of them the "flattened" events.
    """

    def __init__(self, parameters: Optional[Dict[Any, Any]] = None):
        """
        Instantiate the object, "distributing" an OCEL event among all
        the event streams for the "flattened" events.

        Parameters
        -----------------
        parameters
            Parameters of the algorithm, including:
            - Parameters.ACTIVITY_KEY => the activity key to use in the flattening
            - Parameters.CASE_ID_KEY => the case ID key to use in the flattening
                (default: constants.CASE_CONCEPT_NAME)
            - Parameters.TIMESTAMP_KEY => the timestamp key to use in the flattening
            - Parameters.OCEL_ACTIVITY_KEY => the attribute in the OCEL event that is the activity (default: ocel:activity)
            - Parameters.OCEL_TIMESTAMP_KEY => the attribute in the OCEL event that is the timestamp (default: ocel:timestamp)
            - Parameters.OCEL_TYPE_PREFIX => the prefix of the object types in the OCEL (default: ocel:type)
        """
        if parameters is None:
            parameters = {}

        self.activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters,
                                                       xes_constants.DEFAULT_NAME_KEY)
        self.case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME)
        self.timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
                                                        xes_constants.DEFAULT_TIMESTAMP_KEY)
        self.ocel_activity = exec_utils.get_param_value(Parameters.OCEL_ACTIVITY_KEY, parameters,
                                                        ocel_constants.DEFAULT_EVENT_ACTIVITY)
        self.ocel_timestamp = exec_utils.get_param_value(Parameters.OCEL_TIMESTAMP_KEY, parameters,
                                                         ocel_constants.DEFAULT_EVENT_TIMESTAMP)

        self.ot_prefix = exec_utils.get_param_value(Parameters.OCEL_TYPE_PREFIX, parameters,
                                                    ocel_constants.DEFAULT_OBJECT_TYPE_PREFIX_EXTENDED)
        # object type -> list of the event streams registered for that type
        self.flattened_stream_listeners = {}

    def register(self, object_type: str, live_event_stream: LiveEventStream):
        """
        Register a new event stream (listener) for a given object type.
        Several listeners can be registered for the same object type.

        Parameters
        -----------------
        object_type
            Given object type
        live_event_stream
            Live event stream
        """
        self.flattened_stream_listeners.setdefault(object_type, []).append(live_event_stream)

    def append(self, event: Dict[str, Any]):
        """
        Flattens an OCEL event among all the available object types, and sends its
        flattening to each corresponding event stream.

        For every object related to the event whose object type has at least one
        registered listener, a flattened event is built (the attributes of the OCEL
        event that are not object types, with the activity and the timestamp stored
        under the configured keys, plus the object identifier as case ID) and
        appended to all the listeners of that object type. The provided event is
        not modified.

        Parameters
        -------------
        event
            OCEL event (obtained for example using the ocel_iterator)

        Raises
        -------------
        KeyError
            If the event does not contain the OCEL activity or timestamp attribute.
        """
        prefix = self.ot_prefix
        base_event = {x: y for x, y in event.items() if not x.startswith(prefix)}
        # Remove the OCEL keys BEFORE writing the flattened ones: in this way the
        # activity/timestamp are preserved also when a flattened key is
        # configured to be equal to the corresponding OCEL key.
        activity = base_event.pop(self.ocel_activity)
        timestamp = base_event.pop(self.ocel_timestamp)
        base_event[self.activity_key] = activity
        base_event[self.timestamp_key] = timestamp

        for attribute, objects in event.items():
            if not attribute.startswith(prefix):
                continue
            # Strip only the leading prefix: the object type name may itself
            # contain the prefix string.
            object_type = attribute[len(prefix):]
            listeners = self.flattened_stream_listeners.get(object_type)
            if not listeners:
                continue
            for obj in objects:
                # one shallow copy of the base event for each related object
                fl_ev = {**base_event, self.case_id_key: obj}
                for listener in listeners:
                    listener.append(fl_ev)

0 comments on commit 146f49c

Please sign in to comment.