-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathwatchdog.py
42 lines (31 loc) · 979 Bytes
/
watchdog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import time
import threading
SLEEP_TIME = 0.1  # seconds between two timeout checks in the watchdog thread


class Watchdog(threading.Thread):
    """Daemon thread that fires a callback when it is not pinged in time.

    The thread starts itself on construction.  Every ``SLEEP_TIME`` seconds
    it checks how long ago :meth:`ping` was last called; once that exceeds
    ``timeout_in_sec`` the callback is invoked once, from the watchdog
    thread.  It is not invoked again until :meth:`reset` re-arms the
    watchdog.

    Attributes:
        interrupted: ``threading.Event`` that is set by :meth:`stop` to end
            the polling loop.
        raised: ``True`` once the callback has been triggered and the
            watchdog has not been reset since.
        timeout_in_sec: Allowed silence, in seconds, between two pings.
        callback: Zero-argument callable invoked on timeout.
        last_ping: ``time.monotonic()`` reading taken by the latest ping.
            It is only meaningful relative to other monotonic readings.
    """

    def __init__(self, timeout_in_sec: float, callback) -> None:
        """Create the watchdog and start its thread immediately.

        Args:
            timeout_in_sec: Seconds without a ping after which ``callback``
                is invoked.
            callback: Callable taking no arguments; it is run in the
                watchdog thread.
        """
        super().__init__()
        self.daemon = True
        # An Event, unlike a Lock held while running, can be signalled at
        # any time, from any thread, any number of times without raising.
        self.interrupted = threading.Event()
        self.raised = False
        self.timeout_in_sec = timeout_in_sec
        self.callback = callback
        # Take the first reading before the thread exists so that
        # time_since_last_ping works as soon as the constructor returns.
        self.ping()
        self.start()

    def stop(self) -> None:
        """Ask the watchdog thread to exit; safe to call more than once."""
        self.interrupted.set()

    def run(self) -> None:
        """Poll for a timeout until :meth:`stop` is called."""
        while not self.interrupted.is_set():
            if not self.raised and self.time_since_last_ping > self.timeout_in_sec:
                # Mark as raised *before* calling out, so that a callback
                # which calls reset() really does re-arm the watchdog.
                self.raised = True
                self.callback()
            # Doubles as the poll delay, but returns early when stop() is
            # called instead of sleeping out the whole interval.
            self.interrupted.wait(SLEEP_TIME)

    def reset(self) -> None:
        """Record a ping and re-arm the watchdog after it has fired."""
        self.ping()
        self.raised = False

    def ping(self) -> None:
        """Record that the watched activity is still alive."""
        # A monotonic clock keeps the elapsed time immune to wall-clock
        # adjustments (NTP corrections, manual changes).
        self.last_ping = time.monotonic()

    @property
    def time_since_last_ping(self) -> float:
        """Seconds elapsed since the most recent ping."""
        return time.monotonic() - self.last_ping