tcp_pubsub is a minimal publish-subscribe library that transports data via TCP. The project is CMake based. The dependencies are integrated as git submodules. In your own Project you can either use those submodules as well, or provide the dependencies in your own manner.
tcp_pubsub does not define a message format but only transports binary blobs. It does however define a protocol around that, which is kept as lightweight as possible.
Dependencies:
A very similar Example is also provided in the repository.
#include <thread>
#include <tcp_pubsub/executor.h>
#include <tcp_pubsub/publisher.h>
int main()
{
// Create a "Hello World" buffer
std::string data_to_send = "Hello World";
// Create an Executor with a thread-pool size of 6. If you create multiple
// publishers and subscribers, they all should share the same Executor.
std::shared_ptr<tcp_pubsub::Executor> executor = std::make_shared<tcp_pubsub::Executor>(6);
// Create a publisher that will offer the data on port 1588
tcp_pubsub::Publisher hello_world_publisher(executor, 1588);
for (;;)
{
// Send the "Hello World" string by passing the pointer to the first
// character and the length.
hello_world_publisher.send(&data_to_send[0], data_to_send.size());
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
return 0;
}
#include <iostream>
#include <thread>
#include <tcp_pubsub/executor.h>
#include <tcp_pubsub/subscriber.h>
int main()
{
// Create an Executor with a thread-pool size of 6. If you create multiple
// publishers and subscribers, they all should share the same Executor.
std::shared_ptr<tcp_pubsub::Executor> executor = std::make_shared<tcp_pubsub::Executor>(6);
// Create a subscriber
tcp_pubsub::Subscriber hello_world_subscriber(executor);
// Add a session to the subscriber that connects to port 1588 on localhost. A
// subscriber will aggregate traffic from multiple source, if you add multiple
// sessions.
hello_world_subscriber.addSession("127.0.0.1", 1588);
// Create a Callback that will be called each time a data packet is received.
// This function will create an std::string from the packet and print it to
// the console.
std::function<void(const tcp_pubsub::CallbackData& callback_data)> callback_function
= [](const tcp_pubsub::CallbackData& callback_data) -> void
{
std::cout << "Received playload: "
<< std::string(callback_data.buffer_->data(), callback_data.buffer_->size())
<< std::endl;
};
// Set the callback to the subsriber
hello_world_subscriber.setCallback(callback_function);
// Prevent the application from exiting immediatelly
for (;;) std::this_thread::sleep_for(std::chrono::milliseconds(500));
return 0;
}
You can set the following CMake Options to control how tcp_pubsub is built:
Option | Type | Default | Explanation |
---|---|---|---|
TCP_PUBSUB_BUILD_SAMPLES |
BOOL |
ON |
Build project samples. |
TCP_PUBSUB_BUILD_ECAL_SAMPLES |
BOOL |
OFF |
Build eCAL-based project samples. Requires eCAL to be findable by CMake. |
TCP_PUBSUB_USE_BUILTIN_ASIO |
BOOL |
ON |
Use the builtin asio submodule. If set to OFF , asio must be available from somewhere else (e.g. system libs). |
TCP_PUBSUB_USE_BUILTIN_RECYCLE |
BOOL |
ON |
Use the builtin steinwurf::recycle submodule. If set to OFF , recycle must be available from somewhere else (e.g. system libs). |
TCP_PUBSUB_BUILD_TESTS |
BOOL |
OFF |
Build the tcp_pubsub tests. Requires Gtest::GTest to be findable by CMake. |
TCP_PUBSUB_USE_BUILTIN_GTEST |
BOOL |
ON (if building tests) |
Use the builtin GoogleTest submodule. Only needed if TCP_PUBSUB_BUILD_TESTS is ON . If set to OFF , GoogleTest must be available from elsewhere. |
TCP_PUBSUB_LIBRARY_TYPE |
STRING |
Controls the library type of tcp_pubsub by injecting the string into the add_library call. Can be set to STATIC / SHARED / OBJECT. If set, this will override the regular BUILD_SHARED_LIBS CMake option. If not set, CMake will use the default setting, which is controlled by BUILD_SHARED_LIBS . |
There are several examples provided that aim to show you the functionality.
-
Install cmake and git / git-for-windows
-
Checkout this repo and the asio submodule
git clone https://github.com/eclipse-ecal/tcp_pubsub.git cd tcp_pubsub git submodule init git submodule update
-
CMake the project (Building as debug will add some debug output)
mkdir _build cd _build cmake .. -DCMAKE_BUILD_TYPE=Debug -DCMAKE_INSTALL_PREFIX=_install
-
Build the project
- Linux:
make
- Windows: Open
_build\tcp_pubsub.sln
with Visual Studio and build one of the example projects
- Linux:
-
Start either of the example pairs on the same machine.
hello_world_publisher /.exe
+hello_world_subscriber /.exe
orperformance_publisher /.exe
+performance_subscriber /.exe
When using this library, you do not need to know how the protocol works. Both the subscriber and receiver are completely implemented and ready for you to use. This section is meant for advanced users that are interested in the underlying protocol.
Show
The Protocol is quite simple:
-
The Subsriber connects to the publisher and sends a ProtocolHandshakeRequest. This Message contains the maximum protocol Version the Subscriber supports
-
The Publisher returns a ProtocolHandshakeResponse. This message contains the protocol version that will be used from now on. The version must not be higher than the version sent by the subsriber.
-
The Publisher starts sending data to the subsriber.
The ProtocolHandshake is meant to provide future-proof expansions. At the moment the only available protocol version is 0.
Subscriber Publisher
| |
| -> ProtocolHandshakeReq -> |
| |
| <- ProtocolHandshakeResp <- |
| |
| <--------- DATA <---------- |
| <--------- DATA <---------- |
| <--------- DATA <---------- |
| ... |
The protocol uses the following message layout. Values that are not sent by the sender are to be interpreted as 0.
-
General Message header Each message will have a message header as follows. Values are to be interpreted little-endian. This header is defined in tcp_pubsub/src/tcp_header.h
- 16 bit: Header size
- 8 bit: Type
- 0 = Regular Payload
- 1 = Handshake Message
- 8 bit: Reserved
- Must be 0
- 64bit: Payload size
-
ProtocolHandshakeReq & ProtocolHandshakeResp The layout of ProtocolHandshakeReq / ProtocolHandshakeResp is the same. Values are to be interpreted little-endian This message is defined in tcp_pubsub/src/protocol_handshake_message.h
- Message Header (size given in the first 16 bit)
- 8 bit: Protocol Version