Skip to content

Commit

Permalink
RosBag odd padding parsing error fix (#23)
Browse files Browse the repository at this point in the history
* Small reformat and adding gdb to yum install list to use with --compilation_mode dbg

* Removing order-naive parsing of bagfiles in favor of a more informed algorithm.

* Bumping version number

* Adding unit tests to verify we support the 'oddly padded bag' edge case

Co-authored-by: Ubuntu <[email protected]>
  • Loading branch information
brendangeck and Ubuntu authored Sep 16, 2021
1 parent 1c43ca2 commit 77bd63c
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 133 deletions.
5 changes: 4 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
FROM quay.io/pypa/manylinux2014_x86_64

RUN yum install npm git python-devel python2-pip -y -q && npm install -g @bazel/bazelisk && pip install wheel && pip install --upgrade "pip < 21.0"
RUN yum install npm git python-devel python2-pip gdb -y -q && \
npm install -g @bazel/bazelisk && \
pip install wheel && \
pip install --upgrade "pip < 21.0"

RUN mkdir -p /tmp/embag /tmp/pip_build /tmp/out
COPY WORKSPACE /tmp/embag
Expand Down
274 changes: 144 additions & 130 deletions lib/embag.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,138 +147,152 @@ RosBagTypes::header_t Bag::readHeader(const RosBagTypes::record_t &record) {

template <typename T>
bool Bag::readRecords(boost::iostreams::stream<T> &stream) {
while (true) {
const auto pos = stream.tellg();
if (pos == -1 || pos == std::streampos(bag_bytes_size_)) {
break;
// Docs on the RosBag 2.0 format: http://wiki.ros.org/Bags/Format/2.0

/**
* Read the BAG_HEADER record. So long as the file is not corrupted, this is guaranteed
* to be the first record in the bag
*/
const auto pos = stream.tellg();
uint32_t connection_count;
uint32_t chunk_count;
uint64_t index_pos;

const auto bag_header_record = readRecord(stream);
const auto bag_header_header = readHeader(bag_header_record);

bag_header_header.getField("conn_count", connection_count);
bag_header_header.getField("chunk_count", chunk_count);
bag_header_header.getField("index_pos", index_pos);

// TODO: check these values are nonzero and index_pos is > 64
connections_.resize(connection_count);
chunk_infos_.reserve(chunk_count);
chunks_.reserve(chunk_count);
index_pos_ = index_pos;

/**
* Read the CONNECTION records. As per the docs, these are located at the start of the "INDEX" section,
* which is denoted by the index_pos field of the bag header
*/
stream.seekg(index_pos_, std::ios_base::beg);
for (size_t i = 0; i < connection_count; i++) {
const auto conn_record = readRecord(stream);
const auto conn_header = readHeader(conn_record);

uint32_t connection_id;
std::string topic;

conn_header.getField("conn", connection_id);
conn_header.getField("topic", topic);

if (topic.empty())
continue;

// TODO: check these variables along with md5sum
RosBagTypes::connection_data_t connection_data;
connection_data.topic = topic;

const auto fields = readFields(conn_record.data, conn_record.data_len);

connection_data.type = fields->at("type");
const size_t slash_pos = connection_data.type.find_first_of('/');
if (slash_pos != std::string::npos) {
connection_data.scope = connection_data.type.substr(0, slash_pos);
}
const auto record = readRecord(stream);
const auto header = readHeader(record);

const auto op = header.getOp();

switch (op) {
case RosBagTypes::header_t::op::BAG_HEADER: {
uint32_t connection_count;
uint32_t chunk_count;
uint64_t index_pos;

header.getField("conn_count", connection_count);
header.getField("chunk_count", chunk_count);
header.getField("index_pos", index_pos);

// TODO: check these values are nonzero and index_pos is > 64
connections_.resize(connection_count);
chunks_.reserve(chunk_count);
index_pos_ = index_pos;

break;
}
case RosBagTypes::header_t::op::CHUNK: {
RosBagTypes::chunk_t chunk{record};
chunk.offset = pos;

header.getField("compression", chunk.compression);
header.getField("size", chunk.uncompressed_size);

if (!(chunk.compression == "lz4" || chunk.compression == "bz2" || chunk.compression == "none")) {
throw std::runtime_error("Unsupported compression type: " + chunk.compression);
}

chunks_.emplace_back(chunk);

break;
}
case RosBagTypes::header_t::op::INDEX_DATA: {
uint32_t version;
uint32_t connection_id;
uint32_t msg_count;

// TODO: check these values
header.getField("ver", version);
header.getField("conn", connection_id);
header.getField("count", msg_count);

RosBagTypes::index_block_t index_block{};
index_block.into_chunk = &chunks_.back();

connections_[connection_id].blocks.emplace_back(index_block);

break;
}
case RosBagTypes::header_t::op::CONNECTION: {
uint32_t connection_id;
std::string topic;

header.getField("conn", connection_id);
header.getField("topic", topic);

if (topic.empty())
continue;

// TODO: check these variables along with md5sum
RosBagTypes::connection_data_t connection_data;
connection_data.topic = topic;

const auto fields = readFields(record.data, record.data_len);

connection_data.type = fields->at("type");
const size_t slash_pos = connection_data.type.find_first_of('/');
if (slash_pos != std::string::npos) {
connection_data.scope = connection_data.type.substr(0, slash_pos);
}
connection_data.md5sum = fields->at("md5sum");
connection_data.message_definition = fields->at("message_definition");
if (fields->find("callerid") != fields->end()) {
connection_data.callerid = fields->at("callerid");
}
if (fields->find("latching") != fields->end()) {
connection_data.latching = fields->at("latching") == "1";
}

connections_[connection_id].id = connection_id;
connections_[connection_id].topic = topic;
connections_[connection_id].data = connection_data;
topic_connection_map_[topic].emplace_back(&connections_[connection_id]);

break;
}
case RosBagTypes::header_t::op::MESSAGE_DATA: {
// Message data is usually found in chunks
break;
}
case RosBagTypes::header_t::op::CHUNK_INFO: {
uint32_t ver;
uint64_t chunk_pos;
RosValue::ros_time_t start_time;
RosValue::ros_time_t end_time;
uint32_t count;

header.getField("ver", ver);
header.getField("chunk_pos", chunk_pos);
header.getField("start_time", start_time);
header.getField("end_time", end_time);
header.getField("count", count);

// TODO: It might make sense to save this data in a map or reverse the search.
// At the moment there are only a few chunks so this doesn't really take long
auto chunk_it = std::find_if(chunks_.begin(), chunks_.end(), [&chunk_pos](const RosBagTypes::chunk_t &c) {
return c.offset == chunk_pos;
});

if (chunk_it == chunks_.end()) {
throw std::runtime_error("Unable to find chunk for chunk info at pos: " + std::to_string(chunk_pos));
}

chunk_it->info.start_time = start_time;
chunk_it->info.end_time = end_time;
chunk_it->info.message_count = count;

break;
}
case RosBagTypes::header_t::op::UNSET:
default:throw std::runtime_error("Unknown record operation: " + std::to_string(uint8_t(op)));
connection_data.md5sum = fields->at("md5sum");
connection_data.message_definition = fields->at("message_definition");
if (fields->find("callerid") != fields->end()) {
connection_data.callerid = fields->at("callerid");
}
if (fields->find("latching") != fields->end()) {
connection_data.latching = fields->at("latching") == "1";
}

connections_[connection_id].id = connection_id;
connections_[connection_id].topic = topic;
connections_[connection_id].data = connection_data;
topic_connection_map_[topic].push_back(&connections_[connection_id]);
}

/**
* Read the CHUNK_INFO records. These are guaranteed to be immediately after the CONNECTION records,
* so no need to seek the file pointer
*/
for (size_t i = 0; i < chunk_count; i++) {
const auto chunk_info_record = readRecord(stream);
const auto chunk_info_header = readHeader(chunk_info_record);

RosBagTypes::chunk_info_t chunk_info;

uint32_t ver;
uint64_t chunk_pos;
RosValue::ros_time_t start_time;
RosValue::ros_time_t end_time;
uint32_t count;

chunk_info_header.getField("ver", ver);
chunk_info_header.getField("chunk_pos", chunk_pos);
chunk_info_header.getField("start_time", start_time);
chunk_info_header.getField("end_time", end_time);
chunk_info_header.getField("count", count);

chunk_info.chunk_pos = chunk_pos;
chunk_info.start_time = start_time;
chunk_info.end_time = end_time;
chunk_info.message_count = count;

chunk_infos_[i] = chunk_info;
}

/**
* Now that we have some chunk metadata from the CHUNK_INFO records, process the CHUNK records from
* earlier in the file. Each CHUNK_INFO knows the position of its corresponding chunk.
*/
for (size_t i = 0; i < chunk_count; i++) {
const auto info = chunk_infos_[i];

// TODO: The chunk infos are not necessarily Revisit this logic if seeking back and forth across the file causes a slowdown
stream.seekg(info.chunk_pos, std::ios_base::beg);

const auto chunk_record = readRecord(stream);
const auto chunk_header = readHeader(chunk_record);

RosBagTypes::chunk_t chunk{chunk_record};
chunk.offset = stream.tellg();

chunk_header.getField("compression", chunk.compression);
chunk_header.getField("size", chunk.uncompressed_size);

if (!(chunk.compression == "lz4" || chunk.compression == "bz2" || chunk.compression == "none")) {
throw std::runtime_error("Unsupported compression type: " + chunk.compression);
}

chunk.info = info;

chunks_.push_back(chunk);

// Each chunk is followed by an INDEX_DATA record, so parse that out here
const auto index_data_record = readRecord(stream);
const auto index_data_header = readHeader(index_data_record);

uint32_t version;
uint32_t connection_id;
uint32_t msg_count;

// TODO: check these values
index_data_header.getField("ver", version);
index_data_header.getField("conn", connection_id);
index_data_header.getField("count", msg_count);

RosBagTypes::index_block_t index_block{};

// NOTE: It seems like it would be simpler to just do &chunk here right? WRONG.
// C++ resuses the same memory location for the chunk variable for each loop, so
// if you use &chunk, all `into_chunk` values will be exactly the same
index_block.into_chunk = &chunks_[i];

connections_[connection_id].blocks.push_back(index_block);
}

return true;
Expand Down
1 change: 1 addition & 0 deletions lib/embag.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class Bag {
// Bag data
std::vector<RosBagTypes::connection_record_t> connections_;
std::unordered_map<std::string, std::vector<RosBagTypes::connection_record_t *>> topic_connection_map_;
std::vector<RosBagTypes::chunk_info_t> chunk_infos_;
std::vector<RosBagTypes::chunk_t> chunks_;
uint64_t index_pos_ = 0;
std::unordered_map<std::string, std::shared_ptr<RosMsgTypes::ros_msg_def>> message_schemata_;
Expand Down
1 change: 1 addition & 0 deletions lib/ros_bag_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ struct RosBagTypes {
};

struct chunk_info_t {
uint64_t chunk_pos;
RosValue::ros_time_t start_time;
RosValue::ros_time_t end_time;
uint32_t message_count = 0;
Expand Down
2 changes: 1 addition & 1 deletion lib/version.bzl
Original file line number Diff line number Diff line change
@@ -1 +1 @@
EMBAG_VERSION = "0.0.34"
EMBAG_VERSION = "0.0.35"
2 changes: 1 addition & 1 deletion test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ cc_test(
name = "embag_test",
srcs = ["embag_test.cc"],
copts = ["-Iexternal/gtest/include"],
data = ["test.bag"],
data = ["test.bag", "test_2.bag"],
deps = [
"//lib:embag",
"@gtest",
Expand Down
7 changes: 7 additions & 0 deletions test/embag_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ TEST(EmbagTest, OpenCloseBag) {
bag.close();
}

TEST(EmbagTest, OpenCloseOddPaddingBag) {
// In some very rare cases rosbags will contain padding between the CHUNK and INDEX section of the file.
// This bagfile is one such case.
Embag::Bag oddly_padded_bag{"test/test_2.bag"};
oddly_padded_bag.close();
}

class BagTest : public ::testing::Test {
protected:
Embag::Bag bag_{"test/test.bag"};
Expand Down
Binary file added test/test_2.bag
Binary file not shown.

0 comments on commit 77bd63c

Please sign in to comment.