-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstress_test.py
109 lines (82 loc) · 2.84 KB
/
stress_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""
Test a dag that spawns subDAGs that spawns more subDAGs
main_dag will span some test_dag's which spawns test_subdags
If the queue names are not setup properly, this will deadlock
"""
from lightflow.models import Dag, Parameters, Option
from lightflow.tasks import PythonTask
from lightflow.models.task_data import TaskData, MultiTaskData
from celery.contrib import rdb
import logging
from logging.handlers import SysLogHandler
import os
def get_logger(log_path='~/logs/stress_test.log'):
    """Return the root logger, configured to write INFO records to a file.

    The call is idempotent: if the root logger already has a file handler
    for the same path, no second handler is attached, so repeated calls do
    not duplicate log lines or leak open file descriptors.

    Args:
        log_path: Destination log file. A leading ``~`` is expanded to the
            user's home directory. Missing parent directories are created.

    Returns:
        The root ``logging.Logger`` with its level set to ``INFO``.
    """
    path = os.path.abspath(os.path.expanduser(log_path))

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # FileHandler stores the absolute path in ``baseFilename``; use it to
    # detect a handler attached by a previous call.
    for handler in logger.handlers:
        if (isinstance(handler, logging.FileHandler)
                and handler.baseFilename == path):
            return logger

    # FileHandler opens the file immediately and fails if the directory
    # does not exist yet.
    os.makedirs(os.path.dirname(path), exist_ok=True)

    fh = logging.FileHandler(path)
    fh.setLevel(logging.INFO)
    fh.setFormatter(
        logging.Formatter('%(name)s: %(levelname)s %(message)s'))
    logger.addHandler(fh)
    return logger
# Workflow-level options. 'N' is an int defaulting to 10; presumably this is
# the value main_func reads back via store.get("N") -- confirm against the
# lightflow Parameters/Option documentation.
parameters = Parameters(
    [
        Option(
            'N',
            help='Specify number of frames to test',
            default=10,
            type=int,
        ),
    ]
)
def main_func(data, store, signal, context):
    """Start ``N`` copies of ``sub_dag`` and wait for all of them.

    Args:
        data: Task data, passed unchanged to every started sub-DAG.
        store: Key/value store; ``"foo"`` is set to ``"bar"`` and the
            iteration count is read from key ``"N"``.
        signal: Object providing ``start_dag`` and ``join_dags``.
        context: Unused.
    """
    store.set("foo", "bar")
    print("in main func")

    N = store.get("N")
    print(N)

    dag_names = []
    for i in range(N):
        print("iteration {} of {}".format(i, N))
        dag_names.append(signal.start_dag(sub_dag, data=data))

    # Join only after every sub-DAG has been started.
    signal.join_dags(dag_names)
from collections import deque
def sub_func(data, store, signal, context):
    """Start ten copies of ``subsub_dag`` and return without joining them.

    Args:
        data: Task data, passed unchanged to every started sub-DAG.
        store: Unused.
        signal: Object providing ``start_dag``.
        context: Unused.
    """
    N = 10
    dag_names = deque()
    for i in range(N):
        print("iteration {} of {}".format(i, N))
        dag_names.append(signal.start_dag(subsub_dag, data=data))

    # Placeholder for a per-DAG polling loop: each started name is taken off
    # the queue and discarded. Looping on the queue itself terminates for an
    # empty queue too, and no sleep (and hence no ``time`` import) is needed.
    # NOTE: ``signal.join_dags(dag_names)`` was disabled in the original and
    # is intentionally left out, so this task does not wait for its children.
    while dag_names:
        dag_names.popleft()
def subsub_func(data, store, signal, context):
    """Leaf task callback: print ``completed`` and do nothing else.

    All four arguments are accepted for the callback signature and unused.
    """
    print("completed")
# Top-level workflow: a single task running main_func. The task and the DAG
# are given different queue names ('cms-main-task' vs 'cms-main').
main_task = PythonTask(
    name="main_task",
    callback=main_func,
    queue='cms-main-task',
)

# One task mapped to None (presumably "no downstream tasks" -- confirm
# against lightflow's Dag.define).
main_dag_dict = {main_task: None}

main_dag = Dag("main_dag", autostart=True, queue='cms-main')
main_dag.define(main_dag_dict)
# Second-level workflow, started from main_func via signal.start_dag. It is
# not auto-started and uses its own queue names ('cms-primary-task' for the
# task, 'cms-primary' for the DAG).
sub_task = PythonTask(
    name="test_task",
    callback=sub_func,
    queue='cms-primary-task',
)

# One task mapped to None (presumably "no downstream tasks" -- confirm
# against lightflow's Dag.define).
sub_dag_dict = {sub_task: None}

sub_dag = Dag("test_dag", autostart=False, queue='cms-primary')
sub_dag.define(sub_dag_dict)
from functools import partial
# PythonTask factory with the queue pre-bound to 'cms-oneimage-task'.
OneImageTask = partial(PythonTask, queue='cms-oneimage-task')

# Third-level workflow, started from sub_func via signal.start_dag. The queue
# comes from the OneImageTask partial, so it is not repeated here.
subsub_task = OneImageTask(
    name="testsub_task",
    callback=subsub_func,
)

# One task mapped to None (presumably "no downstream tasks" -- confirm
# against lightflow's Dag.define).
subsub_dag_dict = {subsub_task: None}

subsub_dag = Dag("testsubdag_dag", autostart=False, queue='cms-oneimage')
subsub_dag.define(subsub_dag_dict)