Skip to content

Commit

Permalink
Monitor implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefan Eilemann authored and Stefan Eilemann committed Jul 7, 2017
1 parent 6dbb2fb commit ac15a38
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 8 deletions.
19 changes: 19 additions & 0 deletions tests/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,23 @@ void onEchoEvent(const void* data, const size_t size)
BOOST_CHECK_EQUAL(size, message.size());
BOOST_CHECK_EQUAL(echoMessage, message);
}

class Monitor : public zeroeq::Monitor
{
public:
explicit Monitor(zeroeq::Sender& sender)
: zeroeq::Monitor(sender)
, connections(0)
{
}

Monitor(zeroeq::Sender& sender, zeroeq::Receiver& shared)
: zeroeq::Monitor(sender, shared)
, connections(0)
{
}

void notifyNewConnection() final { ++connections; }
size_t connections;
};
}
7 changes: 6 additions & 1 deletion tests/connection/broker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Subscriber

// Using the connection broker in place of zeroconf
BrokerPtr broker = createBroker(subscriber);
test::Monitor monitor(*broker, subscriber);
BOOST_REQUIRE(broker.get());
if (!broker)
return;
Expand All @@ -55,8 +56,12 @@ class Subscriber
}

// test receive of data for echo event
for (size_t i = 0; i < 100 && !received; ++i)
for (size_t i = 0; i < 100 && !received && monitor.connections == 0;
++i)
{
subscriber.receive(100);
}
BOOST_CHECK_EQUAL(monitor.connections, 1);
}

void waitStarted() const
Expand Down
28 changes: 26 additions & 2 deletions tests/pubSub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,31 @@ BOOST_AUTO_TEST_CASE(publish_receive_serializable)

zeroeq::Publisher publisher(zeroeq::NULL_SESSION);
zeroeq::Subscriber subscriber(zeroeq::URI(publisher.getURI()));
test::Monitor monitor(publisher);

BOOST_CHECK(subscriber.subscribe(echoIn));

for (size_t i = 0; i < 10; ++i)
{
BOOST_CHECK(publisher.publish(echoOut));

std::cout << monitor.receive(0) << std::endl;
if (subscriber.receive(100))
{
BOOST_CHECK_EQUAL(echoIn.getMessage(), echoOut.getMessage());

for (size_t j = 0; j < 10; ++j)
{
if (monitor.receive(100))
{
BOOST_CHECK_EQUAL(monitor.connections, 1);
return;
}
}
BOOST_CHECK(!"reachable");
return;
}
}
BOOST_CHECK_EQUAL(monitor.connections, 1);
BOOST_CHECK(!"reachable");
}

Expand All @@ -42,6 +55,8 @@ BOOST_AUTO_TEST_CASE(publish_receive_event)
const std::string echoString("The quick brown fox");
zeroeq::Publisher publisher(zeroeq::NULL_SESSION);
zeroeq::Subscriber subscriber(zeroeq::URI(publisher.getURI()));
test::Monitor monitor(publisher, subscriber);

bool received = false;
BOOST_CHECK(subscriber.subscribe(
zeroeq::make_uint128("Echo"),
Expand All @@ -59,6 +74,15 @@ BOOST_AUTO_TEST_CASE(publish_receive_event)
if (subscriber.receive(100))
{
BOOST_CHECK(received);
for (size_t j = 0; j < 10; ++j)
{
if (subscriber.receive(100))
{
BOOST_CHECK_EQUAL(monitor.connections, 1);
return;
}
}
BOOST_CHECK(!"reachable");
return;
}
}
Expand Down Expand Up @@ -208,7 +232,7 @@ BOOST_AUTO_TEST_CASE(publish_receive_filters)
while (subscriber.receive(0)) /* NOP to drain */
;
}
const auto& noEchoTime =
const auto noEchoTime =
std::chrono::high_resolution_clock::now() - startTime;

// Benchmark with echo handler, now should send data
Expand Down
3 changes: 3 additions & 0 deletions zeroeq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ set(ZEROEQ_PUBLIC_HEADERS
connection/broker.h
connection/service.h
log.h
monitor.h
publisher.h
receiver.h
sender.h
subscriber.h
types.h
uri.h)
Expand All @@ -26,6 +28,7 @@ set(ZEROEQ_SOURCES
connection/service.cpp
detail/port.cpp
detail/sender.cpp
monitor.cpp
publisher.cpp
receiver.cpp
subscriber.cpp
Expand Down
7 changes: 6 additions & 1 deletion zeroeq/connection/broker.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

/* Copyright (c) 2014-2015, Human Brain Project
/* Copyright (c) 2014-2017, Human Brain Project
* [email protected]
*/

Expand Down Expand Up @@ -130,5 +130,10 @@ std::string Broker::getAddress() const
{
return _impl->getAddress();
}

void* Broker::getSocket()
{
return _impl->socket;
}
}
}
8 changes: 6 additions & 2 deletions zeroeq/connection/broker.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

/* Copyright (c) 2014-2015, Human Brain Project
/* Copyright (c) 2014-2017, Human Brain Project
* [email protected]
*/

Expand All @@ -8,6 +8,7 @@

#include <zeroeq/log.h>
#include <zeroeq/receiver.h> // base class
#include <zeroeq/sender.h> // base class

namespace zeroeq
{
Expand Down Expand Up @@ -35,7 +36,7 @@ class Broker;
*
* Example: @include tests/connection/broker.cpp
*/
class Broker : public Receiver
class Broker : public Receiver, public Sender
{
public:
enum PortSelection
Expand Down Expand Up @@ -94,6 +95,9 @@ class Broker : public Receiver
// Receiver API
void addSockets(std::vector<zeroeq::detail::Socket>& entries) final;
void process(zeroeq::detail::Socket& socket, uint32_t timeout) final;

// Sender API
void* getSocket() final;
};
}
}
Expand Down
118 changes: 118 additions & 0 deletions zeroeq/monitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@

/* Copyright (c) 2017, Human Brain Project
* [email protected]
*/

#include "monitor.h"

#include "detail/socket.h"
#include "log.h"
#include "sender.h"

#include <zmq.h>

namespace zeroeq
{
class Monitor::Impl
{
public:
Impl(Sender& sender, void* context)
{
const auto inproc = std::string("inproc://zeroeq.monitor.") +
servus::make_UUID().getString();

if (::zmq_socket_monitor(sender.getSocket(), inproc.c_str(),
ZMQ_EVENT_ALL) != 0)
{
ZEROEQTHROW(
std::runtime_error(std::string("Cannot monitor socket: ") +
zmq_strerror(zmq_errno())));
}

_socket = ::zmq_socket(context, ZMQ_PAIR);
if (!_socket)
ZEROEQTHROW(std::runtime_error(
std::string("Cannot create inproc socket: ") +
zmq_strerror(zmq_errno())));

if (::zmq_connect(_socket, inproc.c_str()) != 0)
{
ZEROEQTHROW(std::runtime_error(
std::string("Cannot connect inproc socket: ") +
zmq_strerror(zmq_errno())));
}
}

~Impl()
{
if (_socket)
::zmq_close(_socket);
}

void addSockets(std::vector<zeroeq::detail::Socket>& entries)
{
zeroeq::detail::Socket entry;
entry.socket = _socket;
entry.events = ZMQ_POLLIN;
entries.push_back(entry);
}

private:
void* _socket;
};

Monitor::Monitor(Sender& sender)
: Receiver()
, _impl(new Impl(sender, getZMQContext()))
{
}

Monitor::Monitor(Sender& sender, Receiver& shared)
: Receiver(shared)
, _impl(new Impl(sender, getZMQContext()))
{
}

Monitor::~Monitor()
{
}

void Monitor::addSockets(std::vector<zeroeq::detail::Socket>& entries)
{
_impl->addSockets(entries);
}

void Monitor::process(zeroeq::detail::Socket& socket, uint32_t)
{
// Messages consist of 2 Frames, the first containing the event-id and the
// associated value. The second frame holds the affected endpoint as string.
zmq_msg_t msg;
zmq_msg_init(&msg);

// The layout of the first Frame is: 16 bit event id 32 bit event value
if (zmq_msg_recv(&msg, socket.socket, 0) == -1)
return;

const uint16_t event = *(uint16_t*)zmq_msg_data(&msg);
if (!zmq_msg_more(&msg))
return;
zmq_msg_close(&msg);

// Second frame in message contains event address, skip
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, socket.socket, 0) == -1)
return;
zmq_msg_close(&msg);

switch (event)
{
case ZMQ_EVENT_CONNECTED:
case ZMQ_EVENT_ACCEPTED:
notifyNewConnection();
break;

default:
ZEROEQWARN << "Unhandled monitor event " << event << std::endl;
}
}
}
38 changes: 38 additions & 0 deletions zeroeq/monitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@

/* Copyright (c) 2017, Human Brain Project
* [email protected]
*/

#pragma once

#include <zeroeq/receiver.h> // base class

namespace zeroeq
{
/** Monitors a Sender and notifies on events on its socket. */
class Monitor : public Receiver
{
public:
/** Monitor the given sender. */
ZEROEQ_API explicit Monitor(Sender& sender);

/** Monitor the given sender and notify with the given shared group. */
ZEROEQ_API Monitor(Sender& sender, Receiver& shared);

/** Destroy this monitor*/
ZEROEQ_API ~Monitor();

/** Notify of a new connection to the sender. */
virtual void notifyNewConnection() {}
private:
class Impl;
std::unique_ptr<Impl> _impl;

Monitor(const Monitor&) = delete;
Monitor& operator=(const Monitor&) = delete;

// Receiver API
void addSockets(std::vector<zeroeq::detail::Socket>& entries) final;
void process(zeroeq::detail::Socket& socket, uint32_t timeout) final;
};
}
5 changes: 5 additions & 0 deletions zeroeq/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,9 @@ const URI& Publisher::getURI() const
{
return _impl->uri;
}

void* Publisher::getSocket()
{
return _impl->socket;
}
}
7 changes: 5 additions & 2 deletions zeroeq/publisher.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

/* Copyright (c) 2014-2016, Human Brain Project
/* Copyright (c) 2014-2017, Human Brain Project
* Daniel Nachbaur <[email protected]>
* [email protected]
*/
Expand All @@ -8,6 +8,7 @@
#define ZEROEQ_PUBLISHER_H

#include <zeroeq/api.h>
#include <zeroeq/sender.h> // base class
#include <zeroeq/types.h>

#include <memory>
Expand All @@ -22,7 +23,7 @@ namespace zeroeq
*
* Example: @include tests/publisher.cpp
*/
class Publisher
class Publisher : public Sender
{
public:
/**
Expand Down Expand Up @@ -136,6 +137,8 @@ class Publisher

Publisher(const Publisher&) = delete;
Publisher& operator=(const Publisher&) = delete;

ZEROEQ_API void* getSocket() final;
};
}

Expand Down
Loading

0 comments on commit ac15a38

Please sign in to comment.