Skip to content

Commit

Permalink
Support parallel work of multiple IPCMEssageSubscribers in one process
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryKuzmenko committed Mar 5, 2019
1 parent 1fa2072 commit 710ab50
Showing 1 changed file with 147 additions and 128 deletions.
275 changes: 147 additions & 128 deletions salt/transport/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import tornado.gen
import tornado.netutil
import tornado.concurrent
from tornado.locks import Semaphore
import tornado.queues
from tornado.locks import Lock
from tornado.ioloop import IOLoop, TimeoutError as TornadoTimeoutError
from tornado.iostream import IOStream
# Import Salt libs
Expand Down Expand Up @@ -582,11 +583,116 @@ def __del__(self):
self.close()


class IPCMessageSubscriber(IPCClient):
class IPCMessageSubscriberService(IPCClient):
'''
IPC message subscriber service that is a standalone singleton class starting once for a number
of IPCMessageSubscriber instances feeding all of them with data. It closes automatically when
there are no more subscribers.
To use this rever to IPCMessageSubscriber documentation.
'''
def __singleton_init__(self, socket_path, io_loop=None):
super(IPCMessageSubscriberService, self).__singleton_init__(
socket_path, io_loop=io_loop)
self.saved_data = []
self._read_in_progress = Lock()
self.handlers = weakref.WeakSet()

def _subscribe(self, handler):
self.handlers.add(handler)

def unsubscribe(self, handler):
self.handlers.discard(handler)

def _has_subscribers(self):
return bool(self.handlers)

def _feed_subscribers(self, data):
for subscriber in self.handlers:
subscriber._feed(data)

@tornado.gen.coroutine
def _read(self, timeout, callback=None):
try:
yield self._read_in_progress.acquire(timeout=0)
except tornado.gen.TimeoutError:
raise tornado.gen.Return(None)

log.debug('IPC Subscriber Service is starting reading')
# If timeout is not specified we need to set some here to make the service able to check
# is there any handler waiting for data.
if timeout is None:
timeout = 5

read_stream_future = None
while self._has_subscribers():
if read_stream_future is None:
read_stream_future = self.stream.read_bytes(4096, partial=True)

try:
wire_bytes = yield FutureWithTimeout(self.io_loop,
read_stream_future,
timeout)
read_stream_future = None

self.unpacker.feed(wire_bytes)
msgs = [msg['body'] for msg in self.unpacker]
self._feed_subscribers(msgs)
except TornadoTimeoutError:
# Continue checking are there alive waiting handlers
# Keep 'read_stream_future' alive to wait it more in the next loop
continue
except tornado.iostream.StreamClosedError as exc:
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
self._feed_subscribers([None])
break
except Exception as exc:
log.error('Exception occurred in Subscriber while handling stream: %s', exc)
self._feed_subscribers([exc])
break

log.debug('IPC Subscriber Service is stopping due to a lack of subscribers')
self._read_in_progress.release()
raise tornado.gen.Return(None)

@tornado.gen.coroutine
def read(self, handler, timeout=None):
'''
Asynchronously read messages and invoke a callback when they are ready.
:param callback: A callback with the received data
'''
self._subscribe(handler)
while not self.connected():
try:
yield self.connect(timeout=5)
except tornado.iostream.StreamClosedError:
log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path)
yield tornado.gen.sleep(1)
except Exception as exc:
log.error('Exception occurred while Subscriber connecting: %s', exc)
yield tornado.gen.sleep(1)
self._read(timeout)

def close(self):
'''
Routines to handle any cleanup before the instance shuts down.
Sockets and filehandles should be closed explicitly, to prevent
leaks.
'''
if not self._closing:
super(IPCMessageSubscriberService, self).close()

def __del__(self):
if IPCMessageSubscriberService in globals():
self.close()


class IPCMessageSubscriber(object):
'''
Salt IPC message subscriber
Create an IPC client to receive messages from IPC publisher
Create or reuse an IPC client to receive messages from IPC publisher
An example of a very simple IPCMessageSubscriber connecting to an IPCMessagePublisher.
This example assumes an already running IPCMessagePublisher.
Expand Down Expand Up @@ -615,147 +721,60 @@ class IPCMessageSubscriber(IPCClient):
# Wait for some data
package = ipc_subscriber.read_sync()
'''
def __singleton_init__(self, socket_path, io_loop=None):
super(IPCMessageSubscriber, self).__singleton_init__(
socket_path, io_loop=io_loop)
self._read_sync_future = None
self._read_stream_future = None
self._sync_ioloop_running = False
self.saved_data = []
self._sync_read_in_progress = Semaphore()

@tornado.gen.coroutine
def _read_sync(self, timeout):
yield self._sync_read_in_progress.acquire()
exc_to_raise = None
ret = None

try:
while True:
if self._read_stream_future is None:
self._read_stream_future = self.stream.read_bytes(4096, partial=True)

if timeout is None:
wire_bytes = yield self._read_stream_future
else:
future_with_timeout = FutureWithTimeout(
self.io_loop, self._read_stream_future, timeout)
wire_bytes = yield future_with_timeout
def __init__(self, socket_path, io_loop=None):
self.service = IPCMessageSubscriberService(socket_path, io_loop)
self.queue = tornado.queues.Queue()

self._read_stream_future = None
def connected(self):
return self.service.connected()

# Remove the timeout once we get some data or an exception
# occurs. We will assume that the rest of the data is already
# there or is coming soon if an exception doesn't occur.
timeout = None
def connect(self, callback=None, timeout=None):
return self.service.connect(callback=callback, timeout=timeout)

self.unpacker.feed(wire_bytes)
first = True
for framed_msg in self.unpacker:
if first:
ret = framed_msg['body']
first = False
else:
self.saved_data.append(framed_msg['body'])
if not first:
# We read at least one piece of data
break
except TornadoTimeoutError:
# In the timeout case, just return None.
# Keep 'self._read_stream_future' alive.
ret = None
except tornado.iostream.StreamClosedError as exc:
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
self._read_stream_future = None
exc_to_raise = exc
except Exception as exc:
log.error('Exception occurred in Subscriber while handling stream: %s', exc)
self._read_stream_future = None
exc_to_raise = exc
@tornado.gen.coroutine
def _feed(self, msgs):
for msg in msgs:
yield self.queue.put(msg)

if self._sync_ioloop_running:
# Stop the IO Loop so that self.io_loop.start() will return in
# read_sync().
self.io_loop.spawn_callback(self.io_loop.stop)
@tornado.gen.coroutine
def read_async(self, callback, timeout=None):
'''
Asynchronously read messages and invoke a callback when they are ready.
if exc_to_raise is not None:
raise exc_to_raise # pylint: disable=E0702
self._sync_read_in_progress.release()
raise tornado.gen.Return(ret)
:param callback: A callback with the received data
'''
self.service.read(self)
while True:
try:
if timeout is not None:
deadline = time.time() + timeout
else:
deadline = None
data = yield self.queue.get(timeout=deadline)
except tornado.gen.TimeoutError:
raise tornado.gen.Return(None)
if data is None:
break
elif isinstance(data, Exception):
raise data
elif callback:
self.service.io_loop.spawn_callback(callback, data)
else:
raise tornado.gen.Return(data)

def read_sync(self, timeout=None):
'''
Read a message from an IPC socket
The socket must already be connected.
The associated IO Loop must NOT be running.
:param int timeout: Timeout when receiving message
:return: message data if successful. None if timed out. Will raise an
exception for all other error conditions.
'''
if self.saved_data:
return self.saved_data.pop(0)

self._sync_ioloop_running = True
self._read_sync_future = self._read_sync(timeout)
self.io_loop.start()
self._sync_ioloop_running = False

ret_future = self._read_sync_future
self._read_sync_future = None
return ret_future.result()

@tornado.gen.coroutine
def _read_async(self, callback):
while not self.stream.closed():
try:
self._read_stream_future = self.stream.read_bytes(4096, partial=True)
wire_bytes = yield self._read_stream_future
self._read_stream_future = None
self.unpacker.feed(wire_bytes)
for framed_msg in self.unpacker:
body = framed_msg['body']
self.io_loop.spawn_callback(callback, body)
except tornado.iostream.StreamClosedError:
log.trace('Subscriber disconnected from IPC %s', self.socket_path)
break
except Exception as exc:
log.error('Exception occurred while Subscriber handling stream: %s', exc)

@tornado.gen.coroutine
def read_async(self, callback):
'''
Asynchronously read messages and invoke a callback when they are ready.
:param callback: A callback with the received data
'''
while not self.connected():
try:
yield self.connect(timeout=5)
except tornado.iostream.StreamClosedError:
log.trace('Subscriber closed stream on IPC %s before connect', self.socket_path)
yield tornado.gen.sleep(1)
except Exception as exc:
log.error('Exception occurred while Subscriber connecting: %s', exc)
yield tornado.gen.sleep(1)
yield self._read_async(callback)
return self.service.io_loop.run_sync(lambda: self.read_async(None, timeout))

def close(self):
'''
Routines to handle any cleanup before the instance shuts down.
Sockets and filehandles should be closed explicitly, to prevent
leaks.
'''
if not self._closing:
IPCClient.close(self)
# This will prevent this message from showing up:
# '[ERROR ] Future exception was never retrieved:
# StreamClosedError'
if self._read_sync_future is not None and self._read_sync_future.done():
self._read_sync_future.exception()
if self._read_stream_future is not None and self._read_stream_future.done():
self._read_stream_future.exception()
self.service.unsubscribe(self)

def __del__(self):
if IPCMessageSubscriber in globals():
self.close()
self.close()

0 comments on commit 710ab50

Please sign in to comment.