Skip to content

Commit

Permalink
Add a management interface for the VMPoller Proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
dnaeon committed Sep 11, 2013
1 parent c580c4f commit 37b1741
Showing 1 changed file with 65 additions and 3 deletions.
68 changes: 65 additions & 3 deletions src/vmpoller/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,10 +680,22 @@ def run(self, config_file):

self.frontend_endpoint = config.get('Default', 'frontend')
self.backend_endpoint = config.get('Default', 'backend')
self.mgmt_endpoint = config.get('Default', 'mgmt')

# A flag to indicate that the VMPollerProxy daemon should be terminated
self.time_to_die = False

# ZeroMQ context
self.zcontext = zmq.Context()

# A management socket, used to control the VMPollerProxy daemon
self.mgmt = self.zcontext.socket(zmq.REP)

try:
self.mgmt.bind(self.mgmt_endpoint)
except zmq.ZMQError as e:
raise VMPollerException, "Cannot bind management socket: %s" % e

# Socket facing clients
self.frontend = self.zcontext.socket(zmq.ROUTER)

Expand All @@ -700,14 +712,64 @@ def run(self, config_file):
except zmq.ZMQError as e:
raise VMPollerException, "Cannot bind backend socket: %s" % e

# Start the proxy
# Create a poll set for our sockets
self.zpoller = zmq.Poller()
self.zpoller.register(self.frontend, zmq.POLLIN)
self.zpoller.register(self.backend, zmq.POLLIN)
self.zpoller.register(self.mgmt, zmq.POLLIN)

syslog.syslog("Starting the VMPoller Proxy")
zmq.proxy(self.frontend, self.backend)

# Enter the daemon loop from here
while not self.time_to_die:
socks = dict(self.zpoller.poll())

# Frontend socket, forward messages to the backend
if socks.get(self.frontend) == zmq.POLLIN:
msg = self.frontend.recv()
more = self.frontend.getsockopt(zmq.RCVMORE)
if more:
self.backend.send(msg, zmq.SNDMORE)
else:
self.backend.send(msg)

# Backend socket, forward messages back to the frontend
if socks.get(self.backend) == zmq.POLLIN:
msg = self.backend.recv()
more = self.backend.getsockopt(zmq.RCVMORE)
if more:
self.frontend.send(msg, zmq.SNDMORE)
else:
self.frontend.send(msg)

# This is never reached...
# Management socket
if socks.get(self.mgmt) == zmq.POLLIN:
msg = self.mgmt.recv_json()
result = self.process_mgmt_message(msg)
self.mgmt.send(result)

# Shutdown time has arrived, let's clean up a bit
self.frontend.close()
self.backend.close()
self.mgmt.close()
self.zcontext.term()
self.stop()

def process_mgmt_message(self, msg):
"""
Processes a message for the management interface of the VMPoller Proxy
"""
# Check if we have a command to process
if not "cmd" in msg:
return "Missing command name"

if msg["cmd"] == "shutdown":
self.time_to_die = True
syslog.syslog("VMPoller Proxy is shutting down")
return "Shutting down VMPoller Proxy"
else:
return "Unknown command '%s' received" % msg["cmd"]

class VMPollerClient(object):
"""
Expand Down

0 comments on commit 37b1741

Please sign in to comment.