Skip to content

Commit

Permalink
FT 1447 Fixing PTAndLogGenerator
Browse files Browse the repository at this point in the history
  • Loading branch information
fit-alessandro-berti authored and fit-sebastiaan-van-zelst committed Dec 14, 2021
1 parent a06cc1c commit 830f23d
Showing 1 changed file with 48 additions and 167 deletions.
215 changes: 48 additions & 167 deletions pm4py/algo/simulation/tree_generator/variants/ptandloggenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from bisect import bisect as _bisect
import random
from typing import Optional, Dict, Any, Union, Tuple
from string import ascii_lowercase
import itertools
from pm4py.objects.process_tree.utils import generic as tree_util


def choices(population, weights=None, *, cum_weights=None, k=1):
Expand Down Expand Up @@ -159,155 +162,10 @@ def select_operator(self):
self.parameters["loop"], self.parameters["or"]])
return operator[0]

def get_next_activity(self, activity):
result = self.set_activity_labels[self.set_activity_labels.index(activity) + 1]
def get_next_activity(self):
result = self.iter.__next__()
return result

def assign_root_opeartor(self):
activity = "a"
# is a silent activity chosen
silent_activity = False
if random.random() < self.parameters["silent"]:
silent_activity = True
root = self.tree._get_root()
operator = self.select_operator()
root.operator = assign_operator(operator)
# if operator is loop, we use a special structure, otherwise 2
if operator == "loop":
root.operator = pt_operator.Operator.SEQUENCE
root_loop = obj.ProcessTree(operator=pt_operator.Operator.LOOP)
root_loop.parent = root
root._children.append(root_loop)
new_node = obj.ProcessTree(label=activity)
new_node.parent = root_loop
root_loop._children.append(new_node)
activity = self.get_next_activity(activity)
if silent_activity:
new_node = obj.ProcessTree(label=None)
new_node.parent = root_loop
root_loop._children.append(new_node)

else:
new_node = obj.ProcessTree(label=activity)
new_node.parent = root_loop
root_loop._children.append(new_node)
activity = self.get_next_activity(activity)
new_node = obj.ProcessTree(label=activity)
new_node.parent = root
root._children.append(new_node)
self.total_activities -= 1
else:
if silent_activity and operator == "choice":
number = random.choice([0, 1])
if number == 0:
new_node = obj.ProcessTree(label=None)
new_node.parent = root
root._children.append(new_node)
new_node = obj.ProcessTree(label=activity)
new_node.parent = root
root._children.append(new_node)
else:
new_node = obj.ProcessTree(label=activity)
new_node.parent = root
root._children.append(new_node)
new_node = obj.ProcessTree(label=None)
new_node.parent = root
root._children.append(new_node)
else:
new_node = obj.ProcessTree(label=activity)
new_node.parent = root
root._children.append(new_node)
activity = self.get_next_activity(activity)
new_node = obj.ProcessTree(label=activity)
new_node.parent = root
root._children.append(new_node)
# always two children are added
self.total_activities -= 2
return self.get_next_activity(activity)

def add_node(self, next_activity):
"""
Add nodes to current tree. The general procedure is as follows:
Select a random leaf (leaves have label). Next step, and opertor is chosen.
The chosen operator then replaces the leaf, whereby the old label is then add as a leaf to the manipulated node.
Then, next activity is added as a second leaf to the new operator node or a silent acticity (tau) is added.
:return: Next activity
"""
# Need to select random node that is not a silent activity
leaf_silent = True
while (leaf_silent):
leaf = random.choice(self.tree._get_leaves())
if leaf.label is not None:
leaf_silent = False
operator_nok = True
while (operator_nok):
operator = self.select_operator()
if self.total_activities > 1:
operator_nok = False
else:
if operator != "loop":
operator_nok = False
activity = leaf._get_label()
leaf._set_label(None)
leaf._set_operator(assign_operator(operator))
# Will be an tau added?
silent_activity = False
if random.random() < self.parameters["silent"]:
silent_activity = True
# add two children
if operator == "loop":
leaf._set_operator(pt_operator.Operator.SEQUENCE)
root_loop = obj.ProcessTree(pt_operator.Operator.LOOP)
root_loop.parent = leaf
leaf._children.append(root_loop)
new_node = obj.ProcessTree(label=activity)
new_node.parent = root_loop
root_loop._children.append(new_node)
activity = next_activity
if silent_activity:
new_node = obj.ProcessTree(label=None)
new_node.parent = root_loop
root_loop._children.append(new_node)
else:
new_node = obj.ProcessTree(label=activity)
new_node.parent = root_loop
root_loop._children.append(new_node)
activity = self.get_next_activity(activity)
new_node = obj.ProcessTree(label=activity)
new_node.parent = leaf
leaf._children.append(new_node)
self.total_activities -= 1
else:
if silent_activity and operator == "choice":
number = random.choice([0, 1])
if number == 0:
new_node = obj.ProcessTree(label=None)
new_node.parent = leaf
leaf._children.append(new_node)
new_node = obj.ProcessTree(label=activity)
new_node.parent = leaf
leaf._children.append(new_node)
else:
new_node = obj.ProcessTree(label=activity)
new_node.parent = leaf
leaf._children.append(new_node)
new_node = obj.ProcessTree(label=None)
new_node.parent = leaf
leaf._children.append(new_node)
else:
new_node = obj.ProcessTree(label=activity)
new_node.parent = leaf
leaf._children.append(new_node)
activity = next_activity
new_node = obj.ProcessTree(label=activity)
new_node.parent = leaf
leaf._children.append(new_node)

self.total_activities -= 2
if silent_activity and operator == "choice":
return next_activity
else:
return self.get_next_activity(activity)

def add_duplicates(self):
"""
Expand Down Expand Up @@ -353,27 +211,50 @@ def add_duplicates(self):
replacement._label = leaf._label
break

def add_node(self):
leaves = self.tree._get_leaves()
if self.tree.operator is None:
leaves.append(self.tree)
visible_leaves = [x for x in leaves if x.label is not None]
operator = self.select_operator()
op_map = {"parallel": obj.Operator.PARALLEL, "sequence": obj.Operator.SEQUENCE, "choice": obj.Operator.XOR, "loop": obj.Operator.LOOP, "or": obj.Operator.OR}
mapped_operator = op_map[operator]
order = random.randrange(0, 2)
chosen_leaf = random.choice(leaves)
label1 = chosen_leaf.label if chosen_leaf.label is not None else self.get_next_activity()
label2 = None
if chosen_leaf.parent is None:
self.tree = obj.ProcessTree(operator=mapped_operator)
chosen_leaf = self.tree
else:
parent = chosen_leaf.parent
del parent.children[parent.children.index(chosen_leaf)]
chosen_leaf = obj.ProcessTree(operator=mapped_operator, parent=parent)
parent.children.append(chosen_leaf)
r = random.random()
if self.total_activities - len(visible_leaves) > 1 and not r < self.parameters["silent"]:
label2 = self.get_next_activity()
node1 = obj.ProcessTree(label=label1, parent=chosen_leaf)
node2 = obj.ProcessTree(label=label2, parent=chosen_leaf)
if order == 0:
chosen_leaf.children.append(node1)
chosen_leaf.children.append(node2)
elif order == 1:
chosen_leaf.children.append(node2)
chosen_leaf.children.append(node1)

def iter_all_strings(self):
for size in itertools.count(1):
for s in itertools.product(ascii_lowercase, repeat=size):
yield "".join(s)

def create_process_tree(self):
self.iter = self.iter_all_strings()
self.tree = obj.ProcessTree()
self.set_activity_labels = []
p = 1
# create labels
while (self.total_activities > len(self.set_activity_labels)):
# pairwise product
l = itertools.product(self.alphabet, repeat=p)
for item in l:
label = ""
for element in item:
label += str(element)
self.set_activity_labels.append(label)
p += 1
step = 1
activity = self.assign_root_opeartor()
step += 1

while (self.total_activities > 0):
activity = self.add_node(activity)
step += 1
visible_leaves = [x for x in self.tree._get_leaves() if x.label is not None]
while len(visible_leaves) < self.total_activities:
self.add_node()
visible_leaves = [x for x in self.tree._get_leaves() if x.label is not None]

def __init__(self, parameters):
self.parameters = {}
Expand Down Expand Up @@ -405,5 +286,5 @@ def generate(self):
# add duplicates
self.add_duplicates()

self.tree = tree_util.fold(self.tree)
return self.tree

0 comments on commit 830f23d

Please sign in to comment.