Skip to content

Commit

Permalink
add stealth mode for topic_tools/relay (#1155)
Browse files Browse the repository at this point in the history
  • Loading branch information
furushchev authored and dirk-thomas committed Oct 20, 2017
1 parent 6bb5446 commit 34d0237
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 4 deletions.
1 change: 1 addition & 0 deletions tools/topic_tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ if(CATKIN_ENABLE_TESTING)
add_rostest(test/throttle_simtime.test)
add_rostest(test/drop.test)
add_rostest(test/relay.test)
add_rostest(test/relay_stealth.test)
add_rostest(test/lazy_transport.test)
## Latched test disabled until underlying issue in roscpp is resolved,
## #3385, #3434.
Expand Down
60 changes: 56 additions & 4 deletions tools/topic_tools/src/relay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ ros::NodeHandle *g_node = NULL;
bool g_advertised = false;
string g_input_topic;
string g_output_topic;
string g_monitor_topic;
ros::Publisher g_pub;
ros::Subscriber* g_sub;
ros::Timer g_timer;
bool g_lazy;
bool g_stealth;
ros::TransportHints g_th;

void conn_cb(const ros::SingleSubscriberPublisher&);
Expand All @@ -56,11 +59,20 @@ void subscribe()
g_sub = new ros::Subscriber(g_node->subscribe(g_input_topic, 10, &in_cb, g_th));
}

void unsubscribe()
{
if (g_sub)
{
delete g_sub;
g_sub = NULL;
}
}

void conn_cb(const ros::SingleSubscriberPublisher&)
{
// If we're in lazy subscribe mode, and the first subscriber just
// connected, then subscribe, #3389.
if(g_lazy && !g_sub)
if(g_lazy && !g_stealth && !g_sub)
{
ROS_DEBUG("lazy mode; resubscribing");
subscribe();
Expand Down Expand Up @@ -91,16 +103,47 @@ void in_cb(const ros::MessageEvent<ShapeShifter>& msg_event)
}
// If we're in lazy subscribe mode, and nobody's listening,
// then unsubscribe, #3389.
if(g_lazy && !g_pub.getNumSubscribers())
if((g_lazy || g_stealth) && !g_pub.getNumSubscribers())
{
ROS_DEBUG("lazy mode; unsubscribing");
delete g_sub;
g_sub = NULL;
unsubscribe();
}
else
g_pub.publish(msg);
}

void timer_cb(const ros::TimerEvent&)
{
if (!g_advertised) return;

// get subscriber num of ~monitor_topic
XmlRpc::XmlRpcValue req(ros::this_node::getName()), res, data;
if (!ros::master::execute("getSystemState", req, res, data, false))
{
ROS_ERROR("Failed to communicate with rosmaster");
return;
}

int subscriber_num = 0;
XmlRpc::XmlRpcValue sub_info = data[1];
for (int i = 0; i < sub_info.size(); ++i)
{
string topic_name = sub_info[i][0];
if (topic_name != g_monitor_topic) continue;
XmlRpc::XmlRpcValue& subscribers = sub_info[i][1];
for (int j = 0; j < subscribers.size(); ++j)
{
if (subscribers[j] != ros::this_node::getName()) ++subscriber_num;
}
break;
}

// if no node subscribes to ~monitor, do unsubscribe
if (g_sub && subscriber_num == 0) unsubscribe();
// if any other nodes subscribe ~monitor, do subscribe
else if (!g_sub && subscriber_num > 0) subscribe();
}

int main(int argc, char **argv)
{
if (argc < 2)
Expand Down Expand Up @@ -128,6 +171,15 @@ int main(int argc, char **argv)
if (unreliable)
g_th.unreliable().reliable(); // Prefers unreliable, but will accept reliable.

pnh.param<bool>("stealth", g_stealth, false);
if (g_stealth)
{
double monitor_rate;
pnh.param<string>("monitor_topic", g_monitor_topic, g_input_topic);
pnh.param<double>("monitor_rate", monitor_rate, 1.0);
g_timer = n.createTimer(ros::Duration(monitor_rate), &timer_cb);
}

subscribe();
ros::spin();
return 0;
Expand Down
21 changes: 21 additions & 0 deletions tools/topic_tools/test/relay_stealth.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<launch>
<node name="topic_pub" pkg="rostopic" type="rostopic"
args="pub -r 10 /original_topic std_msgs/String foo" />
<node name="topic_relay" pkg="topic_tools" type="relay"
args="/original_topic /original_topic/relay">
<rosparam>
lazy: true
</rosparam>
</node>
<test test-name="test_relay_stealth"
pkg="topic_tools" type="test_relay_stealth.py" />

<node name="relay_stealth" pkg="topic_tools" type="relay"
args="/original_topic/relay /relay_stealth/output"
output="screen">
<rosparam>
stealth: true
monitor_topic: /original_topic/relay
</rosparam>
</node>
</launch>
45 changes: 45 additions & 0 deletions tools/topic_tools/test/test_relay_stealth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import rospy
import unittest
from std_msgs.msg import String


class TestRelayStealth(unittest.TestCase):
def out_callback(self, msg):
self.out_msg_count += 1

def monitor_callback(self, msg):
self.monitor_msg_count += 1

def test_stealth_relay(self):
self.out_msg_count = 0
self.monitor_msg_count = 0
sub_out = rospy.Subscriber("/relay_stealth/output", String,
self.out_callback, queue_size=1)
for i in range(5):
if sub_out.get_num_connections() == 0:
rospy.sleep(1)
self.assertTrue(sub_out.get_num_connections() > 0)

rospy.sleep(5)
self.assertEqual(self.out_msg_count, 0)

sub_monitor = rospy.Subscriber("/original_topic/relay", String,
self.monitor_callback, queue_size=1)
rospy.sleep(5)
self.assertGreater(self.monitor_msg_count, 0)
self.assertGreater(self.out_msg_count, 0)

cnt = self.out_msg_count
sub_monitor.unregister()

rospy.sleep(3)
self.assertLess(abs(cnt - self.out_msg_count), 30)


if __name__ == '__main__':
import rostest
rospy.init_node("test_relay_stealth")
rostest.rosrun("topic_tools", "test_relay_stealth", TestRelayStealth)

0 comments on commit 34d0237

Please sign in to comment.