Skip to content

Commit

Permalink
feat(clp::streaming_compression): Update compressors/decompressors to…
Browse files Browse the repository at this point in the history
… accept generic `WriterInterface`/`ReaderInterface` for improved flexibility over `FileWriter`/`FileReader`. (#696)

Co-authored-by: Lin Zhihao <[email protected]>
  • Loading branch information
gibber9809 and LinZhihao-723 authored Jan 28, 2025
1 parent 029b50b commit 9eca636
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "../../Defs.h"
#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../streaming_compression/passthrough/Compressor.hpp"
#include "../../streaming_compression/zstd/Compressor.hpp"
#include "../../TraceableException.hpp"
Expand Down
5 changes: 2 additions & 3 deletions components/core/src/clp/streaming_compression/Compressor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <cstddef>

#include "../ErrorCode.hpp"
#include "../FileWriter.hpp"
#include "../TraceableException.hpp"
#include "../WriterInterface.hpp"

Expand Down Expand Up @@ -70,9 +69,9 @@ class Compressor : public WriterInterface {

/**
* Initializes the compression stream
* @param file_writer
* @param writer
*/
virtual auto open(FileWriter& file_writer) -> void = 0;
virtual auto open(WriterInterface& writer) -> void = 0;
};
} // namespace clp::streaming_compression

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#include <string>

#include "../FileReader.hpp"
#include "../ReaderInterface.hpp"
#include "../TraceableException.hpp"
#include "Constants.hpp"
Expand Down Expand Up @@ -42,11 +41,11 @@ class Decompressor : public ReaderInterface {
*/
virtual void open(char const* compressed_data_buffer, size_t compressed_data_buffer_size) = 0;
/**
* Initializes the decompressor to decompress from an open file
* @param file_reader
* @param file_read_buffer_capacity The maximum amount of data to read from a file at a time
* Initializes the decompressor to decompress from a reader interface
* @param reader
* @param read_buffer_capacity The maximum amount of data to read from a reader at a time
*/
virtual void open(FileReader& file_reader, size_t file_read_buffer_capacity) = 0;
virtual void open(ReaderInterface& reader, size_t read_buffer_capacity) = 0;
/**
* Closes decompression stream
*/
Expand Down
20 changes: 10 additions & 10 deletions components/core/src/clp/streaming_compression/lzma/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
#include <spdlog/spdlog.h>

#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
#include "../../type_utils.hpp"
#include "../../WriterInterface.hpp"

namespace clp::streaming_compression::lzma {
auto Compressor::open(FileWriter& file_writer) -> void {
if (nullptr != m_compressed_stream_file_writer) {
auto Compressor::open(WriterInterface& writer) -> void {
if (nullptr != m_compressed_stream_writer) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

Expand All @@ -28,12 +28,12 @@ auto Compressor::open(FileWriter& file_writer) -> void {
{
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
m_compressed_stream_file_writer = &file_writer;
m_compressed_stream_writer = &writer;
m_uncompressed_stream_pos = 0;
}

auto Compressor::close() -> void {
if (nullptr == m_compressed_stream_file_writer) {
if (nullptr == m_compressed_stream_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

Expand All @@ -43,11 +43,11 @@ auto Compressor::close() -> void {

flush_lzma(LZMA_FINISH);
m_lzma_stream.end_and_detach_output();
m_compressed_stream_file_writer = nullptr;
m_compressed_stream_writer = nullptr;
}

auto Compressor::write(char const* data, size_t data_length) -> void {
if (nullptr == m_compressed_stream_file_writer) {
if (nullptr == m_compressed_stream_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
if (false
Expand All @@ -62,14 +62,14 @@ auto Compressor::write(char const* data, size_t data_length) -> void {
}

auto Compressor::flush() -> void {
if (nullptr == m_compressed_stream_file_writer) {
if (nullptr == m_compressed_stream_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
flush_lzma(LZMA_SYNC_FLUSH);
}

auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode {
if (nullptr == m_compressed_stream_file_writer) {
if (nullptr == m_compressed_stream_writer) {
return ErrorCode_NotInit;
}
pos = m_uncompressed_stream_pos;
Expand Down Expand Up @@ -143,7 +143,7 @@ auto Compressor::flush_stream_output_block_buffer() -> void {
if (cCompressedStreamBlockBufferSize == m_lzma_stream.avail_out()) {
return;
}
m_compressed_stream_file_writer->write(
m_compressed_stream_writer->write(
clp::size_checked_pointer_cast<char>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize - m_lzma_stream.avail_out()
);
Expand Down
19 changes: 11 additions & 8 deletions components/core/src/clp/streaming_compression/lzma/Compressor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@

#include "../../Array.hpp"
#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
#include "../../WriterInterface.hpp"
#include "../Compressor.hpp"
#include "Constants.hpp"

namespace clp::streaming_compression::lzma {
/**
* Implements a LZMA compressor that compresses byte input data to a file.
* Implements a LZMA compressor that compresses byte input data to a `clp::WriterInterface`
* instance.
*/
class Compressor : public ::clp::streaming_compression::Compressor {
public:
Expand Down Expand Up @@ -58,7 +59,8 @@ class Compressor : public ::clp::streaming_compression::Compressor {
auto write(char const* data, size_t data_length) -> void override;

/**
* Writes any internally buffered data to file and ends the current frame
* Writes any internally buffered data to the underlying `clp::WriterInterface` instance and
* ends the current frame.
*
* Forces all the encoded data buffered by LZMA to be available at output
*/
Expand All @@ -79,11 +81,11 @@ class Compressor : public ::clp::streaming_compression::Compressor {
auto close() -> void override;

/**
* Open the compression stream for encoding to the file_writer.
* Opens the compression stream for encoding to the writer.
*
* @param file_writer
* @param writer
*/
auto open(FileWriter& file_writer) -> void override;
auto open(WriterInterface& writer) -> void override;

private:
/**
Expand Down Expand Up @@ -211,14 +213,15 @@ class Compressor : public ::clp::streaming_compression::Compressor {
auto flush_lzma(lzma_action flush_action) -> void;

/**
* Flushes the current compressed data in the output block buffer to the output file handler.
* Flushes the current compressed data in the output block buffer to the underlying
* `clp::WriterInterface` instance.
*
* Also resets the output block buffer to receive new data.
*/
auto flush_stream_output_block_buffer() -> void;

// Variables
FileWriter* m_compressed_stream_file_writer{nullptr};
WriterInterface* m_compressed_stream_writer{nullptr};

// Compressed stream variables
Array<uint8_t> m_compressed_stream_block_buffer{cCompressedStreamBlockBufferSize};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

#include "../../ErrorCode.hpp"
#include "../../TraceableException.hpp"
#include "../../WriterInterface.hpp"

namespace clp::streaming_compression::passthrough {
auto Compressor::write(char const* data, size_t const data_length) -> void {
if (nullptr == m_compressed_stream_file_writer) {
if (nullptr == m_compressed_stream_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

Expand All @@ -19,30 +20,30 @@ auto Compressor::write(char const* data, size_t const data_length) -> void {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}

m_compressed_stream_file_writer->write(data, data_length);
m_compressed_stream_writer->write(data, data_length);
}

auto Compressor::flush() -> void {
if (nullptr == m_compressed_stream_file_writer) {
if (nullptr == m_compressed_stream_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

m_compressed_stream_file_writer->flush();
m_compressed_stream_writer->flush();
}

auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode {
if (nullptr == m_compressed_stream_file_writer) {
if (nullptr == m_compressed_stream_writer) {
return ErrorCode_NotInit;
}

return m_compressed_stream_file_writer->try_get_pos(pos);
return m_compressed_stream_writer->try_get_pos(pos);
}

auto Compressor::close() -> void {
m_compressed_stream_file_writer = nullptr;
m_compressed_stream_writer = nullptr;
}

auto Compressor::open(FileWriter& file_writer) -> void {
m_compressed_stream_file_writer = &file_writer;
auto Compressor::open(WriterInterface& writer) -> void {
m_compressed_stream_writer = &writer;
}
} // namespace clp::streaming_compression::passthrough
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
#include <cstddef>

#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
#include "../../WriterInterface.hpp"
#include "../Compressor.hpp"

namespace clp::streaming_compression::passthrough {
Expand Down Expand Up @@ -70,13 +70,13 @@ class Compressor : public ::clp::streaming_compression::Compressor {

/**
* Initializes the compression stream
* @param file_writer
* @param writer
*/
auto open(FileWriter& file_writer) -> void override;
auto open(WriterInterface& writer) -> void override;

private:
// Variables
FileWriter* m_compressed_stream_file_writer{nullptr};
WriterInterface* m_compressed_stream_writer{nullptr};
};
} // namespace clp::streaming_compression::passthrough

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ ErrorCode Decompressor::try_read(char* buf, size_t num_bytes_to_read, size_t& nu
);
memcpy(buf, &m_compressed_data_buf[m_decompressed_stream_pos], num_bytes_read);
break;
case InputType::File: {
auto error_code = m_file_reader->try_read(buf, num_bytes_to_read, num_bytes_read);
case InputType::ReaderInterface: {
auto error_code = m_reader->try_read(buf, num_bytes_to_read, num_bytes_read);
if (ErrorCode_Success != error_code) {
return error_code;
}
Expand All @@ -49,8 +49,8 @@ ErrorCode Decompressor::try_seek_from_begin(size_t pos) {
return ErrorCode_Truncated;
}
break;
case InputType::File: {
auto error_code = m_file_reader->try_seek_from_begin(pos);
case InputType::ReaderInterface: {
auto error_code = m_reader->try_seek_from_begin(pos);
if (ErrorCode_Success != error_code) {
return error_code;
}
Expand Down Expand Up @@ -85,14 +85,14 @@ void Decompressor::open(char const* compressed_data_buf, size_t compressed_data_
m_input_type = InputType::CompressedDataBuf;
}

void Decompressor::open(FileReader& file_reader, size_t file_read_buffer_capacity) {
void Decompressor::open(ReaderInterface& reader, size_t read_buffer_capacity) {
if (InputType::NotInitialized != m_input_type) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

m_file_reader = &file_reader;
m_reader = &reader;
m_decompressed_stream_pos = 0;
m_input_type = InputType::File;
m_input_type = InputType::ReaderInterface;
}

void Decompressor::close() {
Expand All @@ -101,8 +101,8 @@ void Decompressor::close() {
m_compressed_data_buf = nullptr;
m_compressed_data_buf_len = 0;
break;
case InputType::File:
m_file_reader = nullptr;
case InputType::ReaderInterface:
m_reader = nullptr;
break;
case InputType::NotInitialized:
// Do nothing
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef CLP_STREAMING_COMPRESSION_PASSTHROUGH_DECOMPRESSOR_HPP
#define CLP_STREAMING_COMPRESSION_PASSTHROUGH_DECOMPRESSOR_HPP

#include "../../FileReader.hpp"
#include "../../ReaderInterface.hpp"
#include "../../TraceableException.hpp"
#include "../Decompressor.hpp"

Expand Down Expand Up @@ -69,7 +69,7 @@ class Decompressor : public ::clp::streaming_compression::Decompressor {

// Methods implementing the Decompressor interface
void open(char const* compressed_data_buf, size_t compressed_data_buf_size) override;
void open(FileReader& file_reader, size_t file_read_buffer_capacity) override;
void open(ReaderInterface& reader, size_t read_buffer_capacity) override;
void close() override;
/**
* Decompresses and copies the range of uncompressed data described by
Expand All @@ -90,13 +90,13 @@ class Decompressor : public ::clp::streaming_compression::Decompressor {
enum class InputType {
NotInitialized,
CompressedDataBuf,
File
ReaderInterface
};

// Variables
InputType m_input_type;

FileReader* m_file_reader;
ReaderInterface* m_reader;
char const* m_compressed_data_buf;
size_t m_compressed_data_buf_len;

Expand Down
Loading

0 comments on commit 9eca636

Please sign in to comment.