From d83c8b0391af652a812cc9fd8139d2aa64bd83da Mon Sep 17 00:00:00 2001
From: "alessandro.berti"
Date: Fri, 10 Dec 2021 15:38:16 +0000
Subject: [PATCH] FT 1434 Interleavings visualizer

---
 examples/timestamp_interleavings.py           |   7 +
 .../ocel/interleavings/__init__.py            |   1 +
 .../ocel/interleavings/variants/__init__.py   |   1 +
 .../ocel/interleavings/variants/graphviz.py   | 258 ++++++++++++++++++
 .../ocel/interleavings/visualizer.py          |  80 ++++++
 5 files changed, 347 insertions(+)
 create mode 100644 pm4py/visualization/ocel/interleavings/__init__.py
 create mode 100644 pm4py/visualization/ocel/interleavings/variants/__init__.py
 create mode 100644 pm4py/visualization/ocel/interleavings/variants/graphviz.py
 create mode 100644 pm4py/visualization/ocel/interleavings/visualizer.py

diff --git a/examples/timestamp_interleavings.py b/examples/timestamp_interleavings.py
index be872c5a2..b8b760890 100644
--- a/examples/timestamp_interleavings.py
+++ b/examples/timestamp_interleavings.py
@@ -1,6 +1,7 @@
 import pm4py
 import pandas as pd
 from pm4py.algo.discovery.ocel.interleavings import algorithm as interleavings_miner
+from pm4py.visualization.ocel.interleavings import visualizer as interleavings_visualizer
 import os
 
 
@@ -16,6 +17,12 @@ def execute_script():
     print(interleavings_dataframe[["@@source_activity", "@@target_activity", "@@direction"]].value_counts())
     # print the performance of the interleavings
     print(interleavings_dataframe.groupby(["@@source_activity", "@@target_activity", "@@direction"])["@@timestamp_diff"].agg("mean"))
+    # visualizes the frequency of the interleavings
+    gviz_freq = interleavings_visualizer.apply(receipt_even, receipt_odd, interleavings_dataframe, parameters={"annotation": "frequency", "format": "svg"})
+    interleavings_visualizer.view(gviz_freq)
+    # visualizes the performance of the interleavings
+    gviz_perf = interleavings_visualizer.apply(receipt_even, receipt_odd, interleavings_dataframe, parameters={"annotation": "performance", "aggregation_measure": "median", "format": "svg"})
+    interleavings_visualizer.view(gviz_perf)
 
 
 if __name__ == "__main__":
diff --git a/pm4py/visualization/ocel/interleavings/__init__.py b/pm4py/visualization/ocel/interleavings/__init__.py
new file mode 100644
index 000000000..26b0a46ea
--- /dev/null
+++ b/pm4py/visualization/ocel/interleavings/__init__.py
@@ -0,0 +1 @@
+from pm4py.visualization.ocel.interleavings import visualizer, variants
diff --git a/pm4py/visualization/ocel/interleavings/variants/__init__.py b/pm4py/visualization/ocel/interleavings/variants/__init__.py
new file mode 100644
index 000000000..9c870322d
--- /dev/null
+++ b/pm4py/visualization/ocel/interleavings/variants/__init__.py
@@ -0,0 +1 @@
+from pm4py.visualization.ocel.interleavings.variants import graphviz
diff --git a/pm4py/visualization/ocel/interleavings/variants/graphviz.py b/pm4py/visualization/ocel/interleavings/variants/graphviz.py
new file mode 100644
index 000000000..15abb78b4
--- /dev/null
+++ b/pm4py/visualization/ocel/interleavings/variants/graphviz.py
@@ -0,0 +1,258 @@
+from graphviz import Digraph
+from enum import Enum
+
+import pm4py
+from pm4py.util import exec_utils, constants, xes_constants
+from typing import Optional, Dict, Any
+import pandas as pd
+from uuid import uuid4
+from pm4py.util import vis_utils
+import tempfile
+from pm4py.algo.filtering.dfg import dfg_filtering
+
+
+def __get_freq_perf_df(dataframe: pd.DataFrame, activity_key: str, aggregation_measure: str, activity_percentage: float,
+                       paths_percentage: float, dependency_threshold: float):
+    """
+    Gets the frequency and performance DFG abstractions from the provided dataframe
+    (internal usage)
+    """
+    freq_dfg, sa, ea = pm4py.discover_dfg(dataframe)
+    perf_dfg, sa, ea = pm4py.discover_performance_dfg(dataframe)
+    act_count = pm4py.get_event_attribute_values(dataframe, activity_key)
+
+    freq_dfg, sa, ea, act_count = dfg_filtering.filter_dfg_on_activities_percentage(freq_dfg, sa, ea, act_count,
+                                                                                    activity_percentage)
+    freq_dfg, sa, ea, act_count = dfg_filtering.filter_dfg_on_paths_percentage(freq_dfg, sa, ea, act_count,
+                                                                               paths_percentage)
+    freq_dfg, sa, ea, act_count = dfg_filtering.filter_dfg_keep_connected(freq_dfg, sa, ea, act_count,
+                                                                          dependency_threshold)
+
+    perf_dfg = {x: y[aggregation_measure] for x, y in perf_dfg.items() if x in freq_dfg}
+
+    return freq_dfg, perf_dfg, sa, ea, act_count
+
+
+class Parameters(Enum):
+    FORMAT = "format"
+    BGCOLOR = "bgcolor"
+    RANKDIR = "rankdir"
+    ANNOTATION = "annotation"
+    AGGREGATION_MEASURE = "aggregation_measure"
+    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
+    ACTIVITY_PERCENTAGE = "activity_percentage"
+    PATHS_PERCENTAGE = "paths_percentage"
+    DEPENDENCY_THRESHOLD = "dependency_threshold"
+    MIN_FACT_EDGES_INTERLEAVINGS = "min_fact_edges_interleavings"
+
+
+def apply(dataframe1: pd.DataFrame, dataframe2: pd.DataFrame, interleavings: pd.DataFrame,
+          parameters: Optional[Dict[Any, Any]] = None) -> Digraph:
+    """
+    Visualizes the interleavings discovered between two different processes.
+    We suppose to provide both event logs, and the discovered interleavings.
+    The visualization includes the DFG of both processes, along with the arcs discovered between them.
+    Both frequency and performance visualization are available.
+
+    Parameters
+    --------------------
+    dataframe1
+        Dataframe of the first process
+    dataframe2
+        Dataframe of the second process
+    interleavings
+        Interleavings between the two considered processes
+    parameters
+        Parameters of the algorithm, including:
+        - Parameters.FORMAT => the format of the visualization
+        - Parameters.BGCOLOR => the background color
+        - Parameters.RANKDIR => the rank direction (LR or TB; default: TB)
+        - Parameters.ANNOTATION => the annotation to represent (possible values: frequency or performance)
+        - Parameters.AGGREGATION_MEASURE => which aggregation should be used when considering performance
+        - Parameters.ACTIVITY_KEY => the activity key
+        - Parameters.ACTIVITY_PERCENTAGE => the percentage of activities to include for the DFG of the single processes
+        - Parameters.PATHS_PERCENTAGE => the percentage of paths to include for the DFG of the single processes
+        - Parameters.DEPENDENCY_THRESHOLD => the dependency threshold to consider for the DFG of the single processes
+        - Parameters.MIN_FACT_EDGES_INTERLEAVINGS => factor that is multiplied to the minimum number of occurrences of
+                                                    edges in the single processes, to decide if the interleavings edge should
+                                                    be included. E.g., if 0.3 is provided, only interleavings edges having a frequency
+                                                    of at least 0.3 * MIN_EDGE_COUNT_IN_PROCESSES are included.
+    Returns
+    ----------------
+    digraph
+        Graphviz Digraph
+
+    Raises
+    ----------------
+    ValueError
+        If the annotation is neither "frequency" nor "performance"
+    """
+    if parameters is None:
+        parameters = {}
+
+    image_format = exec_utils.get_param_value(Parameters.FORMAT, parameters, "png")
+    bgcolor = exec_utils.get_param_value(Parameters.BGCOLOR, parameters, "transparent")
+    rankdir = exec_utils.get_param_value(Parameters.RANKDIR, parameters, "TB")
+    annotation = exec_utils.get_param_value(Parameters.ANNOTATION, parameters, "frequency")
+    aggregation_measure = exec_utils.get_param_value(Parameters.AGGREGATION_MEASURE, parameters, "mean")
+    activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
+    activity_percentage = exec_utils.get_param_value(Parameters.ACTIVITY_PERCENTAGE, parameters, 0.3)
+    paths_percentage = exec_utils.get_param_value(Parameters.PATHS_PERCENTAGE, parameters, 0.3)
+    dependency_threshold = exec_utils.get_param_value(Parameters.DEPENDENCY_THRESHOLD, parameters, 0.3)
+    min_fact_edges_interleavings = exec_utils.get_param_value(Parameters.MIN_FACT_EDGES_INTERLEAVINGS, parameters, 0.3)
+
+    if annotation not in ("frequency", "performance"):
+        raise ValueError("annotation must be either 'frequency' or 'performance'")
+
+    filename = tempfile.NamedTemporaryFile(suffix='.gv')
+    filename.close()
+    viz = Digraph("interleavings", filename=filename.name, engine='dot', graph_attr={'bgcolor': bgcolor})
+    viz.attr('node', shape='ellipse', fixedsize='false')
+
+    viz.attr(rankdir=rankdir)
+    viz.format = image_format
+
+    freq_dfg1, perf_dfg1, sa1, ea1, act_count1 = __get_freq_perf_df(dataframe1, activity_key, aggregation_measure,
+                                                                    activity_percentage, paths_percentage,
+                                                                    dependency_threshold)
+    freq_dfg2, perf_dfg2, sa2, ea2, act_count2 = __get_freq_perf_df(dataframe2, activity_key, aggregation_measure,
+                                                                    activity_percentage, paths_percentage,
+                                                                    dependency_threshold)
+
+    min_act_count = min(min(act_count1.values()), min(act_count2.values()))
+    max_act_count = max(max(act_count1.values()), max(act_count2.values()))
+
+    # threshold base for the interleavings edges; default=0 keeps every edge when both DFGs are empty
+    min_edge_count = min(list(freq_dfg1.values()) + list(freq_dfg2.values()), default=0)
+
+    interleavings_lr_frequency = interleavings[interleavings["@@direction"] == "LR"][
+        ["@@source_activity", "@@target_activity"]].value_counts().to_dict()
+    interleavings_lr_performance = \
+        interleavings[interleavings["@@direction"] == "LR"].groupby(["@@source_activity", "@@target_activity"])[
+            "@@timestamp_diff"].agg(aggregation_measure).to_dict()
+    interleavings_rl_frequency = interleavings[interleavings["@@direction"] == "RL"][
+        ["@@source_activity", "@@target_activity"]].value_counts().to_dict()
+    interleavings_rl_performance = \
+        interleavings[interleavings["@@direction"] == "RL"].groupby(["@@source_activity", "@@target_activity"])[
+            "@@timestamp_diff"].agg(aggregation_measure).to_dict()
+
+    interleavings_lr_frequency = {x: y for x, y in interleavings_lr_frequency.items() if x[0] in act_count1 and x[
+        1] in act_count2 and y >= min_edge_count * min_fact_edges_interleavings}
+    interleavings_rl_frequency = {x: y for x, y in interleavings_rl_frequency.items() if x[0] in act_count2 and x[
+        1] in act_count1 and y >= min_edge_count * min_fact_edges_interleavings}
+    interleavings_lr_performance = {x: y for x, y in interleavings_lr_performance.items() if
+                                    x[0] in act_count1 and x[1] in act_count2 and x in interleavings_lr_frequency}
+    interleavings_rl_performance = {x: y for x, y in interleavings_rl_performance.items() if
+                                    x[0] in act_count2 and x[1] in act_count1 and x in interleavings_rl_frequency}
+
+    # pool all the edge frequencies before computing min/max, so that an empty group
+    # (e.g., no interleavings kept in one direction) does not raise a ValueError
+    edge_counts = [v for d in (freq_dfg1, freq_dfg2, interleavings_lr_frequency, interleavings_rl_frequency,
+                               sa1, sa2, ea1, ea2) for v in d.values()]
+    min_edge_count = min(edge_counts, default=0)
+    max_edge_count = max(edge_counts, default=0)
+
+    edge_perfs = [v for d in (perf_dfg1, perf_dfg2, interleavings_lr_performance, interleavings_rl_performance)
+                  for v in d.values()]
+    min_edge_perf = min(edge_perfs, default=0)
+    max_edge_perf = max(edge_perfs, default=0)
+
+    nodes1 = {}
+    nodes2 = {}
+
+    with viz.subgraph(name="First Model") as c1:
+        c1.attr(style='filled')
+        c1.attr(color='lightgray')
+        c1.attr(label="First Model")
+
+        for act in act_count1:
+            act_uuid = str(uuid4())
+            nodes1[act] = act_uuid
+            color = vis_utils.get_trans_freq_color(act_count1[act], min_act_count, max_act_count)
+            c1.node(act_uuid, label=act + "\n" + str(act_count1[act]), shape="box", style="filled", fillcolor=color)
+
+        for edge in freq_dfg1:
+            if annotation == "frequency":
+                count = freq_dfg1[edge]
+                label = str(count)
+                penwidth = str(vis_utils.get_arc_penwidth(count, min_edge_count, max_edge_count))
+            elif annotation == "performance":
+                perf = perf_dfg1[edge]
+                label = vis_utils.human_readable_stat(perf)
+                penwidth = str(vis_utils.get_arc_penwidth(perf, min_edge_perf, max_edge_perf))
+            viz.edge(nodes1[edge[0]], nodes1[edge[1]], label=label, penwidth=penwidth)
+
+        c1.node("@@startnode1", "<●>", shape='circle', fontsize="34", color="black", fontcolor="black")
+        c1.node("@@endnode1", "<■>", shape='doublecircle', fontsize="32", color="black", fontcolor="black")
+
+        for sa in sa1:
+            penwidth = str(vis_utils.get_arc_penwidth(sa1[sa], min_edge_count, max_edge_count))
+            label = str(sa1[sa]) if annotation == "frequency" else " "
+            viz.edge("@@startnode1", nodes1[sa], color="black", label=label, penwidth=penwidth)
+
+        for ea in ea1:
+            penwidth = str(vis_utils.get_arc_penwidth(ea1[ea], min_edge_count, max_edge_count))
+            label = str(ea1[ea]) if annotation == "frequency" else " "
+            viz.edge(nodes1[ea], "@@endnode1", color="black", label=label, penwidth=penwidth)
+
+    with viz.subgraph(name="Second Model") as c2:
+        c2.attr(style='filled')
+        c2.attr(color='lightgray')
+        c2.attr(label="Second Model")
+
+        for act in act_count2:
+            act_uuid = str(uuid4())
+            nodes2[act] = act_uuid
+            color = vis_utils.get_trans_freq_color(act_count2[act], min_act_count, max_act_count)
+            c2.node(act_uuid, label=act + "\n" + str(act_count2[act]), shape="box", style="filled", fillcolor=color,
+                    color="gray", fontcolor="gray")
+
+        for edge in freq_dfg2:
+            if annotation == "frequency":
+                count = freq_dfg2[edge]
+                label = str(count)
+                penwidth = str(vis_utils.get_arc_penwidth(count, min_edge_count, max_edge_count))
+            elif annotation == "performance":
+                perf = perf_dfg2[edge]
+                label = vis_utils.human_readable_stat(perf)
+                penwidth = str(vis_utils.get_arc_penwidth(perf, min_edge_perf, max_edge_perf))
+            viz.edge(nodes2[edge[0]], nodes2[edge[1]], label=label, penwidth=penwidth, color="gray", fontcolor="gray")
+
+        c2.node("@@startnode2", "<●>", shape='circle', fontsize="34", color="gray", fontcolor="gray")
+        c2.node("@@endnode2", "<■>", shape='doublecircle', fontsize="32", color="gray", fontcolor="gray")
+
+        for sa in sa2:
+            penwidth = str(vis_utils.get_arc_penwidth(sa2[sa], min_edge_count, max_edge_count))
+            label = str(sa2[sa]) if annotation == "frequency" else " "
+            viz.edge("@@startnode2", nodes2[sa], color="gray", label=label, penwidth=penwidth)
+
+        for ea in ea2:
+            penwidth = str(vis_utils.get_arc_penwidth(ea2[ea], min_edge_count, max_edge_count))
+            label = str(ea2[ea]) if annotation == "frequency" else " "
+            viz.edge(nodes2[ea], "@@endnode2", color="gray", label=label, penwidth=penwidth)
+
+    for edge in interleavings_lr_frequency:
+        if annotation == "frequency":
+            count = interleavings_lr_frequency[edge]
+            label = str(count)
+            penwidth = str(vis_utils.get_arc_penwidth(count, min_edge_count, max_edge_count))
+        elif annotation == "performance":
+            perf = interleavings_lr_performance[edge]
+            label = vis_utils.human_readable_stat(perf)
+            penwidth = str(vis_utils.get_arc_penwidth(perf, min_edge_perf, max_edge_perf))
+        viz.edge(nodes1[edge[0]], nodes2[edge[1]], label=label, penwidth=penwidth, color="violet", fontcolor="violet",
+                 style="dashed")
+
+    for edge in interleavings_rl_frequency:
+        if annotation == "frequency":
+            count = interleavings_rl_frequency[edge]
+            label = str(count)
+            penwidth = str(vis_utils.get_arc_penwidth(count, min_edge_count, max_edge_count))
+        elif annotation == "performance":
+            perf = interleavings_rl_performance[edge]
+            label = vis_utils.human_readable_stat(perf)
+            penwidth = str(vis_utils.get_arc_penwidth(perf, min_edge_perf, max_edge_perf))
+        viz.edge(nodes2[edge[0]], nodes1[edge[1]], label=label, penwidth=penwidth, color="violet", fontcolor="violet",
+                 style="dashed")
+
+    return viz
diff --git a/pm4py/visualization/ocel/interleavings/visualizer.py b/pm4py/visualization/ocel/interleavings/visualizer.py
new file mode 100644
index 000000000..1b1aab772
--- /dev/null
+++ b/pm4py/visualization/ocel/interleavings/visualizer.py
@@ -0,0 +1,80 @@
+from graphviz import Digraph
+from enum import Enum
+from pm4py.util import exec_utils
+from typing import Optional, Dict, Any
+from pm4py.visualization.common import gview
+from pm4py.visualization.common import save as gsave
+from pm4py.visualization.ocel.interleavings.variants import graphviz
+import pandas as pd
+
+
+class Variants(Enum):
+    GRAPHVIZ = graphviz
+
+
+def apply(dataframe1: pd.DataFrame, dataframe2: pd.DataFrame, interleavings: pd.DataFrame, variant=Variants.GRAPHVIZ,
+          parameters: Optional[Dict[Any, Any]] = None) -> Digraph:
+    """
+    Visualizes the interleavings discovered between two different processes.
+    We suppose to provide both event logs, and the discovered interleavings.
+    The visualization includes the DFG of both processes, along with the arcs discovered between them.
+    Both frequency and performance visualization are available.
+
+    Parameters
+    --------------------
+    dataframe1
+        Dataframe of the first process
+    dataframe2
+        Dataframe of the second process
+    interleavings
+        Interleavings between the two considered processes
+    variant
+        Variant of the visualizer to apply, possible values: Variants.GRAPHVIZ
+    parameters
+        Variant-specific parameters
+
+    Returns
+    ----------------
+    digraph
+        Graphviz Digraph
+    """
+    return exec_utils.get_variant(variant).apply(dataframe1, dataframe2, interleavings, parameters=parameters)
+
+
+def save(gviz: Digraph, output_file_path: str):
+    """
+    Save the diagram
+
+    Parameters
+    -----------
+    gviz
+        GraphViz diagram
+    output_file_path
+        Path where the GraphViz output should be saved
+    """
+    gsave.save(gviz, output_file_path)
+
+
+def view(gviz: Digraph):
+    """
+    View the diagram
+
+    Parameters
+    -----------
+    gviz
+        GraphViz diagram
+    """
+    return gview.view(gviz)
+
+
+def matplotlib_view(gviz: Digraph):
+    """
+    Views the diagram using Matplotlib
+
+    Parameters
+    ---------------
+    gviz
+        Graphviz
+    """
+
+    return gview.matplotlib_view(gviz)