-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbox.py
64 lines (54 loc) · 2.05 KB
/
box.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# -*- coding: utf-8 -*-
import time
import multiprocessing
from hil.log import logger
from hil.feedback import HILFeedback
class HILBox(object):
"""docstring for HILBox"""
def __init__(self, id, participation, role, actions, callback, timeout):
super(HILBox, self).__init__()
self.id = id
self.role = role
self.participation = participation
self.actions = actions
self.callback = callback
self.timeout = timeout
self.processes = []
self.start = time.time()
self.context = multiprocessing.get_context('spawn')
self.parent_conn, self.child_conn = multiprocessing.Pipe()
def launchProcess(self):
for action in self.actions:
p = self.context.Process(target=action().call, args=(self.child_conn, self.id, self.participation, self.role))
p.start()
self.processes.append(p)
logger.info("[#{}] Process {} started".format(self.id, action))
def processResponse(self):
responses = [self.parent_conn.recv() for p in self.processes]
logger.info("[#{}] Proces output {}".format(self.id, responses))
current_timeout = (time.time() - self.start)
req = HILFeedback(self.id, self.callback, responses, timeout = 2)
p = self.context.Process(target=req.run, args=(self.child_conn,))
p.start()
res = self.parent_conn.recv()
remain = time.time() - self.start
if res[0] == True:
logger.info("[#{}] Process ended in {} seconds with status {} and body {}".format(self.id, str(remain), res[1], res[2]))
else:
logger.info("[#{}] Process ended in {} seconds with error {}".format(self.id, str(remain), res[1]))
def timeoutProcess(self):
for p in self.processes:
if p.is_alive():
p.terminate()
p.join()
logger.info("[#{}] Process timeout".format(self.id))
def run(self):
self.launchProcess()
while time.time() - self.start <= self.timeout:
if all(not p.is_alive() for p in self.processes):
self.processResponse()
break
else:
time.sleep(.1) # Just to avoid hogging the CPU
else:
self.timeoutProcess()