Skip to content

Commit

Permalink
There is no reason for a subscriber to not connect to multiple publis…
Browse files Browse the repository at this point in the history
…hers, so let's
  • Loading branch information
Stefan Eilemann authored and Stefan Eilemann committed Jul 26, 2017
1 parent 1e66af2 commit 496ad9a
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 35 deletions.
4 changes: 3 additions & 1 deletion doc/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

# git master

* [223](https://github.com/HBPVIS/ZeroEQ/pull/223):
Subscriber(uri) ctors are replaced by Subscriber(uris)
* [218](https://github.com/HBPVIS/ZeroEQ/pull/218):
Fix infinite loop in slow receivers
Fix infinite loop in slow receivers
* [215](https://github.com/HBPVIS/ZeroEQ/pull/215):
Implement Monitor to receive notifications for new subscribers
* [217](https://github.com/HBPVIS/ZeroEQ/pull/217):
Expand Down
47 changes: 42 additions & 5 deletions tests/pubSub.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

/* Copyright (c) 2014-2015, Human Brain Project
/* Copyright (c) 2014-2017, Human Brain Project
* Daniel Nachbaur <[email protected]>
* [email protected]
*/
Expand All @@ -21,7 +21,7 @@ BOOST_AUTO_TEST_CASE(publish_receive_serializable)
test::Echo echoIn;

zeroeq::Publisher publisher(zeroeq::NULL_SESSION);
zeroeq::Subscriber subscriber(zeroeq::URI(publisher.getURI()));
zeroeq::Subscriber subscriber(publisher.getURI());
test::Monitor monitor(publisher);

BOOST_CHECK(subscriber.subscribe(echoIn));
Expand Down Expand Up @@ -54,7 +54,7 @@ BOOST_AUTO_TEST_CASE(publish_receive_event)
{
const std::string echoString("The quick brown fox");
zeroeq::Publisher publisher(zeroeq::NULL_SESSION);
zeroeq::Subscriber subscriber(zeroeq::URI(publisher.getURI()));
zeroeq::Subscriber subscriber(publisher.getURI());
test::Monitor monitor(publisher, subscriber);

bool received = false;
Expand Down Expand Up @@ -94,7 +94,7 @@ BOOST_AUTO_TEST_CASE(publish_receive_event)
BOOST_AUTO_TEST_CASE(publish_receive_empty_event)
{
zeroeq::Publisher publisher(zeroeq::NULL_SESSION);
zeroeq::Subscriber subscriber(zeroeq::URI(publisher.getURI()));
zeroeq::Subscriber subscriber(publisher.getURI());
bool received = false;
BOOST_CHECK(
subscriber.subscribe(zeroeq::make_uint128("Empty"),
Expand All @@ -113,6 +113,43 @@ BOOST_AUTO_TEST_CASE(publish_receive_empty_event)
BOOST_CHECK(!"reachable");
}

BOOST_AUTO_TEST_CASE(two_publishers)
{
zeroeq::Publisher publisher1(zeroeq::NULL_SESSION);
zeroeq::Publisher publisher2(zeroeq::NULL_SESSION);
zeroeq::Subscriber subscriber({publisher1.getURI(), publisher2.getURI()});
bool received = false;
BOOST_CHECK(
subscriber.subscribe(zeroeq::make_uint128("Empty"),
zeroeq::EventFunc([&]() { received = true; })));

for (size_t i = 0; i < 10; ++i)
{
BOOST_CHECK(publisher1.publish(zeroeq::make_uint128("Empty")));

if (subscriber.receive(100))
{
BOOST_CHECK(received);
received = false;
while (subscriber.receive(100))
; /* drain publisher1 events */

for (i = 0; i < 10; ++i)
{
BOOST_CHECK(publisher2.publish(zeroeq::make_uint128("Empty")));

if (subscriber.receive(100))
{
BOOST_CHECK(received);
return;
}
}
BOOST_CHECK(!"reachable");
}
}
BOOST_CHECK(!"reachable");
}

BOOST_AUTO_TEST_CASE(no_receive)
{
zeroeq::Subscriber subscriber(zeroeq::URI("1.2.3.4:1234"));
Expand Down Expand Up @@ -211,7 +248,7 @@ BOOST_AUTO_TEST_CASE(publish_receive_filters)
zeroeq::Publisher publisher(
zeroeq::URI("inproc://zeroeq.test.publish_receive_filters"),
zeroeq::NULL_SESSION);
zeroeq::Subscriber subscriber(zeroeq::URI(publisher.getURI()));
zeroeq::Subscriber subscriber(publisher.getURI());

// Make sure we're connected
BOOST_CHECK(
Expand Down
35 changes: 19 additions & 16 deletions zeroeq/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,27 @@ class Subscriber::Impl
update();
}

Impl(const URI& uri)
Impl(const URIs& uris)
: _context(detail::getContext())
, _browser(PUBLISHER_SERVICE)
, _selfInstance(detail::Sender::getUUID())
{
if (uri.getScheme() == DEFAULT_SCHEMA &&
(uri.getHost().empty() || uri.getPort() == 0))
for (const URI& uri : uris)
{
ZEROEQTHROW(std::runtime_error(
std::string("Non-fully qualified URI used for subscriber")));
}
if (uri.getScheme() == DEFAULT_SCHEMA &&
(uri.getHost().empty() || uri.getPort() == 0))
{
ZEROEQTHROW(std::runtime_error(std::string(
"Non-fully qualified URI used for subscriber")));
}

const std::string& zmqURI = buildZmqURI(uri);
if (!addConnection(zmqURI, uint128_t()))
{
ZEROEQTHROW(std::runtime_error("Cannot connect subscriber to " +
zmqURI + ": " +
zmq_strerror(zmq_errno())));
const std::string& zmqURI = buildZmqURI(uri);
if (!addConnection(zmqURI, uint128_t()))
{
ZEROEQTHROW(std::runtime_error("Cannot connect subscriber to " +
zmqURI + ": " +
zmq_strerror(zmq_errno())));
}
}
}

Expand Down Expand Up @@ -292,9 +295,9 @@ Subscriber::Subscriber(const std::string& session)
{
}

Subscriber::Subscriber(const URI& uri)
Subscriber::Subscriber(const URIs& uris)
: Receiver()
, _impl(new Impl(uri))
, _impl(new Impl(uris))
{
}

Expand All @@ -310,9 +313,9 @@ Subscriber::Subscriber(const std::string& session, Receiver& shared)
{
}

Subscriber::Subscriber(const URI& uri, Receiver& shared)
Subscriber::Subscriber(const URIs& uris, Receiver& shared)
: Receiver(shared)
, _impl(new Impl(uri))
, _impl(new Impl(uris))
{
}

Expand Down
32 changes: 21 additions & 11 deletions zeroeq/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define ZEROEQ_SUBSCRIBER_H

#include <zeroeq/receiver.h> // base class
#include <zeroeq/uri.h> // used inline

#include <vector>

Expand Down Expand Up @@ -58,16 +59,16 @@ class Subscriber : public Receiver
ZEROEQ_API explicit Subscriber(const std::string& session);

/**
* Create a subscriber which subscribes to a specific publisher.
* Create a subscriber which subscribes to specific publishers.
*
* Postconditions:
* - connected to the publisher on the given URI once publisher is running
* on the URI
* - connected to the publishers on the given URIs once publishers are
* running on the URIs
*
* @param uri publisher URI in the format [scheme://]*|host|IP|IF:port
* @throw std::runtime_error if URI is not fully qualified
* @param uris publisher URIs in the format [scheme://]*|host|IP|IF:port
* @throw std::runtime_error if an URI is not fully qualified
*/
ZEROEQ_API explicit Subscriber(const URI& uri);
ZEROEQ_API explicit Subscriber(const URIs& uris);

/**
* Create a default shared subscriber.
Expand All @@ -89,19 +90,28 @@ class Subscriber : public Receiver
ZEROEQ_API Subscriber(const std::string& session, Receiver& shared);

/**
* Create a shared subscriber which subscribes to publisher(s) on the given
* URI.
* Create a shared subscriber which subscribes to publishers on the given
* URIs.
*
* @sa Subscriber( const URI& )
* @sa Subscriber( const URIs& )
*
* @param uri publisher URI in the format [scheme://]*|host|IP|IF:port
* @param uris publisher URIs in the format [scheme://]*|host|IP|IF:port
* @param shared another receiver to share data reception with
*/
ZEROEQ_API Subscriber(const URI& uri, Receiver& shared);
ZEROEQ_API Subscriber(const URIs& uris, Receiver& shared);

/** Destroy this subscriber and withdraw any subscriptions. */
ZEROEQ_API ~Subscriber();

explicit Subscriber(const URI& uri) //!< @deprecated
: Subscriber(URIs{uri})
{
}
Subscriber(const URI& uri, Receiver& shared) //!< @deprecated
: Subscriber(URIs{uri}, shared)
{
}

/**
* Subscribe a serializable object to receive updates from any connected
* publisher.
Expand Down
6 changes: 4 additions & 2 deletions zeroeq/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ class Sender;
class Subscriber;
class URI;

using URIs = std::vector<URI>; //!< A vector of URIs

/** Callback for receival of subscribed event w/o payload. */
typedef std::function<void()> EventFunc;
using EventFunc = std::function<void()>;

/** Callback for receival of subscribed event w/ payload. */
typedef std::function<void(const void*, size_t)> EventPayloadFunc;
using EventPayloadFunc = std::function<void(const void*, size_t)>;

#ifdef WIN32
typedef SOCKET SocketDescriptor;
Expand Down

0 comments on commit 496ad9a

Please sign in to comment.