Skip to content

Commit

Permalink
Implement streaming compression
Browse files Browse the repository at this point in the history
Distro A, OPSEC #4584

Signed-off-by: P. J. Reed <[email protected]>
  • Loading branch information
pjreed committed Dec 8, 2020
1 parent 61280b1 commit eaefd98
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 100 deletions.
52 changes: 0 additions & 52 deletions rosbag2_compression/src/rosbag2_compression/compression_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,58 +39,6 @@ FILE * open_file(const std::string & uri, const std::string & read_mode)

namespace rosbag2_compression
{
std::vector<uint8_t> get_input_buffer(const std::string & uri)
{
// Read in buffer, handling accordingly
const auto file_pointer = open_file(uri, "rb");
if (file_pointer == nullptr) {
std::stringstream errmsg;
errmsg << "Failed to open file: \"" << uri <<
"\" for binary reading! errno(" << errno << ")";

throw std::runtime_error{errmsg.str()};
}

const auto file_path = rcpputils::fs::path{uri};
const auto input_buffer_length = file_path.exists() ? file_path.file_size() : 0u;
if (input_buffer_length == 0) {
fclose(file_pointer);

std::stringstream errmsg;
errmsg << "Unable to get size of file: \"" << uri << "\"";

throw std::runtime_error{errmsg.str()};
}

// Initialize compress_buffer with size = compressed_buffer_length.
// Uniform initialization cannot be used here since it will choose
// the initializer list constructor instead.
std::vector<uint8_t> input_buffer(input_buffer_length);

const auto read_count = fread(
input_buffer.data(), sizeof(decltype(input_buffer)::value_type),
input_buffer_length, file_pointer);

if (read_count != input_buffer_length) {
ROSBAG2_COMPRESSION_LOG_ERROR_STREAM(
"Bytes read !(" <<
read_count << ") != buffer size (" << input_buffer.size() <<
")!");
// An error indicator is set by fread, so the following check will throw.
}

if (ferror(file_pointer)) {
fclose(file_pointer);

std::stringstream errmsg;
errmsg << "Unable to read binary data from file: \"" << uri << "\"!";

throw std::runtime_error{errmsg.str()};
}

fclose(file_pointer);
return input_buffer;
}

void write_output_buffer(
const std::vector<uint8_t> & output_buffer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,6 @@ using ZstdDecompressReturnType = decltype(ZSTD_decompress(
// Used as a parameter type in a function that accepts the output of ZSTD_getFrameContentSize.
using ZstdGetFrameContentSizeReturnType = decltype(ZSTD_getFrameContentSize(nullptr, 0));

/**
* Read a file from the supplied uri into a vector.
*
* \param uri is the path to the file.
* \return the contents of the buffer as a vector.
*/
std::vector<uint8_t> get_input_buffer(const std::string & uri);

/**
* Writes the output buffer to the specified file path.
* \param output_buffer is the data to write.
Expand Down
64 changes: 46 additions & 18 deletions rosbag2_compression/src/rosbag2_compression/zstd_compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,55 @@ std::string ZstdCompressor::compress_uri(const std::string & uri)
{
const auto start = std::chrono::high_resolution_clock::now();
const auto compressed_uri = uri + "." + get_compression_identifier();
const auto decompressed_buffer = get_input_buffer(uri);

// Allocate based on compression bound and compress
const auto compressed_buffer_length = ZSTD_compressBound(decompressed_buffer.size());
std::vector<uint8_t> compressed_buffer(compressed_buffer_length);

// Perform compression and check.
// compression_result is either the actual compressed size or an error code.
const auto compression_result = ZSTD_compressCCtx(
zstd_context_,
compressed_buffer.data(), compressed_buffer.size(),
decompressed_buffer.data(), decompressed_buffer.size(), kDefaultZstdCompressionLevel);
throw_on_zstd_error(compression_result);

// Compression_buffer_length might be larger than the actual compression size
// Resize compressed_buffer so its size is the actual compression size.
compressed_buffer.resize(compression_result);
write_output_buffer(compressed_buffer, compressed_uri);
std::ifstream input(uri, std::ios::in | std::ios::binary);
if (!input.is_open()) {
std::stringstream errmsg;
errmsg << "Failed to open file: \"" << uri <<
"\" for binary reading! errno(" << errno << ")";

throw std::runtime_error{errmsg.str()};
}
std::ofstream output(compressed_uri, std::ios::out | std::ios::binary);
if (!output.is_open()) {
std::stringstream errmsg;
errmsg << "Failed to open file: \"" << uri <<
"\" for binary writing! errno(" << errno << ")";

throw std::runtime_error{errmsg.str()};
}
// Based on the example from https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
const size_t buff_in_size = ZSTD_CStreamInSize();
const size_t buff_out_size = ZSTD_CStreamOutSize();
std::vector<char> in_buffer(buff_in_size);
std::vector<char> out_buffer(buff_out_size);
size_t total_size = 0;
size_t final_result = 0;
do {
input.read(in_buffer.data(), buff_in_size);
const auto size = size_t(input.gcount());
if (size > 0) {
const ZSTD_EndDirective mode = input.eof() ? ZSTD_e_end : ZSTD_e_continue;
ZSTD_inBuffer z_in_buffer = {in_buffer.data(), static_cast<size_t>(size), 0};
int finished;
do {
ZSTD_outBuffer z_out_buffer = {out_buffer.data(), out_buffer.size(), 0};
const auto remaining =
ZSTD_compressStream2(zstd_context_, &z_out_buffer, &z_in_buffer, mode);
throw_on_zstd_error(remaining);
output.write(out_buffer.data(), z_out_buffer.pos);
total_size += z_out_buffer.pos;
finished = input.eof() ? (remaining == 0) : (z_in_buffer.pos == z_in_buffer.size);
final_result = remaining;
} while (!finished);
}
} while (!input.eof());
output.flush();
output.close();
input.close();

const auto end = std::chrono::high_resolution_clock::now();
print_compression_statistics(start, end, decompressed_buffer.size(), compression_result);
print_compression_statistics(start, end, total_size, final_result);
return compressed_uri;
}

Expand Down
64 changes: 42 additions & 22 deletions rosbag2_compression/src/rosbag2_compression/zstd_decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,51 @@ std::string ZstdDecompressor::decompress_uri(const std::string & uri)
const auto start = std::chrono::high_resolution_clock::now();
const auto uri_path = rcpputils::fs::path{uri};
const auto decompressed_uri = rcpputils::fs::remove_extension(uri_path).string();
const auto compressed_buffer = get_input_buffer(uri);
const auto compressed_buffer_length = compressed_buffer.size();

const auto decompressed_buffer_length =
ZSTD_getFrameContentSize(compressed_buffer.data(), compressed_buffer_length);

throw_on_invalid_frame_content(decompressed_buffer_length);

// Initializes decompressed_buffer with size = decompressed_buffer_length.
// Uniform initialization cannot be used here since it will choose
// the initializer list constructor instead.
std::vector<uint8_t> decompressed_buffer(decompressed_buffer_length);

const auto decompression_result = ZSTD_decompressDCtx(
zstd_context_,
decompressed_buffer.data(), decompressed_buffer_length,
compressed_buffer.data(), compressed_buffer_length);

throw_on_zstd_error(decompression_result);
decompressed_buffer.resize(decompression_result);

write_output_buffer(decompressed_buffer, decompressed_uri);
std::ifstream input(uri, std::ios::in | std::ios::binary);
if (!input.is_open()) {
std::stringstream errmsg;
errmsg << "Failed to open file: \"" << uri <<
"\" for binary reading! errno(" << errno << ")";

throw std::runtime_error{errmsg.str()};
}
std::ofstream output(decompressed_uri, std::ios::out | std::ios::binary);
if (!output.is_open()) {
std::stringstream errmsg;
errmsg << "Failed to open file: \"" << uri <<
"\" for binary writing! errno(" << errno << ")";

throw std::runtime_error{errmsg.str()};
}
// Base on the example from https://github.com/facebook/zstd/blob/dev/examples/streaming_decompression.c
const size_t buff_in_size = ZSTD_DStreamInSize();
const size_t buff_out_size = ZSTD_DStreamOutSize();
size_t total_size = 0;
std::vector<char> in_buffer(buff_in_size);
std::vector<char> out_buffer(buff_out_size);
size_t final_result = 0;
do {
input.read(in_buffer.data(), buff_in_size);
const auto size = size_t(input.gcount());
if (size > 0) {
ZSTD_inBuffer z_in_buffer = {in_buffer.data(), static_cast<size_t>(size), 0};
while (z_in_buffer.pos < z_in_buffer.size) {
ZSTD_outBuffer z_out_buffer = {out_buffer.data(), out_buffer.size(), 0};
const auto ret = ZSTD_decompressStream(zstd_context_, &z_out_buffer, &z_in_buffer);
throw_on_zstd_error(ret);
output.write(out_buffer.data(), z_out_buffer.pos);
total_size += z_out_buffer.pos;
final_result = ret;
}
}
} while (!input.eof());
output.flush();
output.close();
input.close();

const auto end = std::chrono::high_resolution_clock::now();
print_compression_statistics(start, end, decompression_result, compressed_buffer_length);
print_compression_statistics(start, end, final_result, total_size);

return decompressed_uri;
}
Expand Down

0 comments on commit eaefd98

Please sign in to comment.