Skip to content

Commit

Permalink
Added --repeat-latched option to rosbag record
Browse files Browse the repository at this point in the history
  • Loading branch information
tappan-at-git authored Dec 13, 2019
1 parent 917c542 commit f777d6e
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 6 deletions.
3 changes: 3 additions & 0 deletions tools/rosbag/include/rosbag/recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct ROSBAG_DECL RecorderOptions
bool snapshot;
bool verbose;
bool publish;
bool repeat_latched;
CompressionType compression;
std::string prefix;
std::string name;
Expand Down Expand Up @@ -170,6 +171,8 @@ class ROSBAG_DECL Recorder

int exit_code_; //!< eventual exit code

std::map<std::pair<std::string, std::string>, OutgoingMessage> latched_msgs_;

boost::condition_variable_any queue_condition_; //!< conditional variable for queue
boost::mutex queue_mutex_; //!< mutex for queue
std::queue<OutgoingMessage>* queue_; //!< queue for storing
Expand Down
7 changes: 5 additions & 2 deletions tools/rosbag/src/record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ rosbag::RecorderOptions parseOptions(int argc, char** argv) {
("duration", po::value<std::string>(), "Record a bag of maximum duration in seconds, unless 'm', or 'h' is appended.")
("node", po::value<std::string>(), "Record all topics subscribed to by a specific node.")
("tcpnodelay", "Use the TCP_NODELAY transport hint when subscribing to topics.")
("udp", "Use the UDP transport hint when subscribing to topics.");
("udp", "Use the UDP transport hint when subscribing to topics.")
("repeat-latched", "Repeat latched msgs at the start of each new bag file.");



po::positional_options_description p;
p.add("topic", -1);

Expand Down Expand Up @@ -106,6 +107,8 @@ rosbag::RecorderOptions parseOptions(int argc, char** argv) {
opts.quiet = true;
if (vm.count("publish"))
opts.publish = true;
if (vm.count("repeat-latched"))
opts.repeat_latched = true;
if (vm.count("output-prefix"))
{
opts.prefix = vm["output-prefix"].as<std::string>();
Expand Down
24 changes: 23 additions & 1 deletion tools/rosbag/src/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,20 @@ void Recorder::doQueue(const ros::MessageEvent<topic_tools::ShapeShifter const>&

queue_->push(out);
queue_size_ += out.msg->size();


if (options_.repeat_latched)
{
ros::M_string::const_iterator it = out.connection_header->find("latching");
if ((it != out.connection_header->end()) && (it->second == "1"))
{
ros::M_string::const_iterator it2 = out.connection_header->find("callerid");
if (it2 != out.connection_header->end())
{
latched_msgs_.insert({{subscriber->getTopic(), it2->second}, out});
}
}
}

// Check to see if buffer has been exceeded
while (options_.buffer_size > 0 && queue_size_ > options_.buffer_size) {
OutgoingMessage drop = queue_->front();
Expand Down Expand Up @@ -393,6 +406,15 @@ void Recorder::startWriting() {
}
ROS_INFO("Recording to %s.", target_filename_.c_str());

if (options_.repeat_latched)
{
// Start each new bag file with copies of all latched messages.
for (auto const& out : latched_msgs_)
{
bag_.write(out.second.topic, out.second.time, *out.second.msg);
}
}

if (options_.publish)
{
std_msgs::String msg;
Expand Down
8 changes: 5 additions & 3 deletions tools/rosbag/src/rosbag/rosbag_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def record_cmd(argv):
parser.add_option("--lz4", dest="compression", action="store_const", const='lz4', help="use LZ4 compression")
parser.add_option("--tcpnodelay", dest="tcpnodelay", action="store_true", help="Use the TCP_NODELAY transport hint when subscribing to topics.")
parser.add_option("--udp", dest="udp", action="store_true", help="Use the UDP transport hint when subscribing to topics.")
parser.add_option("--repeat-latched", dest="repeat_latched", action="store_true", help="Repeat latched msgs at the start of each new bag file.")

(options, args) = parser.parse_args(argv)

Expand Down Expand Up @@ -134,6 +135,7 @@ def record_cmd(argv):
cmd.extend(["--node", options.node])
if options.tcpnodelay: cmd.extend(["--tcpnodelay"])
if options.udp: cmd.extend(["--udp"])
if options.repeat_latched: cmd.extend(["--repeat-latched"])

cmd.extend(args)

Expand Down Expand Up @@ -381,7 +383,7 @@ def expr_eval(expr):
else:
print('NO MATCH', verbose_pattern(topic, msg, t))

total_bytes += len(serialized_bytes)
total_bytes += len(serialized_bytes)
meter.step(total_bytes)
else:
for topic, raw_msg, t, conn_header in inbag.read_messages(raw=True, return_connection_header=True):
Expand Down Expand Up @@ -515,9 +517,9 @@ def check_cmd(argv):
sys.exit(1)

mm = MessageMigrator(args[1:] + append_rule, not options.noplugins)

migrations = checkbag(mm, args[0])

migrations = checkbag(mm, args[0])

if len(migrations) == 0:
print('Bag file does not need any migrations.')
exit(0)
Expand Down

0 comments on commit f777d6e

Please sign in to comment.