Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RosBag odd padding parsing error fix #23

Merged
merged 5 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
FROM quay.io/pypa/manylinux2014_x86_64

RUN yum install npm git python-devel python2-pip -y -q && npm install -g @bazel/bazelisk && pip install wheel && pip install --upgrade "pip < 21.0"
RUN yum install npm git python-devel python2-pip gdb -y -q && \
npm install -g @bazel/bazelisk && \
pip install wheel && \
pip install --upgrade "pip < 21.0"

RUN mkdir -p /tmp/embag /tmp/pip_build /tmp/out
COPY WORKSPACE /tmp/embag
Expand Down
274 changes: 144 additions & 130 deletions lib/embag.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,138 +147,152 @@ RosBagTypes::header_t Bag::readHeader(const RosBagTypes::record_t &record) {

template <typename T>
bool Bag::readRecords(boost::iostreams::stream<T> &stream) {
while (true) {
const auto pos = stream.tellg();
if (pos == -1 || pos == std::streampos(bag_bytes_size_)) {
break;
// Docs on the RosBag 2.0 format: http://wiki.ros.org/Bags/Format/2.0

/**
* Read the BAG_HEADER record. So long as the file is not corrupted, this is guaranteed
* to be the first record in the bag
*/
const auto pos = stream.tellg();
uint32_t connection_count;
uint32_t chunk_count;
uint64_t index_pos;

const auto bag_header_record = readRecord(stream);
const auto bag_header_header = readHeader(bag_header_record);

bag_header_header.getField("conn_count", connection_count);
bag_header_header.getField("chunk_count", chunk_count);
bag_header_header.getField("index_pos", index_pos);

// TODO: check these values are nonzero and index_pos is > 64
connections_.resize(connection_count);
chunk_infos_.reserve(chunk_count);
chunks_.reserve(chunk_count);
index_pos_ = index_pos;

/**
* Read the CONNECTION records. As per the docs, these are located at the start of the "INDEX" section,
* which is denoted by the index_pos field of the bag header
*/
stream.seekg(index_pos_, std::ios_base::beg);
for (size_t i = 0; i < connection_count; i++) {
const auto conn_record = readRecord(stream);
const auto conn_header = readHeader(conn_record);

uint32_t connection_id;
std::string topic;

conn_header.getField("conn", connection_id);
conn_header.getField("topic", topic);

if (topic.empty())
continue;

// TODO: check these variables along with md5sum
RosBagTypes::connection_data_t connection_data;
connection_data.topic = topic;

const auto fields = readFields(conn_record.data, conn_record.data_len);

connection_data.type = fields->at("type");
const size_t slash_pos = connection_data.type.find_first_of('/');
if (slash_pos != std::string::npos) {
connection_data.scope = connection_data.type.substr(0, slash_pos);
}
const auto record = readRecord(stream);
const auto header = readHeader(record);

const auto op = header.getOp();

switch (op) {
case RosBagTypes::header_t::op::BAG_HEADER: {
uint32_t connection_count;
uint32_t chunk_count;
uint64_t index_pos;

header.getField("conn_count", connection_count);
header.getField("chunk_count", chunk_count);
header.getField("index_pos", index_pos);

// TODO: check these values are nonzero and index_pos is > 64
connections_.resize(connection_count);
chunks_.reserve(chunk_count);
index_pos_ = index_pos;

break;
}
case RosBagTypes::header_t::op::CHUNK: {
RosBagTypes::chunk_t chunk{record};
chunk.offset = pos;

header.getField("compression", chunk.compression);
header.getField("size", chunk.uncompressed_size);

if (!(chunk.compression == "lz4" || chunk.compression == "bz2" || chunk.compression == "none")) {
throw std::runtime_error("Unsupported compression type: " + chunk.compression);
}

chunks_.emplace_back(chunk);

break;
}
case RosBagTypes::header_t::op::INDEX_DATA: {
uint32_t version;
uint32_t connection_id;
uint32_t msg_count;

// TODO: check these values
header.getField("ver", version);
header.getField("conn", connection_id);
header.getField("count", msg_count);

RosBagTypes::index_block_t index_block{};
index_block.into_chunk = &chunks_.back();

connections_[connection_id].blocks.emplace_back(index_block);

break;
}
case RosBagTypes::header_t::op::CONNECTION: {
uint32_t connection_id;
std::string topic;

header.getField("conn", connection_id);
header.getField("topic", topic);

if (topic.empty())
continue;

// TODO: check these variables along with md5sum
RosBagTypes::connection_data_t connection_data;
connection_data.topic = topic;

const auto fields = readFields(record.data, record.data_len);

connection_data.type = fields->at("type");
const size_t slash_pos = connection_data.type.find_first_of('/');
if (slash_pos != std::string::npos) {
connection_data.scope = connection_data.type.substr(0, slash_pos);
}
connection_data.md5sum = fields->at("md5sum");
connection_data.message_definition = fields->at("message_definition");
if (fields->find("callerid") != fields->end()) {
connection_data.callerid = fields->at("callerid");
}
if (fields->find("latching") != fields->end()) {
connection_data.latching = fields->at("latching") == "1";
}

connections_[connection_id].id = connection_id;
connections_[connection_id].topic = topic;
connections_[connection_id].data = connection_data;
topic_connection_map_[topic].emplace_back(&connections_[connection_id]);

break;
}
case RosBagTypes::header_t::op::MESSAGE_DATA: {
// Message data is usually found in chunks
break;
}
case RosBagTypes::header_t::op::CHUNK_INFO: {
uint32_t ver;
uint64_t chunk_pos;
RosValue::ros_time_t start_time;
RosValue::ros_time_t end_time;
uint32_t count;

header.getField("ver", ver);
header.getField("chunk_pos", chunk_pos);
header.getField("start_time", start_time);
header.getField("end_time", end_time);
header.getField("count", count);

// TODO: It might make sense to save this data in a map or reverse the search.
// At the moment there are only a few chunks so this doesn't really take long
auto chunk_it = std::find_if(chunks_.begin(), chunks_.end(), [&chunk_pos](const RosBagTypes::chunk_t &c) {
return c.offset == chunk_pos;
});

if (chunk_it == chunks_.end()) {
throw std::runtime_error("Unable to find chunk for chunk info at pos: " + std::to_string(chunk_pos));
}

chunk_it->info.start_time = start_time;
chunk_it->info.end_time = end_time;
chunk_it->info.message_count = count;

break;
}
case RosBagTypes::header_t::op::UNSET:
default:throw std::runtime_error("Unknown record operation: " + std::to_string(uint8_t(op)));
connection_data.md5sum = fields->at("md5sum");
connection_data.message_definition = fields->at("message_definition");
if (fields->find("callerid") != fields->end()) {
connection_data.callerid = fields->at("callerid");
}
if (fields->find("latching") != fields->end()) {
connection_data.latching = fields->at("latching") == "1";
}

connections_[connection_id].id = connection_id;
connections_[connection_id].topic = topic;
connections_[connection_id].data = connection_data;
topic_connection_map_[topic].push_back(&connections_[connection_id]);
}

/**
* Read the CHUNK_INFO records. These are guaranteed to be immediately after the CONNECTION records,
* so no need to seek the file pointer
*/
for (size_t i = 0; i < chunk_count; i++) {
const auto chunk_info_record = readRecord(stream);
const auto chunk_info_header = readHeader(chunk_info_record);

RosBagTypes::chunk_info_t chunk_info;

uint32_t ver;
uint64_t chunk_pos;
RosValue::ros_time_t start_time;
RosValue::ros_time_t end_time;
uint32_t count;

chunk_info_header.getField("ver", ver);
chunk_info_header.getField("chunk_pos", chunk_pos);
chunk_info_header.getField("start_time", start_time);
chunk_info_header.getField("end_time", end_time);
chunk_info_header.getField("count", count);

chunk_info.chunk_pos = chunk_pos;
chunk_info.start_time = start_time;
chunk_info.end_time = end_time;
chunk_info.message_count = count;

chunk_infos_[i] = chunk_info;
}

/**
* Now that we have some chunk metadata from the CHUNK_INFO records, process the CHUNK records from
* earlier in the file. Each CHUNK_INFO knows the position of its corresponding chunk.
*/
for (size_t i = 0; i < chunk_count; i++) {
const auto info = chunk_infos_[i];

// TODO: The chunk infos are not necessarily Revisit this logic if seeking back and forth across the file causes a slowdown
stream.seekg(info.chunk_pos, std::ios_base::beg);

const auto chunk_record = readRecord(stream);
const auto chunk_header = readHeader(chunk_record);

RosBagTypes::chunk_t chunk{chunk_record};
chunk.offset = stream.tellg();

chunk_header.getField("compression", chunk.compression);
chunk_header.getField("size", chunk.uncompressed_size);

if (!(chunk.compression == "lz4" || chunk.compression == "bz2" || chunk.compression == "none")) {
throw std::runtime_error("Unsupported compression type: " + chunk.compression);
}

chunk.info = info;

chunks_.push_back(chunk);

// Each chunk is followed by an INDEX_DATA record, so parse that out here
const auto index_data_record = readRecord(stream);
const auto index_data_header = readHeader(index_data_record);

uint32_t version;
uint32_t connection_id;
uint32_t msg_count;

// TODO: check these values
index_data_header.getField("ver", version);
index_data_header.getField("conn", connection_id);
index_data_header.getField("count", msg_count);

RosBagTypes::index_block_t index_block{};

// NOTE: It seems like it would be simpler to just do &chunk here right? WRONG.
// C++ resuses the same memory location for the chunk variable for each loop, so
// if you use &chunk, all `into_chunk` values will be exactly the same
index_block.into_chunk = &chunks_[i];

connections_[connection_id].blocks.push_back(index_block);
}

return true;
Expand Down
1 change: 1 addition & 0 deletions lib/embag.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class Bag {
// Bag data
std::vector<RosBagTypes::connection_record_t> connections_;
std::unordered_map<std::string, std::vector<RosBagTypes::connection_record_t *>> topic_connection_map_;
std::vector<RosBagTypes::chunk_info_t> chunk_infos_;
std::vector<RosBagTypes::chunk_t> chunks_;
uint64_t index_pos_ = 0;
std::unordered_map<std::string, std::shared_ptr<RosMsgTypes::ros_msg_def>> message_schemata_;
Expand Down
1 change: 1 addition & 0 deletions lib/ros_bag_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ struct RosBagTypes {
};

struct chunk_info_t {
uint64_t chunk_pos;
RosValue::ros_time_t start_time;
RosValue::ros_time_t end_time;
uint32_t message_count = 0;
Expand Down
2 changes: 1 addition & 1 deletion lib/version.bzl
Original file line number Diff line number Diff line change
@@ -1 +1 @@
EMBAG_VERSION = "0.0.34"
EMBAG_VERSION = "0.0.35"
2 changes: 1 addition & 1 deletion test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ cc_test(
name = "embag_test",
srcs = ["embag_test.cc"],
copts = ["-Iexternal/gtest/include"],
data = ["test.bag"],
data = ["test.bag", "test_2.bag"],
deps = [
"//lib:embag",
"@gtest",
Expand Down
7 changes: 7 additions & 0 deletions test/embag_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ TEST(EmbagTest, OpenCloseBag) {
bag.close();
}

TEST(EmbagTest, OpenCloseOddPaddingBag) {
// In some very rare cases rosbags will contain padding between the CHUNK and INDEX section of the file.
// This bagfile is one such case.
Embag::Bag oddly_padded_bag{"test/test_2.bag"};
oddly_padded_bag.close();
}

class BagTest : public ::testing::Test {
protected:
Embag::Bag bag_{"test/test.bag"};
Expand Down
Binary file added test/test_2.bag
Binary file not shown.