Merge branch 'PM4PY-1550' into 'integration'
PM4PY-1550 Additional features (useful for instance-spanning constraints) in trace-based feature extraction

See merge request process-mining/pm4py/pm4py-core!594
fit-sebastiaan-van-zelst committed Jan 25, 2022
2 parents 5b5c048 + 84ee2e6 commit 877701f
Showing 1 changed file with 124 additions and 0 deletions.
124 changes: 124 additions & 0 deletions pm4py/algo/transformation/log_to_features/variants/trace_based.py
@@ -38,6 +38,114 @@ class Parameters(Enum):
    ENABLE_WORK_IN_PROGRESS = "enable_work_in_progress"
    ENABLE_RESOURCE_WORKLOAD = "enable_resource_workload"
    ENABLE_FIRST_LAST_ACTIVITY_INDEX = "enable_first_last_activity_index"
    ENABLE_MAX_CONCURRENT_EVENTS = "enable_max_concurrent_events"
    ENABLE_MAX_CONCURRENT_EVENTS_PER_ACTIVITY = "enable_max_concurrent_events_per_activity"


def max_concurrent_events(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Tuple[Any, List[str]]:
    """
    Counts for every trace the maximum number of events (of any activity) that happen concurrently
    (i.e., their time intervals [st1, ct1] and [st2, ct2] have a non-empty intersection).

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm

    Returns
    ----------------
    data
        Numeric value of the features
    feature_names
        Names of the features
    """
    if parameters is None:
        parameters = {}

    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY)

    data = []
    feature_names = ["@@max_concurrent_activities_general"]

    for trace in log:
        max_conc = 0
        i = 0
        while i < len(trace) - 1:
            conc = 0
            ct = trace[i][timestamp_key].timestamp()
            j = i + 1
            # events are assumed to be sorted by start timestamp, so the scan
            # can stop at the first event starting after the completion of i
            while j < len(trace):
                st = trace[j][start_timestamp_key].timestamp()
                if st > ct:
                    break
                conc = conc + 1
                j = j + 1
            if conc > max_conc:
                max_conc = conc
            i = i + 1
        data.append([max_conc])

    return data, feature_names
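
To make the counting concrete, here is a small illustration (not part of the commit; the toy trace and its timestamps are invented). With the default XES keys, start and completion both read time:timestamp, so events sharing a timestamp count as concurrent:

from datetime import datetime

from pm4py.objects.log.obj import Event, EventLog, Trace
from pm4py.algo.transformation.log_to_features.variants.trace_based import max_concurrent_events

# three events that all carry the same timestamp, hence all overlap
t = Trace([Event({"concept:name": a, "time:timestamp": datetime(2022, 1, 25, 10, 0)})
           for a in ["A", "B", "C"]])

data, feature_names = max_concurrent_events(EventLog([t]))
# data == [[2]]: the first event overlaps with its two successors;
# feature_names == ["@@max_concurrent_activities_general"]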


def max_concurrent_events_per_activity(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Tuple[Any, List[str]]:
    """
    Counts for every trace and every activity the maximum number of events of the given activity that happen concurrently
    (i.e., their time intervals [st1, ct1] and [st2, ct2] have a non-empty intersection).

    Parameters
    -----------------
    log
        Event log
    parameters
        Parameters of the algorithm

    Returns
    ----------------
    data
        Numeric value of the features
    feature_names
        Names of the features
    """
    if parameters is None:
        parameters = {}

    start_timestamp_key = exec_utils.get_param_value(Parameters.START_TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY)
    timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY)
    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)

    activities = list(set(y[activity_key] for x in log for y in x))

    data = []
    feature_names = ["@@max_concurrent_activities_like_" + x for x in activities]

    for trace in log:
        max_conc_act = {act: 0 for act in activities}
        i = 0
        while i < len(trace) - 1:
            conc = 0
            act = trace[i][activity_key]
            ct = trace[i][timestamp_key].timestamp()
            j = i + 1
            while j < len(trace):
                st = trace[j][start_timestamp_key].timestamp()
                actj = trace[j][activity_key]
                if st > ct:
                    break
                if act == actj:
                    conc = conc + 1
                j = j + 1
            # keep the maximum over all events of this activity, rather than
            # overwriting it with the count of the last event encountered
            if conc > max_conc_act[act]:
                max_conc_act[act] = conc
            i = i + 1
        arr = []
        for act in activities:
            arr.append(max_conc_act[act])
        data.append(arr)

    return data, feature_names
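
Similarly, a sketch of the per-activity variant (again an invented toy trace, not part of the commit). There is one feature column per activity; since the activities list comes from an unordered set, feature_names has to be consulted to map columns to activities:

from datetime import datetime

from pm4py.objects.log.obj import Event, EventLog, Trace
from pm4py.algo.transformation.log_to_features.variants.trace_based import max_concurrent_events_per_activity

ts = datetime(2022, 1, 25, 10, 0)
# two overlapping "A" events and one "B" event
t = Trace([Event({"concept:name": a, "time:timestamp": ts}) for a in ["A", "A", "B"]])

data, feature_names = max_concurrent_events_per_activity(EventLog([t]))
# the column @@max_concurrent_activities_like_A holds 1 (one other "A"
# event overlaps the first one); the "B" column holds 0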


def resource_workload(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]] = None) -> Tuple[Any, List[str]]:
@@ -997,6 +1105,8 @@ def apply(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]]
    - ENABLE_WORK_IN_PROGRESS => enables the work in progress (number of concurrent cases) as a feature
    - ENABLE_RESOURCE_WORKLOAD => enables the resource workload as a feature
    - ENABLE_FIRST_LAST_ACTIVITY_INDEX => enables the insertion of the indexes of the activities as features
    - ENABLE_MAX_CONCURRENT_EVENTS => enables the maximum number of concurrent events inside a case as a feature
    - ENABLE_MAX_CONCURRENT_EVENTS_PER_ACTIVITY => enables the maximum number of concurrent events per activity as features

    Returns
    -------------
@@ -1055,6 +1165,8 @@ def apply(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]]
    enable_work_in_progress = exec_utils.get_param_value(Parameters.ENABLE_WORK_IN_PROGRESS, parameters, enable_all)
    enable_resource_workload = exec_utils.get_param_value(Parameters.ENABLE_RESOURCE_WORKLOAD, parameters, enable_all)
    enable_first_last_activity_index = exec_utils.get_param_value(Parameters.ENABLE_FIRST_LAST_ACTIVITY_INDEX, parameters, enable_all)
    enable_max_concurrent_events = exec_utils.get_param_value(Parameters.ENABLE_MAX_CONCURRENT_EVENTS, parameters, enable_all)
    enable_max_concurrent_events_per_activity = exec_utils.get_param_value(Parameters.ENABLE_MAX_CONCURRENT_EVENTS_PER_ACTIVITY, parameters, enable_all)

    log = converter.apply(log, parameters=parameters)
    if at_least_one_provided:
@@ -1118,4 +1230,16 @@ def apply(log: EventLog, parameters: Optional[Dict[Union[str, Parameters], Any]]
            datas[i] = datas[i] + data[i]
        features_namess = features_namess + features_names

    if enable_max_concurrent_events:
        data, features_names = max_concurrent_events(log, parameters=parameters)
        for i in range(len(datas)):
            datas[i] = datas[i] + data[i]
        features_namess = features_namess + features_names

    if enable_max_concurrent_events_per_activity:
        data, features_names = max_concurrent_events_per_activity(log, parameters=parameters)
        for i in range(len(datas)):
            datas[i] = datas[i] + data[i]
        features_namess = features_namess + features_names

    return datas, features_namess
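
Finally, a minimal usage sketch for the extended feature extraction (not part of the commit; the file name is a placeholder, and the string keys are the Parameters values defined above). Note that, by default, both new functions read time:timestamp for start and completion, so interval logs should also set Parameters.START_TIMESTAMP_KEY to their start attribute:

from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.transformation.log_to_features import algorithm as log_to_features

log = xes_importer.apply("example.xes")  # placeholder path

data, feature_names = log_to_features.apply(
    log,
    variant=log_to_features.Variants.TRACE_BASED,
    parameters={
        "enable_max_concurrent_events": True,
        "enable_max_concurrent_events_per_activity": True,
    },
)
# data: one row per trace; feature_names identifies the new
# @@max_concurrent_activities_* columns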
