Skip to content

Commit

Permalink
ZMQ: Add topics filtering (facontidavide#730)
Browse files Browse the repository at this point in the history
- Add ZeroMQ topics support.
- Add ZeroMQ publisher in /utilities.
- Add requirements.txt
  • Loading branch information
bonkuraps authored Dec 18, 2022
1 parent 5edd5f0 commit db90f70
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 50 deletions.
16 changes: 16 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,19 @@ installer/io.plotjuggler.application/data

# OS X
.DS_Store

# Cmake files
CMakeLists.txt.user
CMakeCache.txt
CMakeFiles
CMakeScripts
Testing
Makefile
cmake_install.cmake
install_manifest.txt
compile_commands.json
CTestTestfile.cmake
_deps

# Clangd
.cache
97 changes: 72 additions & 25 deletions plotjuggler_plugins/DataStreamZMQ/datastream_zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,22 @@ bool DataStreamZMQ::start(QStringList*)
QSettings settings;
QString address = settings.value("ZMQ_Subscriber::address", "localhost").toString();
QString protocol = settings.value("ZMQ_Subscriber::protocol", "JSON").toString();
QString topics = settings.value("ZMQ_Subscriber::topics", "").toString();

int port = settings.value("ZMQ_Subscriber::port", 9872).toInt();

dialog->ui->lineEditAddress->setText(address);
dialog->ui->lineEditPort->setText(QString::number(port));
dialog->ui->lineEditTopics->setText(topics);

ParserFactoryPlugin::Ptr parser_creator;

connect(dialog->ui->comboBoxProtocol,
qOverload<const QString &>(&QComboBox::currentIndexChanged),
this, [&](const QString & selected_protocol)
{
qOverload<const QString&>(&QComboBox::currentIndexChanged), this,
[&](const QString& selected_protocol) {
if (parser_creator)
{
if( auto prev_widget = parser_creator->optionsWidget())
if (auto prev_widget = parser_creator->optionsWidget())
{
prev_widget->setVisible(false);
}
Expand All @@ -110,21 +112,25 @@ bool DataStreamZMQ::start(QStringList*)
address = dialog->ui->lineEditAddress->text();
port = dialog->ui->lineEditPort->text().toUShort(&ok);
protocol = dialog->ui->comboBoxProtocol->currentText();
topics = dialog->ui->lineEditTopics->text();

_parser = parser_creator->createParser({}, {}, {}, dataMap());

// save back to service
settings.setValue("ZMQ_Subscriber::address", address);
settings.setValue("ZMQ_Subscriber::protocol", protocol);
settings.setValue("ZMQ_Subscriber::port", port);
settings.setValue("ZMQ_Subscriber::topics", topics);

_socket_address =
(dialog->ui->comboBox->currentText() + address + ":" + QString::number(port))
.toStdString();

_zmq_socket.connect(_socket_address.c_str());
// subscribe to everything
_zmq_socket.set(zmq::sockopt::subscribe, "");

parseTopicFilters(topics);
subscribeTopics();

_zmq_socket.set(zmq::sockopt::rcvtimeo, 100);

qDebug() << "ZMQ listening on address" << QString::fromStdString(_socket_address);
Expand All @@ -141,12 +147,15 @@ void DataStreamZMQ::shutdown()
if (_running)
{
_running = false;

if (_receive_thread.joinable())
{
_receive_thread.join();
}

unsubscribeTopics();

_zmq_socket.disconnect(_socket_address.c_str());
_running = false;
}
}

Expand All @@ -164,27 +173,65 @@ void DataStreamZMQ::receiveLoop()
double timestamp = 1e-6 * double(duration_cast<microseconds>(ts).count());

PJ::MessageRef msg(reinterpret_cast<uint8_t*>(recv_msg.data()), recv_msg.size());

try
if (parseMessage(msg, timestamp))
{
std::lock_guard<std::mutex> lock(mutex());
_parser->parseMessage(msg, timestamp);
emit this->dataReceived();
}
catch (std::exception& err)
{
QMessageBox::warning(nullptr, tr("ZMQ Subscriber"),
tr("Problem parsing the message. ZMQ Subscriber will be "
"stopped.\n%1")
.arg(err.what()),
QMessageBox::Ok);

_zmq_socket.disconnect(_socket_address.c_str());
_running = false;
// notify the GUI
emit closed();
return;
}
}
}
}

bool DataStreamZMQ::parseMessage(const PJ::MessageRef& msg, double& timestamp)
{
try
{
std::lock_guard<std::mutex> lock(mutex());
_parser->parseMessage(msg, timestamp);
return true;
}
catch (...)
{
return false;
}
}

void DataStreamZMQ::parseTopicFilters(const QString& topic_filters)
{
const QRegExp regex("(,{0,1}\\s+)|(;\\s*)");

if (topic_filters.trimmed().size() != 0)
{
const auto splitted = topic_filters.split(regex);

for (const auto& topic : splitted)
{
_topic_filters.push_back(topic.toStdString());
}
}
else
{
_topic_filters.push_back("");
}
}

void DataStreamZMQ::subscribeTopics()
{
for (const auto& topic : _topic_filters)
{
qDebug() << "ZMQ Subscribed topic" << QString::fromStdString(topic);

_zmq_socket.set(zmq::sockopt::subscribe, topic);
}
}

void DataStreamZMQ::unsubscribeTopics()
{
for (const auto& topic : _topic_filters)
{
qDebug() << "ZMQ Unsubscribed topic" << QString::fromStdString(topic);

_zmq_socket.set(zmq::sockopt::unsubscribe, topic);
}

_topic_filters.clear();
}
5 changes: 5 additions & 0 deletions plotjuggler_plugins/DataStreamZMQ/datastream_zmq.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ class DataStreamZMQ : public PJ::DataStreamer
PJ::MessageParserPtr _parser;
std::string _socket_address;
std::thread _receive_thread;
std::vector<std::string> _topic_filters;

void receiveLoop();
bool parseMessage(const PJ::MessageRef& msg, double& timestamp);
void parseTopicFilters(const QString& filters);
void subscribeTopics();
void unsubscribeTopics();
};
16 changes: 16 additions & 0 deletions plotjuggler_plugins/DataStreamZMQ/datastream_zmq.ui
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,22 @@
</layout>
</widget>
</item>
<item>
<widget class="QLabel" name="label_6">
<property name="font">
<font>
<weight>75</weight>
<bold>true</bold>
</font>
</property>
<property name="text">
<string>Topics:</string>
</property>
</widget>
</item>
<item>
<widget class="QLineEdit" name="lineEditTopics"/>
</item>
<item>
<spacer name="verticalSpacer">
<property name="orientation">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env python3

import zmq
import math
import json
import argparse

from time import sleep
import numpy as np

PORT = 9872

parser = argparse.ArgumentParser("start_test_publisher")

parser.add_argument("--topic|-t",
dest="topic",
help="Topic on which messages will be published",
type=str,
required=False)

args = parser.parse_args()
topic = args.topic


def main():
context = zmq.Context()
server_socket = context.socket(zmq.PUB)
server_socket.bind("tcp://*:" + str(PORT))
ticks = 0

while True:
data = {
"ticks": ticks,
"data": {
"cos": math.cos(ticks),
"sin": math.sin(ticks),
"floor": np.floor(np.cos(ticks)),
"ceil": np.ceil(np.cos(ticks))
}
}

if topic:
print(f"[{topic}] - " + json.dumps(data))
server_socket.send_multipart(
[topic.encode(), json.dumps(data).encode()])
else:
print(json.dumps(data))
server_socket.send(json.dumps(data).encode())

ticks += 1

sleep(0.1)


if __name__ == '__main__':
main()
25 changes: 0 additions & 25 deletions plotjuggler_plugins/DataStreamZMQ/zmp_publisher.py

This file was deleted.

3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
numpy==1.23.2
pyzmq==23.2.1
autopep8==1.7.0

0 comments on commit db90f70

Please sign in to comment.