From 77bd63c70deea46b534b28e6a83eb7eff20c12fb Mon Sep 17 00:00:00 2001 From: brendangeck <71093409+brendangeck@users.noreply.github.com> Date: Thu, 16 Sep 2021 13:47:47 -0700 Subject: [PATCH] RosBag odd padding parsing error fix (#23) * Small reformat and adding gdb to yum install list to use with --compilation_mode dbg * Removing order-naive parsing of bagfiles in favor of a more informed algorithm. * Bumping version number * Adding unit tests to verify we support the 'oddly padded bag' edge case Co-authored-by: Ubuntu --- Dockerfile | 5 +- lib/embag.cc | 274 +++++++++++++++++++++++--------------------- lib/embag.h | 1 + lib/ros_bag_types.h | 1 + lib/version.bzl | 2 +- test/BUILD | 2 +- test/embag_test.cc | 7 ++ test/test_2.bag | Bin 0 -> 5347 bytes 8 files changed, 159 insertions(+), 133 deletions(-) create mode 100644 test/test_2.bag diff --git a/Dockerfile b/Dockerfile index 59ea62b..e86dded 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,9 @@ FROM quay.io/pypa/manylinux2014_x86_64 -RUN yum install npm git python-devel python2-pip -y -q && npm install -g @bazel/bazelisk && pip install wheel && pip install --upgrade "pip < 21.0" +RUN yum install npm git python-devel python2-pip gdb -y -q && \ + npm install -g @bazel/bazelisk && \ + pip install wheel && \ + pip install --upgrade "pip < 21.0" RUN mkdir -p /tmp/embag /tmp/pip_build /tmp/out COPY WORKSPACE /tmp/embag diff --git a/lib/embag.cc b/lib/embag.cc index d4432d3..4ffda42 100644 --- a/lib/embag.cc +++ b/lib/embag.cc @@ -147,138 +147,152 @@ RosBagTypes::header_t Bag::readHeader(const RosBagTypes::record_t &record) { template bool Bag::readRecords(boost::iostreams::stream &stream) { - while (true) { - const auto pos = stream.tellg(); - if (pos == -1 || pos == std::streampos(bag_bytes_size_)) { - break; + // Docs on the RosBag 2.0 format: http://wiki.ros.org/Bags/Format/2.0 + + /** + * Read the BAG_HEADER record. So long as the file is not corrupted, this is guaranteed + * to be the first record in the bag + */ + const auto pos = stream.tellg(); + uint32_t connection_count; + uint32_t chunk_count; + uint64_t index_pos; + + const auto bag_header_record = readRecord(stream); + const auto bag_header_header = readHeader(bag_header_record); + + bag_header_header.getField("conn_count", connection_count); + bag_header_header.getField("chunk_count", chunk_count); + bag_header_header.getField("index_pos", index_pos); + + // TODO: check these values are nonzero and index_pos is > 64 + connections_.resize(connection_count); + chunk_infos_.reserve(chunk_count); + chunks_.reserve(chunk_count); + index_pos_ = index_pos; + + /** + * Read the CONNECTION records. As per the docs, these are located at the start of the "INDEX" section, + * which is denoted by the index_pos field of the bag header + */ + stream.seekg(index_pos_, std::ios_base::beg); + for (size_t i = 0; i < connection_count; i++) { + const auto conn_record = readRecord(stream); + const auto conn_header = readHeader(conn_record); + + uint32_t connection_id; + std::string topic; + + conn_header.getField("conn", connection_id); + conn_header.getField("topic", topic); + + if (topic.empty()) + continue; + + // TODO: check these variables along with md5sum + RosBagTypes::connection_data_t connection_data; + connection_data.topic = topic; + + const auto fields = readFields(conn_record.data, conn_record.data_len); + + connection_data.type = fields->at("type"); + const size_t slash_pos = connection_data.type.find_first_of('/'); + if (slash_pos != std::string::npos) { + connection_data.scope = connection_data.type.substr(0, slash_pos); } - const auto record = readRecord(stream); - const auto header = readHeader(record); - - const auto op = header.getOp(); - - switch (op) { - case RosBagTypes::header_t::op::BAG_HEADER: { - uint32_t connection_count; - uint32_t chunk_count; - uint64_t index_pos; - - header.getField("conn_count", connection_count); - header.getField("chunk_count", chunk_count); - header.getField("index_pos", index_pos); - - // TODO: check these values are nonzero and index_pos is > 64 - connections_.resize(connection_count); - chunks_.reserve(chunk_count); - index_pos_ = index_pos; - - break; - } - case RosBagTypes::header_t::op::CHUNK: { - RosBagTypes::chunk_t chunk{record}; - chunk.offset = pos; - - header.getField("compression", chunk.compression); - header.getField("size", chunk.uncompressed_size); - - if (!(chunk.compression == "lz4" || chunk.compression == "bz2" || chunk.compression == "none")) { - throw std::runtime_error("Unsupported compression type: " + chunk.compression); - } - - chunks_.emplace_back(chunk); - - break; - } - case RosBagTypes::header_t::op::INDEX_DATA: { - uint32_t version; - uint32_t connection_id; - uint32_t msg_count; - - // TODO: check these values - header.getField("ver", version); - header.getField("conn", connection_id); - header.getField("count", msg_count); - - RosBagTypes::index_block_t index_block{}; - index_block.into_chunk = &chunks_.back(); - - connections_[connection_id].blocks.emplace_back(index_block); - - break; - } - case RosBagTypes::header_t::op::CONNECTION: { - uint32_t connection_id; - std::string topic; - - header.getField("conn", connection_id); - header.getField("topic", topic); - - if (topic.empty()) - continue; - - // TODO: check these variables along with md5sum - RosBagTypes::connection_data_t connection_data; - connection_data.topic = topic; - - const auto fields = readFields(record.data, record.data_len); - - connection_data.type = fields->at("type"); - const size_t slash_pos = connection_data.type.find_first_of('/'); - if (slash_pos != std::string::npos) { - connection_data.scope = connection_data.type.substr(0, slash_pos); - } - connection_data.md5sum = fields->at("md5sum"); - connection_data.message_definition = fields->at("message_definition"); - if (fields->find("callerid") != fields->end()) { - connection_data.callerid = fields->at("callerid"); - } - if (fields->find("latching") != fields->end()) { - connection_data.latching = fields->at("latching") == "1"; - } - - connections_[connection_id].id = connection_id; - connections_[connection_id].topic = topic; - connections_[connection_id].data = connection_data; - topic_connection_map_[topic].emplace_back(&connections_[connection_id]); - - break; - } - case RosBagTypes::header_t::op::MESSAGE_DATA: { - // Message data is usually found in chunks - break; - } - case RosBagTypes::header_t::op::CHUNK_INFO: { - uint32_t ver; - uint64_t chunk_pos; - RosValue::ros_time_t start_time; - RosValue::ros_time_t end_time; - uint32_t count; - - header.getField("ver", ver); - header.getField("chunk_pos", chunk_pos); - header.getField("start_time", start_time); - header.getField("end_time", end_time); - header.getField("count", count); - - // TODO: It might make sense to save this data in a map or reverse the search. - // At the moment there are only a few chunks so this doesn't really take long - auto chunk_it = std::find_if(chunks_.begin(), chunks_.end(), [&chunk_pos](const RosBagTypes::chunk_t &c) { - return c.offset == chunk_pos; - }); - - if (chunk_it == chunks_.end()) { - throw std::runtime_error("Unable to find chunk for chunk info at pos: " + std::to_string(chunk_pos)); - } - - chunk_it->info.start_time = start_time; - chunk_it->info.end_time = end_time; - chunk_it->info.message_count = count; - - break; - } - case RosBagTypes::header_t::op::UNSET: - default:throw std::runtime_error("Unknown record operation: " + std::to_string(uint8_t(op))); + connection_data.md5sum = fields->at("md5sum"); + connection_data.message_definition = fields->at("message_definition"); + if (fields->find("callerid") != fields->end()) { + connection_data.callerid = fields->at("callerid"); } + if (fields->find("latching") != fields->end()) { + connection_data.latching = fields->at("latching") == "1"; + } + + connections_[connection_id].id = connection_id; + connections_[connection_id].topic = topic; + connections_[connection_id].data = connection_data; + topic_connection_map_[topic].push_back(&connections_[connection_id]); + } + + /** + * Read the CHUNK_INFO records. These are guaranteed to be immediately after the CONNECTION records, + * so no need to seek the file pointer + */ + for (size_t i = 0; i < chunk_count; i++) { + const auto chunk_info_record = readRecord(stream); + const auto chunk_info_header = readHeader(chunk_info_record); + + RosBagTypes::chunk_info_t chunk_info; + + uint32_t ver; + uint64_t chunk_pos; + RosValue::ros_time_t start_time; + RosValue::ros_time_t end_time; + uint32_t count; + + chunk_info_header.getField("ver", ver); + chunk_info_header.getField("chunk_pos", chunk_pos); + chunk_info_header.getField("start_time", start_time); + chunk_info_header.getField("end_time", end_time); + chunk_info_header.getField("count", count); + + chunk_info.chunk_pos = chunk_pos; + chunk_info.start_time = start_time; + chunk_info.end_time = end_time; + chunk_info.message_count = count; + + chunk_infos_[i] = chunk_info; + } + + /** + * Now that we have some chunk metadata from the CHUNK_INFO records, process the CHUNK records from + * earlier in the file. Each CHUNK_INFO knows the position of its corresponding chunk. + */ + for (size_t i = 0; i < chunk_count; i++) { + const auto info = chunk_infos_[i]; + + // TODO: The chunk infos are not necessarily Revisit this logic if seeking back and forth across the file causes a slowdown + stream.seekg(info.chunk_pos, std::ios_base::beg); + + const auto chunk_record = readRecord(stream); + const auto chunk_header = readHeader(chunk_record); + + RosBagTypes::chunk_t chunk{chunk_record}; + chunk.offset = stream.tellg(); + + chunk_header.getField("compression", chunk.compression); + chunk_header.getField("size", chunk.uncompressed_size); + + if (!(chunk.compression == "lz4" || chunk.compression == "bz2" || chunk.compression == "none")) { + throw std::runtime_error("Unsupported compression type: " + chunk.compression); + } + + chunk.info = info; + + chunks_.push_back(chunk); + + // Each chunk is followed by an INDEX_DATA record, so parse that out here + const auto index_data_record = readRecord(stream); + const auto index_data_header = readHeader(index_data_record); + + uint32_t version; + uint32_t connection_id; + uint32_t msg_count; + + // TODO: check these values + index_data_header.getField("ver", version); + index_data_header.getField("conn", connection_id); + index_data_header.getField("count", msg_count); + + RosBagTypes::index_block_t index_block{}; + + // NOTE: It seems like it would be simpler to just do &chunk here right? WRONG. + // C++ resuses the same memory location for the chunk variable for each loop, so + // if you use &chunk, all `into_chunk` values will be exactly the same + index_block.into_chunk = &chunks_[i]; + + connections_[connection_id].blocks.push_back(index_block); } return true; diff --git a/lib/embag.h b/lib/embag.h index 110299f..994879e 100644 --- a/lib/embag.h +++ b/lib/embag.h @@ -124,6 +124,7 @@ class Bag { // Bag data std::vector connections_; std::unordered_map> topic_connection_map_; + std::vector chunk_infos_; std::vector chunks_; uint64_t index_pos_ = 0; std::unordered_map> message_schemata_; diff --git a/lib/ros_bag_types.h b/lib/ros_bag_types.h index 21469f6..40a7272 100644 --- a/lib/ros_bag_types.h +++ b/lib/ros_bag_types.h @@ -56,6 +56,7 @@ struct RosBagTypes { }; struct chunk_info_t { + uint64_t chunk_pos; RosValue::ros_time_t start_time; RosValue::ros_time_t end_time; uint32_t message_count = 0; diff --git a/lib/version.bzl b/lib/version.bzl index e5fbfba..0cbbfa1 100644 --- a/lib/version.bzl +++ b/lib/version.bzl @@ -1 +1 @@ -EMBAG_VERSION = "0.0.34" +EMBAG_VERSION = "0.0.35" diff --git a/test/BUILD b/test/BUILD index 09061c9..4ba0778 100644 --- a/test/BUILD +++ b/test/BUILD @@ -2,7 +2,7 @@ cc_test( name = "embag_test", srcs = ["embag_test.cc"], copts = ["-Iexternal/gtest/include"], - data = ["test.bag"], + data = ["test.bag", "test_2.bag"], deps = [ "//lib:embag", "@gtest", diff --git a/test/embag_test.cc b/test/embag_test.cc index 58988de..224b5e7 100644 --- a/test/embag_test.cc +++ b/test/embag_test.cc @@ -12,6 +12,13 @@ TEST(EmbagTest, OpenCloseBag) { bag.close(); } +TEST(EmbagTest, OpenCloseOddPaddingBag) { + // In some very rare cases rosbags will contain padding between the CHUNK and INDEX section of the file. + // This bagfile is one such case. + Embag::Bag oddly_padded_bag{"test/test_2.bag"}; + oddly_padded_bag.close(); +} + class BagTest : public ::testing::Test { protected: Embag::Bag bag_{"test/test.bag"}; diff --git a/test/test_2.bag b/test/test_2.bag new file mode 100644 index 0000000000000000000000000000000000000000..06ed45d384de843f72a57204f9d32227ff8ffdd6 GIT binary patch literal 5347 zcmeI0&riZI6vq)zlxX~YmPn%EA_6Mo(1RM|(L@rjOKA%%X6>>T6|ep+Uc7nsKk?>& z;(Ne!6GOZhF=>-;+P<~#y?xd#*(+XNG*6Dt3s=>>{rp)fmCAxtKr(C4dyPHvECEZv60ig;0ZYIVummgtOTZGa1T2AR zCQyQ3`cWNVcJOMufN;YX$`R z)GcUi#ReEnKM+J4?)ti;%S{t{va<_zpC9U;Pa2IX6ArB$IdvwO!<-g(oSIkRt5}my5P1J%icmfu91WhUQ~QrV}|0^6`8@-G Zc)LG~$$75nV)H`Y@Hsb&Q literal 0 HcmV?d00001