Skip to content

Commit

Permalink
refactor(pm4py): moving utilities for sojourn/service/waiting/arrival…
Browse files Browse the repository at this point in the history
…/finish
  • Loading branch information
fit-alessandro-berti committed Jan 24, 2023
1 parent d5c9d1c commit bc1f21e
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 95 deletions.
99 changes: 4 additions & 95 deletions pm4py/algo/transformation/log_to_features/variants/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Union, Optional, Dict, Any
from pm4py.objects.conversion.log import converter as log_converter
from enum import Enum
from pm4py.util import exec_utils, constants, xes_constants
from pm4py.util import exec_utils, constants, xes_constants, pandas_utils


class Parameters(Enum):
Expand All @@ -21,98 +21,6 @@ class Parameters(Enum):
DIFF_START_END = "diff_start_end"


def _gaps_between_sorted_cases(case_times: pd.Series) -> Dict[Any, Any]:
    """
    Computes, for each case, the time elapsed since the case that precedes it.

    Parameters
    -----------------
    case_times
        Pandas series indexed by case identifier, holding one timestamp per case

    Returns
    -----------------
    gaps
        Dictionary associating each case identifier to the seconds elapsed since the
        preceding case (0 for the first case; empty dictionary if there are no cases)
    """
    # order by time; ties are broken on the case identifier so the result is deterministic
    ordered = sorted((moment.timestamp(), case) for case, moment in case_times.items())

    gaps = {}
    previous = None
    for seconds, case in ordered:
        gaps[case] = 0 if previous is None else seconds - previous
        previous = seconds
    return gaps


def insert_arrival_finish_rate(log: pd.DataFrame, parameters: Dict[Any, Any]) -> pd.DataFrame:
    """
    Inserts the arrival/finish rate in the dataframe for the purpose of computing temporal features.

    The arrival rate of a case is the number of seconds between its first start timestamp and the
    first start timestamp of the previous case; the finish rate is the same measure computed on
    the last completion timestamp of the cases. The first case (in time order) gets 0.
    The dataframe is modified in place and also returned.

    Parameters
    -----------------
    log
        Pandas dataframe
    parameters
        Parameters of the method, read through exec_utils.get_param_value:
        - Parameters.ARRIVAL_RATE => name of the output arrival rate column (default: "@@arrival_rate")
        - Parameters.FINISH_RATE => name of the output finish rate column (default: "@@finish_rate")
        - Parameters.CASE_ID_COLUMN => case identifier column (default: constants.CASE_CONCEPT_NAME)
        - Parameters.TIMESTAMP_COLUMN => completion timestamp column
          (default: xes_constants.DEFAULT_TIMESTAMP_KEY)
        - Parameters.START_TIMESTAMP_COLUMN => start timestamp column
          (default: None, meaning that the completion timestamp column is used)

    Returns
    -----------------
    log
        Pandas dataframe enriched by arrival and finish rate
    """
    arrival_rate = exec_utils.get_param_value(Parameters.ARRIVAL_RATE, parameters, "@@arrival_rate")
    finish_rate = exec_utils.get_param_value(Parameters.FINISH_RATE, parameters, "@@finish_rate")
    case_id_column = exec_utils.get_param_value(Parameters.CASE_ID_COLUMN, parameters, constants.CASE_CONCEPT_NAME)
    timestamp_column = exec_utils.get_param_value(Parameters.TIMESTAMP_COLUMN, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY)
    start_timestamp_column = exec_utils.get_param_value(Parameters.START_TIMESTAMP_COLUMN, parameters, None)
    if start_timestamp_column is None:
        start_timestamp_column = timestamp_column

    cases = log.groupby(case_id_column)
    # .min()/.max() are used instead of .agg(min)/.agg(max): passing the Python builtins
    # to agg is deprecated in recent pandas releases
    case_arrival = _gaps_between_sorted_cases(cases[start_timestamp_column].min())
    case_finish = _gaps_between_sorted_cases(cases[timestamp_column].max())

    log[arrival_rate] = log[case_id_column].map(case_arrival)
    log[finish_rate] = log[case_id_column].map(case_finish)

    return log


def insert_service_waiting_time(log: pd.DataFrame, parameters: Dict[Any, Any]) -> pd.DataFrame:
    """
    Inserts the service/waiting/sojourn time in the dataframe for the purpose of computing temporal features.

    All the inserted columns are expressed in seconds (float):
    - the difference between completion and start timestamp of each event
    - the service time of the case (sum of the previous differences over the case)
    - the sojourn time of the case (last completion timestamp minus first start timestamp)
    - the waiting time of the case (sojourn time minus service time)
    The dataframe is modified in place and also returned.

    Parameters
    ----------------
    log
        Pandas dataframe
    parameters
        Parameters of the method, read through exec_utils.get_param_value:
        - Parameters.TIMESTAMP_COLUMN => completion timestamp column
          (default: xes_constants.DEFAULT_TIMESTAMP_KEY)
        - Parameters.START_TIMESTAMP_COLUMN => start timestamp column
          (default: None, meaning that the completion timestamp column is used)
        - Parameters.CASE_ID_COLUMN => case identifier column (default: constants.CASE_CONCEPT_NAME)
        - Parameters.DIFF_START_END => name of the output per-event duration column
          (default: "@@diff_start_end")
        - Parameters.SERVICE_TIME => name of the output service time column (default: "@@service_time")
        - Parameters.WAITING_TIME => name of the output waiting time column (default: "@@waiting_time")
        - Parameters.SOJOURN_TIME => name of the output sojourn time column (default: "@@sojourn_time")

    Returns
    ----------------
    log
        Pandas dataframe with service, waiting and sojourn time
    """
    timestamp_column = exec_utils.get_param_value(Parameters.TIMESTAMP_COLUMN, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY)
    start_timestamp_column = exec_utils.get_param_value(Parameters.START_TIMESTAMP_COLUMN, parameters, None)
    if start_timestamp_column is None:
        start_timestamp_column = timestamp_column
    case_id_column = exec_utils.get_param_value(Parameters.CASE_ID_COLUMN, parameters, constants.CASE_CONCEPT_NAME)
    diff_start_end = exec_utils.get_param_value(Parameters.DIFF_START_END, parameters, "@@diff_start_end")
    service_time = exec_utils.get_param_value(Parameters.SERVICE_TIME, parameters, "@@service_time")
    waiting_time = exec_utils.get_param_value(Parameters.WAITING_TIME, parameters, "@@waiting_time")
    sojourn_time = exec_utils.get_param_value(Parameters.SOJOURN_TIME, parameters, "@@sojourn_time")

    # Durations are stored as float seconds. The former astype("timedelta64[ms]") produced
    # float milliseconds on pandas < 2 and a timedelta dtype on pandas >= 2, neither of which
    # can be meaningfully subtracted from the sojourn time expressed in seconds.
    log[diff_start_end] = (log[timestamp_column] - log[start_timestamp_column]).dt.total_seconds()

    # compute every per-case aggregate before adding further columns to the dataframe
    cases = log.groupby(case_id_column)
    service_per_case = cases[diff_start_end].sum()
    sojourn_per_case = (cases[timestamp_column].max() - cases[start_timestamp_column].min()).dt.total_seconds()

    log[service_time] = log[case_id_column].map(service_per_case)
    log[sojourn_time] = log[case_id_column].map(sojourn_per_case)
    log[waiting_time] = log[sojourn_time] - log[service_time]

    return log


def apply(log: Union[EventLog, EventStream, pd.DataFrame], parameters: Optional[Dict[Any, Any]] = None) -> pd.DataFrame:
"""
Extracts temporal features with the provided granularity from the Pandas dataframe.
Expand Down Expand Up @@ -152,6 +60,7 @@ def apply(log: Union[EventLog, EventStream, pd.DataFrame], parameters: Optional[
if start_timestamp_column is None:
start_timestamp_column = timestamp_column
case_id_column = exec_utils.get_param_value(Parameters.CASE_ID_COLUMN, parameters, constants.CASE_CONCEPT_NAME)
diff_start_end = exec_utils.get_param_value(Parameters.DIFF_START_END, parameters, "@@diff_start_end")
arrival_rate = exec_utils.get_param_value(Parameters.ARRIVAL_RATE, parameters, "@@arrival_rate")
finish_rate = exec_utils.get_param_value(Parameters.FINISH_RATE, parameters, "@@finish_rate")
service_time = exec_utils.get_param_value(Parameters.SERVICE_TIME, parameters, "@@service_time")
Expand All @@ -161,8 +70,8 @@ def apply(log: Union[EventLog, EventStream, pd.DataFrame], parameters: Optional[
activity_column = exec_utils.get_param_value(Parameters.ACTIVITY_COLUMN, parameters, xes_constants.DEFAULT_NAME_KEY)

log = log_converter.apply(log, variant=log_converter.Variants.TO_DATA_FRAME, parameters=parameters)
log = insert_arrival_finish_rate(log, parameters=parameters)
log = insert_service_waiting_time(log, parameters=parameters)
log = pandas_utils.insert_feature_arrival_finish_rate(log, case_id_column=case_id_column, timestamp_column=timestamp_column, arrival_rate_column=arrival_rate, finish_rate_column=finish_rate)
log = pandas_utils.insert_feature_service_waiting_time(log, case_id_column=case_id_column, timestamp_column=timestamp_column, diff_start_end_column=diff_start_end, service_time_column=service_time, sojourn_time_column=sojourn_time, waiting_time_column=waiting_time)

grouped_log = log.groupby(pd.Grouper(key=start_timestamp_column, freq=grouper_freq))

Expand Down
78 changes: 78 additions & 0 deletions pm4py/util/pandas_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,84 @@ def insert_feature_activity_position_in_trace(df: pd.DataFrame, case_id: str = c
return df


def _gaps_between_consecutive_cases(case_times: pd.Series) -> dict:
    """
    Computes, for each case, the time elapsed since the case that precedes it.

    Parameters
    -----------------
    case_times
        Pandas series indexed by case identifier, holding one timestamp per case

    Returns
    -----------------
    gaps
        Dictionary associating each case identifier to the seconds elapsed since the
        preceding case (0 for the first case; empty dictionary if there are no cases)
    """
    # order by time; ties are broken on the case identifier so the result is deterministic
    ordered = sorted((moment.timestamp(), case) for case, moment in case_times.items())

    gaps = {}
    previous = None
    for seconds, case in ordered:
        gaps[case] = 0 if previous is None else seconds - previous
        previous = seconds
    return gaps


def insert_feature_arrival_finish_rate(log: pd.DataFrame, case_id_column=constants.CASE_CONCEPT_NAME, timestamp_column=xes_constants.DEFAULT_TIMESTAMP_KEY, start_timestamp_column=None, arrival_rate_column="@@arrival_rate", finish_rate_column="@@finish_rate") -> pd.DataFrame:
    """
    Inserts the arrival/finish rate in the dataframe.

    The arrival rate of a case is the number of seconds between its first start timestamp and the
    first start timestamp of the previous case; the finish rate is the same measure computed on
    the last completion timestamp of the cases. The first case (in time order) gets 0.
    The dataframe is modified in place and also returned.

    Parameters
    -----------------
    log
        Pandas dataframe
    case_id_column
        Column containing the case identifier
    timestamp_column
        Column containing the completion timestamp of the events
    start_timestamp_column
        Column containing the start timestamp of the events
        (if None, the completion timestamp column is used)
    arrival_rate_column
        Name of the column in which the arrival rate is written
    finish_rate_column
        Name of the column in which the finish rate is written

    Returns
    -----------------
    log
        Pandas dataframe enriched by arrival and finish rate
    """
    if start_timestamp_column is None:
        start_timestamp_column = timestamp_column

    cases = log.groupby(case_id_column)
    # .min()/.max() are used instead of .agg(min)/.agg(max): passing the Python builtins
    # to agg is deprecated in recent pandas releases
    case_arrival = _gaps_between_consecutive_cases(cases[start_timestamp_column].min())
    case_finish = _gaps_between_consecutive_cases(cases[timestamp_column].max())

    log[arrival_rate_column] = log[case_id_column].map(case_arrival)
    log[finish_rate_column] = log[case_id_column].map(case_finish)

    return log


def insert_feature_service_waiting_time(log: pd.DataFrame, case_id_column=constants.CASE_CONCEPT_NAME, timestamp_column=xes_constants.DEFAULT_TIMESTAMP_KEY, start_timestamp_column=None, diff_start_end_column="@@diff_start_end", service_time_column="@@service_time", sojourn_time_column="@@sojourn_time", waiting_time_column="@@waiting_time") -> pd.DataFrame:
    """
    Inserts the service/waiting/sojourn time in the dataframe.

    All the inserted columns are expressed in seconds (float).
    The dataframe is modified in place and also returned.

    Parameters
    ----------------
    log
        Pandas dataframe
    case_id_column
        Column containing the case identifier
    timestamp_column
        Column containing the completion timestamp of the events
    start_timestamp_column
        Column containing the start timestamp of the events
        (if None, the completion timestamp column is used)
    diff_start_end_column
        Name of the column in which the duration of each event
        (completion timestamp minus start timestamp) is written
    service_time_column
        Name of the column in which the service time of the case
        (sum of the durations of its events) is written
    sojourn_time_column
        Name of the column in which the sojourn time of the case
        (last completion timestamp minus first start timestamp) is written
    waiting_time_column
        Name of the column in which the waiting time of the case
        (sojourn time minus service time) is written

    Returns
    ----------------
    log
        Pandas dataframe with service, waiting and sojourn time
    """
    if start_timestamp_column is None:
        start_timestamp_column = timestamp_column

    # Durations are stored as float seconds. The former astype("timedelta64[ms]") produced
    # float milliseconds on pandas < 2 and a timedelta dtype on pandas >= 2, neither of which
    # can be meaningfully subtracted from the sojourn time expressed in seconds.
    log[diff_start_end_column] = (log[timestamp_column] - log[start_timestamp_column]).dt.total_seconds()

    # compute every per-case aggregate before adding further columns to the dataframe
    cases = log.groupby(case_id_column)
    service_per_case = cases[diff_start_end_column].sum()
    sojourn_per_case = (cases[timestamp_column].max() - cases[start_timestamp_column].min()).dt.total_seconds()

    log[service_time_column] = log[case_id_column].map(service_per_case)
    log[sojourn_time_column] = log[case_id_column].map(sojourn_per_case)
    log[waiting_time_column] = log[sojourn_time_column] - log[service_time_column]

    return log


def check_is_pandas_dataframe(log):
"""
Checks if a log object is a dataframe
Expand Down

0 comments on commit bc1f21e

Please sign in to comment.