Skip to content

Commit

Permalink
feat(pm4py): performance improvement batch detection on Pandas dataframes
Browse files Browse the repository at this point in the history
  • Loading branch information
fit-alessandro-berti committed Aug 17, 2022
1 parent 78d9bf0 commit 1cbd37a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 21 deletions.
15 changes: 7 additions & 8 deletions pm4py/algo/discovery/batches/utils/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from typing import Tuple, List, Any, Set, Optional, Dict, Union

from pm4py.util import exec_utils
import heapq
from copy import copy


class Parameters(Enum):
Expand Down Expand Up @@ -62,18 +64,15 @@ def __merge_near_intervals(intervals: List[Tuple[float, float, Set[Any]]], max_a
# decide to merge interval i and i+1
new_interval = (min(intervals[i][0], intervals[i + 1][0]), max(intervals[i][1], intervals[i + 1][1]),
intervals[i][2].union(intervals[i + 1][2]))
# add the new interval to the list
intervals.append(new_interval)
# remove the i+1 interval
del intervals[i + 1]
# remove the i interval
del intervals[i]
# sort the intervals
intervals.sort()
# add the new interval to the list
heapq.heappush(intervals, new_interval)
# set the variable continue_cycle to True
continue_cycle = True
# interrupt the current iteration on the intervals
break
i = i - 1
i = i + 1
return intervals

Expand Down Expand Up @@ -136,9 +135,9 @@ def __detect_single(events: List[Tuple[float, float, str]], parameters: Optional
merge_distance = exec_utils.get_param_value(Parameters.MERGE_DISTANCE, parameters, 15 * 60)
min_batch_size = exec_utils.get_param_value(Parameters.MIN_BATCH_SIZE, parameters, 2)

intervals = [(e[0], e[1], {(e[0], e[1], e[2])}) for e in
intervals = [(e[0], e[1], {copy(e)}) for e in
events]
intervals.sort()
heapq.heapify(intervals)
intervals = __merge_overlapping_intervals(intervals)
intervals = __merge_near_intervals(intervals, merge_distance)
batches = [x for x in intervals if len(x[2]) >= min_batch_size]
Expand Down
44 changes: 31 additions & 13 deletions pm4py/algo/discovery/batches/variants/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from pm4py.algo.discovery.batches.utils import detection
from pm4py.util import exec_utils, constants, xes_constants
import numpy as np


class Parameters(Enum):
Expand All @@ -13,6 +14,7 @@ class Parameters(Enum):
START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
EVENT_ID_KEY = "event_id_key"
MERGE_DISTANCE = "merge_distance"
MIN_BATCH_SIZE = "min_batch_size"

Expand Down Expand Up @@ -68,22 +70,38 @@ def apply(log: pd.DataFrame, parameters: Optional[Dict[Union[str, Parameters], A
start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters,
timestamp_key)
case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME)
event_id_key = exec_utils.get_param_value(Parameters.EVENT_ID_KEY, parameters, constants.DEFAULT_INDEX_KEY)

log = log[list({activity_key, resource_key, start_timestamp_key, timestamp_key, case_id_key})]
events = log.to_dict('records')
attributes_to_consider = {activity_key, resource_key, start_timestamp_key, timestamp_key, case_id_key}
log_contains_evidkey = event_id_key in log
if log_contains_evidkey:
attributes_to_consider.add(event_id_key)

actres_grouping = {}

for ev in events:
case = ev[case_id_key]
activity = ev[activity_key]
resource = ev[resource_key]
st = ev[start_timestamp_key].timestamp()
et = ev[timestamp_key].timestamp()
log = log[list(attributes_to_consider)]
log[timestamp_key] = log[timestamp_key].values.astype(np.int64) // 10**9
if start_timestamp_key != timestamp_key:
log[start_timestamp_key] = log[start_timestamp_key].values.astype(np.int64) // 10**9

if (activity, resource) not in actres_grouping:
actres_grouping[(activity, resource)] = []
actres_grouping0 = log.groupby([activity_key, resource_key]).agg(list).to_dict()
start_timestamps = actres_grouping0[start_timestamp_key]
complete_timestamps = actres_grouping0[timestamp_key]
cases = actres_grouping0[case_id_key]
if log_contains_evidkey:
events_ids = actres_grouping0[event_id_key]

actres_grouping[(activity, resource)].append((st, et, case))
actres_grouping = {}
for k in start_timestamps:
st = start_timestamps[k]
et = complete_timestamps[k]
c = cases[k]
if log_contains_evidkey:
eid = events_ids[k]
actres_grouping_k = []
for i in range(len(st)):
if log_contains_evidkey:
actres_grouping_k.append((st[i], et[i], c[i], eid[i]))
else:
actres_grouping_k.append((st[i], et[i], c[i]))
actres_grouping[k] = actres_grouping_k

return detection.detect(actres_grouping, parameters=parameters)

0 comments on commit 1cbd37a

Please sign in to comment.