-
Notifications
You must be signed in to change notification settings - Fork 172
/
Copy pathdaemon.py
180 lines (151 loc) · 5.87 KB
/
daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# Copyright 2017-2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import functools
import os
import platform
import socket
import rclpy
import ros2cli.daemon as daemon
from ros2cli.daemon.daemonize import daemonize
from ros2cli.helpers import get_ros_domain_id
from ros2cli.helpers import wait_for
from ros2cli.xmlrpc.client import ServerProxy
class DaemonNode:
def __init__(self, args):
self._args = args
self._proxy = ServerProxy(
daemon.get_xmlrpc_server_url(),
allow_none=True)
self._methods = []
@property
def connected(self):
try:
self._methods = [
method
for method in self._proxy.system.listMethods()
if not method.startswith('system.')
]
except ConnectionRefusedError:
return False
return True
@property
def methods(self):
return self._methods
def __enter__(self):
self._proxy.__enter__()
return self
def __getattr__(self, name):
return getattr(self._proxy, name)
def __exit__(self, exc_type, exc_value, traceback):
self._proxy.__exit__(exc_type, exc_value, traceback)
def is_daemon_running(args):
"""
Check if the daemon node is running.
:param args: `DaemonNode` arguments namespace.
"""
with DaemonNode(args) as node:
return node.connected
def shutdown_daemon(args, timeout=None):
"""
Shut down daemon node if it's running.
:param args: `DaemonNode` arguments namespace.
:param timeout: optional duration, in seconds, to wait
until the daemon node is fully shut down. Non-positive
durations will result in an indefinite wait.
:return: `True` if the the daemon was shut down,
`False` if it was already shut down.
:raises: if it fails to shutdown the daemon.
"""
with DaemonNode(args) as node:
if not node.connected:
return False
node.system.shutdown()
if timeout is not None:
predicate = (lambda: not node.connected)
if not wait_for(predicate, timeout):
raise RuntimeError(
'Timed out waiting for '
'daemon to shutdown'
)
return True
def spawn_daemon(args, timeout=None, debug=False):
"""
Spawn daemon node if it's not running.
To avoid TOCTOU races, this function instantiates
the XMLRPC server (binding the socket in the process)
and transfers it to the daemon process through pipes
(sending the inheritable socket with it). In a sense,
the socket functionally behaves as a mutex.
:param args: `DaemonNode` arguments namespace.
:param timeout: optional duration, in seconds, to wait
until the daemon node is ready. Non-positive
durations will result in an indefinite wait.
:param debug: if `True`, the daemon process will output
to the current `stdout` and `stderr` streams.
:return: `True` if the the daemon was spawned,
`False` if it was already running.
:raises: if it fails to spawn the daemon.
"""
# Acquire socket by instantiating XMLRPC server.
try:
server = daemon.make_xmlrpc_server()
server.socket.set_inheritable(True)
except socket.error as e:
if e.errno == errno.EADDRINUSE:
# Failed to acquire socket
# Daemon already running
return False
raise
# During tab completion on the ros2 tooling, we can get here and attempt to spawn a daemon.
# In that scenario, there may be open file descriptors that can prevent us from successfully
# daemonizing, and instead cause the terminal to hang. Here we mark all file handles except
# for 0, 1, 2, and the server socket as non-inheritable, which will cause daemonize() to close
# those file descriptors. See https://github.com/ros2/ros2cli/issues/851 for more details.
if platform.system() != 'Windows':
# Some unices have a high soft_limit; read fdsize if available.
fdlimit = None
try:
string_to_find = 'FDSize:'
with open('/proc/self/status', 'r') as f:
for line in f:
if line.startswith(string_to_find):
fdlimit = int(line.removeprefix(string_to_find).strip())
break
except (FileNotFoundError, ValueError):
pass
# The soft limit might be quite high on some systems.
if fdlimit is None:
import resource
fdlimit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
for i in range(3, fdlimit):
try:
if i != server.socket.fileno() and os.get_inheritable(i):
os.set_inheritable(i, False)
except OSError:
# Just in case the file handles might be [3(closed), ..., 8(pipe handle), ...]
continue
# Transfer XMLRPC server to daemon (and the socket with it).
try:
tags = {
'name': 'ros2-daemon', 'ros_domain_id': get_ros_domain_id(),
'rmw_implementation': rclpy.get_rmw_implementation_identifier()}
daemonize(
functools.partial(daemon.serve, server),
tags=tags, timeout=timeout, debug=debug)
finally:
server.server_close()
return True
def add_arguments(parser):
pass