diff --git a/data_tamer_cpp/.clang-format b/.clang-format similarity index 69% rename from data_tamer_cpp/.clang-format rename to .clang-format index 40d1a25..80f4f71 100644 --- a/data_tamer_cpp/.clang-format +++ b/.clang-format @@ -1,5 +1,5 @@ --- -BasedOnStyle: Google +BasedOnStyle: Google AccessModifierOffset: -2 ConstructorInitializerIndentWidth: 2 AlignEscapedNewlinesLeft: false @@ -7,15 +7,14 @@ AlignTrailingComments: true AllowAllParametersOfDeclarationOnNextLine: false AllowShortIfStatementsOnASingleLine: false AllowShortLoopsOnASingleLine: false -AllowShortFunctionsOnASingleLine: Inline +AllowShortFunctionsOnASingleLine: None AlwaysBreakTemplateDeclarations: true AlwaysBreakBeforeMultilineStrings: false BreakBeforeBinaryOperators: false BreakBeforeTernaryOperators: false -BreakConstructorInitializersBeforeComma: false -BreakConstructorInitializers: AfterColon +BreakConstructorInitializers: BeforeComma BinPackParameters: true -ColumnLimit: 90 +ColumnLimit: 90 ConstructorInitializerAllOnOneLineOrOnePerLine: true DerivePointerBinding: false PointerBindsToType: true @@ -30,19 +29,20 @@ PenaltyBreakString: 1 PenaltyBreakFirstLessLess: 1000 PenaltyExcessCharacter: 1000 PenaltyReturnTypeOnItsOwnLine: 90 -SpacesBeforeTrailingComments: 3 -Cpp11BracedListStyle: true -Standard: Auto -IndentWidth: 2 -TabWidth: 2 -UseTab: Never +SpacesBeforeTrailingComments: 2 +Cpp11BracedListStyle: false +Standard: Auto +IndentWidth: 2 +TabWidth: 2 +UseTab: Never IndentFunctionDeclarationAfterType: false SpacesInParentheses: false -SpacesInAngles: false +SpacesInAngles: false SpaceInEmptyParentheses: false SpacesInCStyleCastParentheses: false SpaceAfterControlStatementKeyword: true SpaceBeforeAssignmentOperators: true +SpaceBeforeParens: Never ContinuationIndentWidth: 4 SortIncludes: false SpaceAfterCStyleCast: false @@ -53,17 +53,16 @@ BreakBeforeBraces: Custom # Control of individual brace wrapping cases BraceWrapping: { - AfterClass: 'true' - AfterControlStatement: 'true' - AfterEnum : 'true' - AfterFunction : 'true' - AfterNamespace : 'true' - AfterStruct : 'true' - AfterUnion : 'true' - BeforeCatch : 'true' - BeforeElse : 'true' - IndentBraces : 'false' + AfterClass: 'true', + AfterControlStatement: 'true', + AfterEnum : 'true', + AfterFunction : 'true', + AfterNamespace : 'true', + AfterStruct : 'true', + AfterUnion : 'true', + BeforeCatch : 'true', + BeforeElse : 'true', + IndentBraces : 'false', SplitEmptyFunction: 'false' } ... - diff --git a/.github/workflows/cmake_ubuntu.yml b/.github/workflows/cmake_ubuntu.yml index c82856d..66d7237 100644 --- a/.github/workflows/cmake_ubuntu.yml +++ b/.github/workflows/cmake_ubuntu.yml @@ -20,13 +20,13 @@ jobs: steps: - uses: actions/checkout@v2 - + - name: Install Conan id: conan uses: turtlebrowser/get-conan@main with: version: 2.0.13 - + - name: Create default profile run: conan profile detect @@ -48,17 +48,16 @@ jobs: shell: bash working-directory: ${{github.workspace}}/build run: cmake --build . --config Debug - + - name: run test (Linux) working-directory: ${{github.workspace}}/build run: ctest -T test -T Coverage - + - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v3 - with: + with: directory: ${{github.workspace}}/build gcov_ignore: ${{github.workspace}}/3rdparty verbose: true env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..f1fb770 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,44 @@ + +# To use: +# +# pre-commit run -a +# +# Or: +# +# pre-commit install # (runs every time you commit in git) +# +# To update this file: +# +# pre-commit autoupdate +# +# See https://github.com/pre-commit/pre-commit + +exclude: ^data_tamer_cpp/3rdparty/ +repos: + + # Standard hooks + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.5.0 + hooks: + - id: check-added-large-files + - id: check-ast + - id: check-case-conflict + - id: check-docstring-first + - id: check-merge-conflict + - id: check-symlinks + - id: check-xml + - id: check-yaml + - id: debug-statements + - id: end-of-file-fixer + exclude_types: [svg] + - id: mixed-line-ending + - id: trailing-whitespace + exclude_types: [svg] + - id: fix-byte-order-marker + + # CPP hooks + - repo: https://github.com/pre-commit/mirrors-clang-format + rev: v18.1.2 + hooks: + - id: clang-format + args: ['-fallback-style=none', '-i'] diff --git a/README.md b/README.md index ccd6e09..c0e3936 100644 --- a/README.md +++ b/README.md @@ -3,17 +3,17 @@ [![cmake Ubuntu](https://github.com/facontidavide/data_tamer/actions/workflows/cmake_ubuntu.yml/badge.svg)](https://github.com/facontidavide/data_tamer/actions/workflows/cmake_ubuntu.yml) [![ros2](https://github.com/PickNikRobotics/data_tamer/actions/workflows/ros2.yml/badge.svg)](https://github.com/PickNikRobotics/data_tamer/actions/workflows/ros2.yml) [![codecov](https://codecov.io/gh/facontidavide/data_tamer/graph/badge.svg?token=D0wtsntWds)](https://codecov.io/gh/facontidavide/data_tamer) - + **DataTamer** is a library to log/trace numerical variables over time and takes periodic "snapshots" of their values, to later visualize them as **timeseries**. It works great with [PlotJuggler](https://github.com/facontidavide/PlotJuggler), the timeseries visualization tool (note: you will need PlotJuggler **3.8.2** or later). -**DataTamer** is "fearless data logger" because you can record hundreds or **thousands of variables**: +**DataTamer** is "fearless data logger" because you can record hundreds or **thousands of variables**: even 1 million points per second should have a fairly small CPU overhead. -Since all the values are aggregated in a single "snapshot", it is usually meant to +Since all the values are aggregated in a single "snapshot", it is usually meant to record data in a periodic loop (a very frequent use case, in robotics applications). Kudos to [pal_statistics](https://github.com/pal-robotics/pal_statistics), for inspiring this project. @@ -27,20 +27,20 @@ DataTamer can be used to monitor multiple variables in your applications. **Channels** are used to take "snapshots" of a subset of variables at a given time. If you want to record at different frequencies, you can use different channels. -DataTamer will forward the collected data to 1 or multiple **sinks**; +DataTamer will forward the collected data to 1 or multiple **sinks**; a sink may save the information immediately in a file (currently, we support [MCAP](https://mcap.dev/)) or publish it using an inter-process communication, for instance, a ROS2 publisher. You can easily create your own, specialized sinks. Use [PlotJuggler](https://github.com/facontidavide/PlotJuggler) to -visualize your logs offline or in real-time. +visualize your logs offline or in real-time. ## Features - **Serialization schema is created at run-time**: no need to do code generation. - **Suitable for real-time applications**: very low latency (on the side of the callee). -- **Multi-sink architecture**: recorded data can be forwarded to multiple "backends". +- **Multi-sink architecture**: recorded data can be forwarded to multiple "backends". - **Very low serialization overhead**, in the order of 1 bit per traced value. - The user can enable/disable traced variables at run-time. @@ -176,11 +176,7 @@ cmake --build build/Debug --parallel # How to deserialize data recorded with DataTamer I will write more extensively about the serialization format used by DataTamer, but for the time being I -created a single header file without external dependencies that you can just copy into your project: +created a single header file without external dependencies that you can just copy into your project: [data_tamer_parser.hpp](data_tamer/include/data_tamer_parser) You can see how it is used in this example: [mcap_reader](data_tamer/examples/mcap_reader.cpp) - - - - diff --git a/data_tamer_cpp/3rdparty/mcap/CMakeLists.txt b/data_tamer_cpp/3rdparty/mcap/CMakeLists.txt new file mode 100644 index 0000000..cb78478 --- /dev/null +++ b/data_tamer_cpp/3rdparty/mcap/CMakeLists.txt @@ -0,0 +1,13 @@ + +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake") + +find_package(Zstd REQUIRED) +find_package(LZ4 REQUIRED) + +add_library(mcap_lib STATIC mcap.cpp) +target_link_libraries(mcap_lib PRIVATE ${LZ4_LIBRARY} ${ZSTD_LIBRARY}) +target_include_directories(mcap_lib + PUBLIC + $ +) + diff --git a/data_tamer_cpp/3rdparty/mcap/cmake/FindLZ4.cmake b/data_tamer_cpp/3rdparty/mcap/cmake/FindLZ4.cmake new file mode 100644 index 0000000..918568b --- /dev/null +++ b/data_tamer_cpp/3rdparty/mcap/cmake/FindLZ4.cmake @@ -0,0 +1,41 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Finds liblz4. +# +# This module defines: +# LZ4_FOUND +# LZ4_INCLUDE_DIR +# LZ4_LIBRARY +# + +find_path(LZ4_INCLUDE_DIR NAMES lz4.h) + +find_library(LZ4_LIBRARY_DEBUG NAMES lz4d) +find_library(LZ4_LIBRARY_RELEASE NAMES lz4) + +include(SelectLibraryConfigurations) +SELECT_LIBRARY_CONFIGURATIONS(LZ4) + +include(FindPackageHandleStandardArgs) +FIND_PACKAGE_HANDLE_STANDARD_ARGS( + LZ4 DEFAULT_MSG + LZ4_LIBRARY LZ4_INCLUDE_DIR +) + +if (LZ4_FOUND) + message(STATUS "Found LZ4: ${LZ4_LIBRARY}") +endif() + +mark_as_advanced(LZ4_INCLUDE_DIR LZ4_LIBRARY) diff --git a/data_tamer_cpp/3rdparty/mcap/cmake/FindZstd.cmake b/data_tamer_cpp/3rdparty/mcap/cmake/FindZstd.cmake new file mode 100644 index 0000000..42c5dd9 --- /dev/null +++ b/data_tamer_cpp/3rdparty/mcap/cmake/FindZstd.cmake @@ -0,0 +1,41 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# - Try to find Facebook zstd library +# This will define +# ZSTD_FOUND +# ZSTD_INCLUDE_DIR +# ZSTD_LIBRARY +# + +find_path(ZSTD_INCLUDE_DIR NAMES zstd.h) + +find_library(ZSTD_LIBRARY_DEBUG NAMES zstdd zstd_staticd) +find_library(ZSTD_LIBRARY_RELEASE NAMES zstd zstd_static) + +include(SelectLibraryConfigurations) +SELECT_LIBRARY_CONFIGURATIONS(ZSTD) + +include(FindPackageHandleStandardArgs) +FIND_PACKAGE_HANDLE_STANDARD_ARGS( + ZSTD DEFAULT_MSG + ZSTD_LIBRARY ZSTD_INCLUDE_DIR +) + +if (ZSTD_FOUND) + message(STATUS "Found Zstd: ${ZSTD_LIBRARY}") +endif() + +mark_as_advanced(ZSTD_INCLUDE_DIR ZSTD_LIBRARY) diff --git a/data_tamer_cpp/3rdparty/mcap/include/mcap/crc32.hpp b/data_tamer_cpp/3rdparty/mcap/include/mcap/crc32.hpp new file mode 100644 index 0000000..7eff3f7 --- /dev/null +++ b/data_tamer_cpp/3rdparty/mcap/include/mcap/crc32.hpp @@ -0,0 +1,108 @@ +#include +#include +#include + +namespace mcap::internal { + +/** + * Compute CRC32 lookup tables as described at: + * https://github.com/komrad36/CRC#option-6-1-byte-tabular + * + * An iteration of CRC computation can be performed on 8 bits of input at once. By pre-computing a + * table of the values of CRC(?) for all 2^8 = 256 possible byte values, during the final + * computation we can replace a loop over 8 bits with a single lookup in the table. + * + * For further speedup, we can also pre-compute the values of CRC(?0) for all possible bytes when a + * zero byte is appended. Then we can process two bytes of input at once by computing CRC(AB) = + * CRC(A0) ^ CRC(B), using one lookup in the CRC(?0) table and one lookup in the CRC(?) table. + * + * The same technique applies for any number of bytes to be processed at once, although the speed + * improvements diminish. + * + * @param Polynomial The binary representation of the polynomial to use (reversed, i.e. most + * significant bit represents x^0). + * @param NumTables The number of bytes of input that will be processed at once. + */ +template +struct CRC32Table { +private: + std::array table = {}; + +public: + constexpr CRC32Table() { + for (uint32_t i = 0; i < 256; i++) { + uint32_t r = i; + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + table[i] = r; + } + for (size_t i = 256; i < table.size(); i++) { + uint32_t value = table[i - 256]; + table[i] = table[value & 0xff] ^ (value >> 8); + } + } + + constexpr uint32_t operator[](size_t index) const { + return table[index]; + } +}; + +inline uint32_t getUint32LE(const std::byte* data) { + return (uint32_t(data[0]) << 0) | (uint32_t(data[1]) << 8) | (uint32_t(data[2]) << 16) | + (uint32_t(data[3]) << 24); +} + +static constexpr CRC32Table<0xedb88320, 8> CRC32_TABLE; + +/** + * Initialize a CRC32 to all 1 bits. + */ +static constexpr uint32_t CRC32_INIT = 0xffffffff; + +/** + * Update a streaming CRC32 calculation. + * + * For performance, this implementation processes the data 8 bytes at a time, using the algorithm + * presented at: https://github.com/komrad36/CRC#option-9-8-byte-tabular + */ +inline uint32_t crc32Update(const uint32_t prev, const std::byte* const data, const size_t length) { + // Process bytes one by one until we reach the proper alignment. + uint32_t r = prev; + size_t offset = 0; + for (; (uintptr_t(data + offset) & alignof(uint32_t)) != 0 && offset < length; offset++) { + r = CRC32_TABLE[(r ^ uint8_t(data[offset])) & 0xff] ^ (r >> 8); + } + if (offset == length) { + return r; + } + + // Process 8 bytes (2 uint32s) at a time. + size_t remainingBytes = length - offset; + for (; remainingBytes >= 8; offset += 8, remainingBytes -= 8) { + r ^= getUint32LE(data + offset); + uint32_t r2 = getUint32LE(data + offset + 4); + r = CRC32_TABLE[0 * 256 + ((r2 >> 24) & 0xff)] ^ CRC32_TABLE[1 * 256 + ((r2 >> 16) & 0xff)] ^ + CRC32_TABLE[2 * 256 + ((r2 >> 8) & 0xff)] ^ CRC32_TABLE[3 * 256 + ((r2 >> 0) & 0xff)] ^ + CRC32_TABLE[4 * 256 + ((r >> 24) & 0xff)] ^ CRC32_TABLE[5 * 256 + ((r >> 16) & 0xff)] ^ + CRC32_TABLE[6 * 256 + ((r >> 8) & 0xff)] ^ CRC32_TABLE[7 * 256 + ((r >> 0) & 0xff)]; + } + + // Process any remaining bytes one by one. + for (; offset < length; offset++) { + r = CRC32_TABLE[(r ^ uint8_t(data[offset])) & 0xff] ^ (r >> 8); + } + return r; +} + +/** Finalize a CRC32 by inverting the output value. */ +inline uint32_t crc32Final(uint32_t crc) { + return crc ^ 0xffffffff; +} + +} // namespace mcap::internal diff --git a/data_tamer_cpp/3rdparty/mcap/include/mcap/errors.hpp b/data_tamer_cpp/3rdparty/mcap/include/mcap/errors.hpp new file mode 100644 index 0000000..c25c39a --- /dev/null +++ b/data_tamer_cpp/3rdparty/mcap/include/mcap/errors.hpp @@ -0,0 +1,120 @@ +#pragma once + +#include + +namespace mcap { + +/** + * @brief Status codes for MCAP readers and writers. + */ +enum class StatusCode { + Success = 0, + NotOpen, + InvalidSchemaId, + InvalidChannelId, + FileTooSmall, + ReadFailed, + MagicMismatch, + InvalidFile, + InvalidRecord, + InvalidOpCode, + InvalidChunkOffset, + InvalidFooter, + DecompressionFailed, + DecompressionSizeMismatch, + UnrecognizedCompression, + OpenFailed, + MissingStatistics, + InvalidMessageReadOptions, + NoMessageIndexesAvailable, + UnsupportedCompression, +}; + +/** + * @brief Wraps a status code and string message carrying additional context. + */ +struct [[nodiscard]] Status { + StatusCode code; + std::string message; + + Status() + : code(StatusCode::Success) {} + + Status(StatusCode code) + : code(code) { + switch (code) { + case StatusCode::Success: + break; + case StatusCode::NotOpen: + message = "not open"; + break; + case StatusCode::InvalidSchemaId: + message = "invalid schema id"; + break; + case StatusCode::InvalidChannelId: + message = "invalid channel id"; + break; + case StatusCode::FileTooSmall: + message = "file too small"; + break; + case StatusCode::ReadFailed: + message = "read failed"; + break; + case StatusCode::MagicMismatch: + message = "magic mismatch"; + break; + case StatusCode::InvalidFile: + message = "invalid file"; + break; + case StatusCode::InvalidRecord: + message = "invalid record"; + break; + case StatusCode::InvalidOpCode: + message = "invalid opcode"; + break; + case StatusCode::InvalidChunkOffset: + message = "invalid chunk offset"; + break; + case StatusCode::InvalidFooter: + message = "invalid footer"; + break; + case StatusCode::DecompressionFailed: + message = "decompression failed"; + break; + case StatusCode::DecompressionSizeMismatch: + message = "decompression size mismatch"; + break; + case StatusCode::UnrecognizedCompression: + message = "unrecognized compression"; + break; + case StatusCode::OpenFailed: + message = "open failed"; + break; + case StatusCode::MissingStatistics: + message = "missing statistics"; + break; + case StatusCode::InvalidMessageReadOptions: + message = "message read options conflict"; + break; + case StatusCode::NoMessageIndexesAvailable: + message = "file has no message indices"; + break; + case StatusCode::UnsupportedCompression: + message = "unsupported compression"; + break; + default: + message = "unknown"; + break; + } + } + + Status(StatusCode code, const std::string& message) + : code(code) + , message(message) {} + + bool ok() const { + return code == StatusCode::Success; + } +}; + +} // namespace mcap diff --git a/data_tamer_cpp/3rdparty/mcap/include/mcap/internal.hpp b/data_tamer_cpp/3rdparty/mcap/include/mcap/internal.hpp new file mode 100644 index 0000000..4faedd0 --- /dev/null +++ b/data_tamer_cpp/3rdparty/mcap/include/mcap/internal.hpp @@ -0,0 +1,189 @@ +#pragma once + +#include "types.hpp" +#include + +// Do not compile on systems with non-8-bit bytes +static_assert(std::numeric_limits::digits == 8); + +namespace mcap { + +namespace internal { + +constexpr uint64_t MinHeaderLength = /* magic bytes */ sizeof(Magic) + + /* opcode */ 1 + + /* record length */ 8 + + /* profile length */ 4 + + /* library length */ 4; +constexpr uint64_t FooterLength = /* opcode */ 1 + + /* record length */ 8 + + /* summary start */ 8 + + /* summary offset start */ 8 + + /* summary crc */ 4 + + /* magic bytes */ sizeof(Magic); + +inline std::string ToHex(uint8_t byte) { + std::string result{2, '\0'}; + result[0] = "0123456789ABCDEF"[(uint8_t(byte) >> 4) & 0x0F]; + result[1] = "0123456789ABCDEF"[uint8_t(byte) & 0x0F]; + return result; +} +inline std::string ToHex(std::byte byte) { + return ToHex(uint8_t(byte)); +} + +inline std::string to_string(const std::string& arg) { + return arg; +} +inline std::string to_string(std::string_view arg) { + return std::string(arg); +} +inline std::string to_string(const char* arg) { + return std::string(arg); +} +template +[[nodiscard]] inline std::string StrCat(T&&... args) { + using mcap::internal::to_string; + using std::to_string; + return ("" + ... + to_string(std::forward(args))); +} + +inline uint32_t KeyValueMapSize(const KeyValueMap& map) { + size_t size = 0; + for (const auto& [key, value] : map) { + size += 4 + key.size() + 4 + value.size(); + } + return (uint32_t)(size); +} + +inline const std::string CompressionString(Compression compression) { + switch (compression) { + case Compression::None: + default: + return std::string{}; + case Compression::Lz4: + return "lz4"; + case Compression::Zstd: + return "zstd"; + } +} + +inline uint16_t ParseUint16(const std::byte* data) { + return uint16_t(data[0]) | (uint16_t(data[1]) << 8); +} + +inline uint32_t ParseUint32(const std::byte* data) { + return uint32_t(data[0]) | (uint32_t(data[1]) << 8) | (uint32_t(data[2]) << 16) | + (uint32_t(data[3]) << 24); +} + +inline Status ParseUint32(const std::byte* data, uint64_t maxSize, uint32_t* output) { + if (maxSize < 4) { + const auto msg = StrCat("cannot read uint32 from ", maxSize, " bytes"); + return Status{StatusCode::InvalidRecord, msg}; + } + *output = ParseUint32(data); + return StatusCode::Success; +} + +inline uint64_t ParseUint64(const std::byte* data) { + return uint64_t(data[0]) | (uint64_t(data[1]) << 8) | (uint64_t(data[2]) << 16) | + (uint64_t(data[3]) << 24) | (uint64_t(data[4]) << 32) | (uint64_t(data[5]) << 40) | + (uint64_t(data[6]) << 48) | (uint64_t(data[7]) << 56); +} + +inline Status ParseUint64(const std::byte* data, uint64_t maxSize, uint64_t* output) { + if (maxSize < 8) { + const auto msg = StrCat("cannot read uint64 from ", maxSize, " bytes"); + return Status{StatusCode::InvalidRecord, msg}; + } + *output = ParseUint64(data); + return StatusCode::Success; +} + +inline Status ParseStringView(const std::byte* data, uint64_t maxSize, std::string_view* output) { + uint32_t size = 0; + if (auto status = ParseUint32(data, maxSize, &size); !status.ok()) { + const auto msg = StrCat("cannot read string size: ", status.message); + return Status{StatusCode::InvalidRecord, msg}; + } + if (uint64_t(size) > (maxSize - 4)) { + const auto msg = StrCat("string size ", size, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + *output = std::string_view(reinterpret_cast(data + 4), size); + return StatusCode::Success; +} + +inline Status ParseString(const std::byte* data, uint64_t maxSize, std::string* output) { + uint32_t size = 0; + if (auto status = ParseUint32(data, maxSize, &size); !status.ok()) { + return status; + } + if (uint64_t(size) > (maxSize - 4)) { + const auto msg = StrCat("string size ", size, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + *output = std::string(reinterpret_cast(data + 4), size); + return StatusCode::Success; +} + +inline Status ParseByteArray(const std::byte* data, uint64_t maxSize, ByteArray* output) { + uint32_t size = 0; + if (auto status = ParseUint32(data, maxSize, &size); !status.ok()) { + return status; + } + if (uint64_t(size) > (maxSize - 4)) { + const auto msg = StrCat("byte array size ", size, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + output->resize(size); + std::memcpy(output->data(), data + 4, size); + return StatusCode::Success; +} + +inline Status ParseKeyValueMap(const std::byte* data, uint64_t maxSize, KeyValueMap* output) { + uint32_t sizeInBytes = 0; + if (auto status = ParseUint32(data, maxSize, &sizeInBytes); !status.ok()) { + return status; + } + if (sizeInBytes > (maxSize - 4)) { + const auto msg = + StrCat("key-value map size ", sizeInBytes, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + + // Account for the byte size prefix in sizeInBytes to make the bounds checking + // below simpler + sizeInBytes += 4; + + output->clear(); + uint64_t pos = 4; + while (pos < sizeInBytes) { + std::string_view key; + if (auto status = ParseStringView(data + pos, sizeInBytes - pos, &key); !status.ok()) { + const auto msg = StrCat("cannot read key-value map key at pos ", pos, ": ", status.message); + return Status{StatusCode::InvalidRecord, msg}; + } + pos += 4 + key.size(); + std::string_view value; + if (auto status = ParseStringView(data + pos, sizeInBytes - pos, &value); !status.ok()) { + const auto msg = StrCat("cannot read key-value map value for key \"", key, "\" at pos ", pos, + ": ", status.message); + return Status{StatusCode::InvalidRecord, msg}; + } + pos += 4 + value.size(); + output->emplace(key, value); + } + return StatusCode::Success; +} + +inline std::string MagicToHex(const std::byte* data) { + return internal::ToHex(data[0]) + internal::ToHex(data[1]) + internal::ToHex(data[2]) + + internal::ToHex(data[3]) + internal::ToHex(data[4]) + internal::ToHex(data[5]) + + internal::ToHex(data[6]) + internal::ToHex(data[7]); +} + +} // namespace internal + +} // namespace mcap diff --git a/data_tamer_cpp/3rdparty/mcap/include/mcap/intervaltree.hpp b/data_tamer_cpp/3rdparty/mcap/include/mcap/intervaltree.hpp new file mode 100644 index 0000000..b9c2b6e --- /dev/null +++ b/data_tamer_cpp/3rdparty/mcap/include/mcap/intervaltree.hpp @@ -0,0 +1,303 @@ +// Adapted from + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace mcap::internal { + +template +class Interval { +public: + Scalar start; + Scalar stop; + Value value; + Interval(const Scalar& s, const Scalar& e, const Value& v) + : start(std::min(s, e)) + , stop(std::max(s, e)) + , value(v) {} +}; + +template +Value intervalStart(const Interval& i) { + return i.start; +} + +template +Value intervalStop(const Interval& i) { + return i.stop; +} + +template +std::ostream& operator<<(std::ostream& out, const Interval& i) { + out << "Interval(" << i.start << ", " << i.stop << "): " << i.value; + return out; +} + +template +class IntervalTree { +public: + using interval = Interval; + using interval_vector = std::vector; + + struct IntervalStartCmp { + bool operator()(const interval& a, const interval& b) { + return a.start < b.start; + } + }; + + struct IntervalStopCmp { + bool operator()(const interval& a, const interval& b) { + return a.stop < b.stop; + } + }; + + IntervalTree() + : left(nullptr) + , right(nullptr) + , center(Scalar(0)) {} + + ~IntervalTree() = default; + + std::unique_ptr clone() const { + return std::unique_ptr(new IntervalTree(*this)); + } + + IntervalTree(const IntervalTree& other) + : intervals(other.intervals) + , left(other.left ? other.left->clone() : nullptr) + , right(other.right ? other.right->clone() : nullptr) + , center(other.center) {} + + IntervalTree& operator=(IntervalTree&&) = default; + IntervalTree(IntervalTree&&) = default; + + IntervalTree& operator=(const IntervalTree& other) { + center = other.center; + intervals = other.intervals; + left = other.left ? other.left->clone() : nullptr; + right = other.right ? other.right->clone() : nullptr; + return *this; + } + + IntervalTree(interval_vector&& ivals, std::size_t depth = 16, std::size_t minbucket = 64, + std::size_t maxbucket = 512, Scalar leftextent = 0, Scalar rightextent = 0) + : left(nullptr) + , right(nullptr) { + --depth; + const auto minmaxStop = std::minmax_element(ivals.begin(), ivals.end(), IntervalStopCmp()); + const auto minmaxStart = std::minmax_element(ivals.begin(), ivals.end(), IntervalStartCmp()); + if (!ivals.empty()) { + center = (minmaxStart.first->start + minmaxStop.second->stop) / 2; + } + if (leftextent == 0 && rightextent == 0) { + // sort intervals by start + std::sort(ivals.begin(), ivals.end(), IntervalStartCmp()); + } else { + assert(std::is_sorted(ivals.begin(), ivals.end(), IntervalStartCmp())); + } + if (depth == 0 || (ivals.size() < minbucket && ivals.size() < maxbucket)) { + std::sort(ivals.begin(), ivals.end(), IntervalStartCmp()); + intervals = std::move(ivals); + assert(is_valid().first); + return; + } else { + Scalar leftp = 0; + Scalar rightp = 0; + + if (leftextent || rightextent) { + leftp = leftextent; + rightp = rightextent; + } else { + leftp = ivals.front().start; + rightp = std::max_element(ivals.begin(), ivals.end(), IntervalStopCmp())->stop; + } + + interval_vector lefts; + interval_vector rights; + + for (typename interval_vector::const_iterator i = ivals.begin(); i != ivals.end(); ++i) { + const interval& interval = *i; + if (interval.stop < center) { + lefts.push_back(interval); + } else if (interval.start > center) { + rights.push_back(interval); + } else { + assert(interval.start <= center); + assert(center <= interval.stop); + intervals.push_back(interval); + } + } + + if (!lefts.empty()) { + left.reset(new IntervalTree(std::move(lefts), depth, minbucket, maxbucket, leftp, center)); + } + if (!rights.empty()) { + right.reset( + new IntervalTree(std::move(rights), depth, minbucket, maxbucket, center, rightp)); + } + } + assert(is_valid().first); + } + + // Call f on all intervals near the range [start, stop]: + template + void visit_near(const Scalar& start, const Scalar& stop, UnaryFunction f) const { + if (!intervals.empty() && !(stop < intervals.front().start)) { + for (auto& i : intervals) { + f(i); + } + } + if (left && start <= center) { + left->visit_near(start, stop, f); + } + if (right && stop >= center) { + right->visit_near(start, stop, f); + } + } + + // Call f on all intervals crossing pos + template + void visit_overlapping(const Scalar& pos, UnaryFunction f) const { + visit_overlapping(pos, pos, f); + } + + // Call f on all intervals overlapping [start, stop] + template + void visit_overlapping(const Scalar& start, const Scalar& stop, UnaryFunction f) const { + auto filterF = [&](const interval& interval) { + if (interval.stop >= start && interval.start <= stop) { + // Only apply f if overlapping + f(interval); + } + }; + visit_near(start, stop, filterF); + } + + // Call f on all intervals contained within [start, stop] + template + void visit_contained(const Scalar& start, const Scalar& stop, UnaryFunction f) const { + auto filterF = [&](const interval& interval) { + if (start <= interval.start && interval.stop <= stop) { + f(interval); + } + }; + visit_near(start, stop, filterF); + } + + interval_vector find_overlapping(const Scalar& start, const Scalar& stop) const { + interval_vector result; + visit_overlapping(start, stop, [&](const interval& interval) { + result.emplace_back(interval); + }); + return result; + } + + interval_vector find_contained(const Scalar& start, const Scalar& stop) const { + interval_vector result; + visit_contained(start, stop, [&](const interval& interval) { + result.push_back(interval); + }); + return result; + } + + bool empty() const { + if (left && !left->empty()) { + return false; + } + if (!intervals.empty()) { + return false; + } + if (right && !right->empty()) { + return false; + } + return true; + } + + template + void visit_all(UnaryFunction f) const { + if (left) { + left->visit_all(f); + } + std::for_each(intervals.begin(), intervals.end(), f); + if (right) { + right->visit_all(f); + } + } + + std::pair extent() const { + struct Extent { + std::pair x{std::numeric_limits::max(), + std::numeric_limits::min()}; + void operator()(const interval& interval) { + x.first = std::min(x.first, interval.start); + x.second = std::max(x.second, interval.stop); + } + }; + Extent extent; + + visit_all([&](const interval& interval) { + extent(interval); + }); + return extent.x; + } + + // Check all constraints. + // If first is false, second is invalid. + std::pair> is_valid() const { + const auto minmaxStop = + std::minmax_element(intervals.begin(), intervals.end(), IntervalStopCmp()); + const auto minmaxStart = + std::minmax_element(intervals.begin(), intervals.end(), IntervalStartCmp()); + + std::pair> result = { + true, {std::numeric_limits::max(), std::numeric_limits::min()}}; + if (!intervals.empty()) { + result.second.first = std::min(result.second.first, minmaxStart.first->start); + result.second.second = std::min(result.second.second, minmaxStop.second->stop); + } + if (left) { + auto valid = left->is_valid(); + result.first &= valid.first; + result.second.first = std::min(result.second.first, valid.second.first); + result.second.second = std::min(result.second.second, valid.second.second); + if (!result.first) { + return result; + } + if (valid.second.second >= center) { + result.first = false; + return result; + } + } + if (right) { + auto valid = right->is_valid(); + result.first &= valid.first; + result.second.first = std::min(result.second.first, valid.second.first); + result.second.second = std::min(result.second.second, valid.second.second); + if (!result.first) { + return result; + } + if (valid.second.first <= center) { + result.first = false; + return result; + } + } + if (!std::is_sorted(intervals.begin(), intervals.end(), IntervalStartCmp())) { + result.first = false; + } + return result; + } + +private: + interval_vector intervals; + std::unique_ptr left; + std::unique_ptr right; + Scalar center; +}; + +} // namespace mcap::internal diff --git a/data_tamer_cpp/3rdparty/mcap/include/mcap/mcap.hpp b/data_tamer_cpp/3rdparty/mcap/include/mcap/mcap.hpp new file mode 100644 index 0000000..71f479c --- /dev/null +++ b/data_tamer_cpp/3rdparty/mcap/include/mcap/mcap.hpp @@ -0,0 +1,4 @@ +#pragma once + +#include "reader.hpp" +#include "writer.hpp" diff --git a/data_tamer_cpp/3rdparty/mcap/include/mcap/read_job_queue.hpp b/data_tamer_cpp/3rdparty/mcap/include/mcap/read_job_queue.hpp new file mode 100644 index 0000000..7faf446 --- /dev/null +++ b/data_tamer_cpp/3rdparty/mcap/include/mcap/read_job_queue.hpp @@ -0,0 +1,147 @@ +#pragma once + +#include "types.hpp" +#include +#include + +namespace mcap::internal { + +// Helper for writing compile-time exhaustive variant visitors. +template +inline constexpr bool always_false_v = false; + +/** + * @brief A job to read a specific message at offset `offset` from the decompressed chunk + * stored in `chunkReaderIndex`. A timestamp is provided to order this job relative to other jobs. + */ +struct ReadMessageJob { + Timestamp timestamp; + RecordOffset offset; + size_t chunkReaderIndex; +}; + +/** + * @brief A job to decompress the chunk starting at `chunkStartOffset`. The message indices + * starting directly after the chunk record and ending at `messageIndexEndOffset` will be used to + * find specific messages within the chunk. + */ +struct DecompressChunkJob { + Timestamp messageStartTime; + Timestamp messageEndTime; + ByteOffset chunkStartOffset; + ByteOffset messageIndexEndOffset; +}; + +/** + * @brief A union of jobs that an indexed MCAP reader executes. + */ +using ReadJob = std::variant; + +/** + * @brief A priority queue of jobs for an indexed MCAP reader to execute. + */ +struct ReadJobQueue { +private: + bool reverse_ = false; + std::vector heap_; + + /** + * @brief return the timestamp key that should be used to compare jobs. + */ + static Timestamp TimeComparisonKey(const ReadJob& job, bool reverse) { + Timestamp result = 0; + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + result = arg.timestamp; + } else if constexpr (std::is_same_v) { + if (reverse) { + result = arg.messageEndTime; + } else { + result = arg.messageStartTime; + } + } else { + static_assert(always_false_v, "non-exhaustive visitor!"); + } + }, + job); + return result; + } + static RecordOffset PositionComparisonKey(const ReadJob& job, bool reverse) { + RecordOffset result; + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + result = arg.offset; + } else if constexpr (std::is_same_v) { + if (reverse) { + result.offset = arg.messageIndexEndOffset; + } else { + result.offset = arg.chunkStartOffset; + } + } else { + static_assert(always_false_v, "non-exhaustive visitor!"); + } + }, + job); + return result; + } + + static bool CompareForward(const ReadJob& a, const ReadJob& b) { + auto aTimestamp = TimeComparisonKey(a, false); + auto bTimestamp = TimeComparisonKey(b, false); + if (aTimestamp == bTimestamp) { + return PositionComparisonKey(a, false) > PositionComparisonKey(b, false); + } + return aTimestamp > bTimestamp; + } + + static bool CompareReverse(const ReadJob& a, const ReadJob& b) { + auto aTimestamp = TimeComparisonKey(a, true); + auto bTimestamp = TimeComparisonKey(b, true); + if (aTimestamp == bTimestamp) { + return PositionComparisonKey(a, true) < PositionComparisonKey(b, true); + } + return aTimestamp < bTimestamp; + } + +public: + explicit ReadJobQueue(bool reverse) + : reverse_(reverse) {} + void push(DecompressChunkJob&& decompressChunkJob) { + heap_.emplace_back(std::move(decompressChunkJob)); + if (!reverse_) { + std::push_heap(heap_.begin(), heap_.end(), CompareForward); + } else { + std::push_heap(heap_.begin(), heap_.end(), CompareReverse); + } + } + + void push(ReadMessageJob&& readMessageJob) { + heap_.emplace_back(std::move(readMessageJob)); + if (!reverse_) { + std::push_heap(heap_.begin(), heap_.end(), CompareForward); + } else { + std::push_heap(heap_.begin(), heap_.end(), CompareReverse); + } + } + + ReadJob pop() { + if (!reverse_) { + std::pop_heap(heap_.begin(), heap_.end(), CompareForward); + } else { + std::pop_heap(heap_.begin(), heap_.end(), CompareReverse); + } + auto popped = heap_.back(); + heap_.pop_back(); + return popped; + } + + size_t len() const { + return heap_.size(); + } +}; + +} // namespace mcap::internal diff --git a/data_tamer_cpp/3rdparty/mcap/include/mcap/reader.hpp b/data_tamer_cpp/3rdparty/mcap/include/mcap/reader.hpp new file mode 100644 index 0000000..ab030a9 --- /dev/null +++ b/data_tamer_cpp/3rdparty/mcap/include/mcap/reader.hpp @@ -0,0 +1,726 @@ +#pragma once + +#include "intervaltree.hpp" +#include "read_job_queue.hpp" +#include "types.hpp" +#include "visibility.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace mcap { + +enum struct ReadSummaryMethod { + /** + * @brief Parse the Summary section to produce seeking indexes and summary + * statistics. If the Summary section is not present or corrupt, a failure + * Status is returned and the seeking indexes and summary statistics are not + * populated. + */ + NoFallbackScan, + /** + * @brief If the Summary section is missing or incomplete, allow falling back + * to reading the file sequentially to produce seeking indexes and summary + * statistics. + */ + AllowFallbackScan, + /** + * @brief Read the file sequentially from Header to DataEnd to produce seeking + * indexes and summary statistics. + */ + ForceScan, +}; + +/** + * @brief An abstract interface for reading MCAP data. + */ +struct MCAP_PUBLIC IReadable { + virtual ~IReadable() = default; + + /** + * @brief Returns the size of the file in bytes. + * + * @return uint64_t The total number of bytes in the MCAP file. + */ + virtual uint64_t size() const = 0; + /** + * @brief This method is called by MCAP reader classes when they need to read + * a portion of the file. + * + * @param output A pointer to a pointer to the buffer to write to. This method + * is expected to either maintain an internal buffer, read data into it, and + * update this pointer to point at the internal buffer, or update this + * pointer to point directly at the source data if possible. The pointer and + * data must remain valid and unmodified until the next call to read(). + * @param offset The offset in bytes from the beginning of the file to read. + * @param size The number of bytes to read. + * @return uint64_t Number of bytes actually read. This may be less than the + * requested size if the end of the file is reached. The output pointer must + * be readable from `output` to `output + size`. If the read fails, this + * method should return 0. + */ + virtual uint64_t read(std::byte** output, uint64_t offset, uint64_t size) = 0; +}; + +/** + * @brief IReadable implementation wrapping a FILE* pointer created by fopen() + * and a read buffer. + */ +class MCAP_PUBLIC FileReader final : public IReadable { +public: + FileReader(std::FILE* file); + + uint64_t size() const override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + +private: + std::FILE* file_; + std::vector buffer_; + uint64_t size_; + uint64_t position_; +}; + +/** + * @brief IReadable implementation wrapping a std::ifstream input file stream. + */ +class MCAP_PUBLIC FileStreamReader final : public IReadable { +public: + FileStreamReader(std::ifstream& stream); + + uint64_t size() const override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + +private: + std::ifstream& stream_; + std::vector buffer_; + uint64_t size_; + uint64_t position_; +}; + +/** + * @brief An abstract interface for compressed readers. + */ +class MCAP_PUBLIC ICompressedReader : public IReadable { +public: + virtual ~ICompressedReader() override = default; + + /** + * @brief Reset the reader state, clearing any internal buffers and state, and + * initialize with new compressed data. + * + * @param data Compressed data to read from. + * @param size Size of the compressed data in bytes. + * @param uncompressedSize Size of the data in bytes after decompression. A + * buffer of this size will be allocated for the uncompressed data. + */ + virtual void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) = 0; + /** + * @brief Report the current status of decompression. A StatusCode other than + * `StatusCode::Success` after `reset()` is called indicates the decompression + * was not successful and the reader is in an invalid state. + */ + virtual Status status() const = 0; +}; + +/** + * @brief A "null" compressed reader that directly passes through uncompressed + * data. No internal buffers are allocated. + */ +class MCAP_PUBLIC BufferReader final : public ICompressedReader { +public: + void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + uint64_t size() const override; + Status status() const override; + + BufferReader() = default; + BufferReader(const BufferReader&) = delete; + BufferReader& operator=(const BufferReader&) = delete; + BufferReader(BufferReader&&) = delete; + BufferReader& operator=(BufferReader&&) = delete; + +private: + const std::byte* data_; + uint64_t size_; +}; + +#ifndef MCAP_COMPRESSION_NO_ZSTD +/** + * @brief ICompressedReader implementation that decompresses Zstandard + * (https://facebook.github.io/zstd/) data. + */ +class MCAP_PUBLIC ZStdReader final : public ICompressedReader { +public: + void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + uint64_t size() const override; + Status status() const override; + + /** + * @brief Decompresses an entire Zstd-compressed chunk into `output`. + * + * @param data The Zstd-compressed input chunk. + * @param compressedSize The size of the Zstd-compressed input. + * @param uncompressedSize The size of the data once uncompressed. + * @param output The output vector. This will be resized to `uncompressedSize` to fit the data, + * or 0 if the decompression encountered an error. + * @return Status + */ + static Status DecompressAll(const std::byte* data, uint64_t compressedSize, + uint64_t uncompressedSize, ByteArray* output); + ZStdReader() = default; + ZStdReader(const ZStdReader&) = delete; + ZStdReader& operator=(const ZStdReader&) = delete; + ZStdReader(ZStdReader&&) = delete; + ZStdReader& operator=(ZStdReader&&) = delete; + +private: + Status status_; + ByteArray uncompressedData_; +}; +#endif + +#ifndef MCAP_COMPRESSION_NO_LZ4 +/** + * @brief ICompressedReader implementation that decompresses LZ4 + * (https://lz4.github.io/lz4/) data. + */ +class MCAP_PUBLIC LZ4Reader final : public ICompressedReader { +public: + void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + uint64_t size() const override; + Status status() const override; + + /** + * @brief Decompresses an entire LZ4-encoded chunk into `output`. + * + * @param data The LZ4-compressed input chunk. + * @param size The size of the LZ4-compressed input. + * @param uncompressedSize The size of the data once uncompressed. + * @param output The output vector. This will be resized to `uncompressedSize` to fit the data, + * or 0 if the decompression encountered an error. + * @return Status + */ + Status decompressAll(const std::byte* data, uint64_t size, uint64_t uncompressedSize, + ByteArray* output); + LZ4Reader(); + LZ4Reader(const LZ4Reader&) = delete; + LZ4Reader& operator=(const LZ4Reader&) = delete; + LZ4Reader(LZ4Reader&&) = delete; + LZ4Reader& operator=(LZ4Reader&&) = delete; + ~LZ4Reader() override; + +private: + void* decompressionContext_ = nullptr; // LZ4F_dctx* + Status status_; + const std::byte* compressedData_; + ByteArray uncompressedData_; + uint64_t compressedSize_; + uint64_t uncompressedSize_; +}; +#endif + +struct LinearMessageView; + +/** + * @brief Options for reading messages out of an MCAP file. + */ +struct MCAP_PUBLIC ReadMessageOptions { +public: + /** + * @brief Only messages with log timestamps greater or equal to startTime will be included. + */ + Timestamp startTime = 0; + /** + * @brief Only messages with log timestamps less than endTime will be included. + */ + Timestamp endTime = MaxTime; + /** + * @brief If provided, `topicFilter` is called on all topics found in the MCAP file. If + * `topicFilter` returns true for a given channel, messages from that channel will be included. + * if not provided, messages from all channels are provided. + */ + std::function topicFilter; + enum struct ReadOrder { FileOrder, LogTimeOrder, ReverseLogTimeOrder }; + /** + * @brief Set the expected order that messages should be returned in. + * if readOrder == FileOrder, messages will be returned in the order they appear in the MCAP file. + * if readOrder == LogTimeOrder, messages will be returned in ascending log time order. + * if readOrder == ReverseLogTimeOrder, messages will be returned in descending log time order. + */ + ReadOrder readOrder = ReadOrder::FileOrder; + + ReadMessageOptions(Timestamp start, Timestamp end) + : startTime(start) + , endTime(end) {} + + ReadMessageOptions() = default; + + /** + * @brief validate the configuration. + */ + Status validate() const; +}; + +/** + * @brief Provides a read interface to an MCAP file. + */ +class MCAP_PUBLIC McapReader final { +public: + ~McapReader(); + + /** + * @brief Opens an MCAP file for reading from an already constructed IReadable + * implementation. + * + * @param reader An implementation of the IReader interface that provides raw + * MCAP data. + * @return Status StatusCode::Success on success. If a non-success Status is + * returned, the data source is not considered open and McapReader is not + * usable until `open()` is called and a success response is returned. + */ + Status open(IReadable& reader); + /** + * @brief Opens an MCAP file for reading from a given filename. + * + * @param filename Filename to open. + * @return Status StatusCode::Success on success. If a non-success Status is + * returned, the data source is not considered open and McapReader is not + * usable until `open()` is called and a success response is returned. + */ + Status open(std::string_view filename); + /** + * @brief Opens an MCAP file for reading from a std::ifstream input file + * stream. + * + * @param stream Input file stream to read MCAP data from. + * @return Status StatusCode::Success on success. If a non-success Status is + * returned, the file is not considered open and McapReader is not usable + * until `open()` is called and a success response is returned. + */ + Status open(std::ifstream& stream); + + /** + * @brief Closes the MCAP file, clearing any internal data structures and + * state and dropping the data source reference. + * + */ + void close(); + + /** + * @brief Read and parse the Summary section at the end of the MCAP file, if + * available. This will populate internal indexes to allow for efficient + * summarization and random access. This method will automatically be called + * upon requesting summary data or first seek if Summary section parsing is + * allowed by the configuration options. + */ + Status readSummary( + ReadSummaryMethod method, const ProblemCallback& onProblem = [](const Status&) {}); + + /** + * @brief Returns an iterable view with `begin()` and `end()` methods for + * iterating Messages in the MCAP file. If a non-zero `startTime` is provided, + * this will first parse the Summary section (by calling `readSummary()`) if + * allowed by the configuration options and it has not been parsed yet. + * + * @param startTime Optional start time in nanoseconds. Messages before this + * time will not be returned. + * @param endTime Optional end time in nanoseconds. Messages equal to or after + * this time will not be returned. + */ + LinearMessageView readMessages(Timestamp startTime = 0, Timestamp endTime = MaxTime); + /** + * @brief Returns an iterable view with `begin()` and `end()` methods for + * iterating Messages in the MCAP file. If a non-zero `startTime` is provided, + * this will first parse the Summary section (by calling `readSummary()`) if + * allowed by the configuration options and it has not been parsed yet. + * + * @param onProblem A callback that will be called when a parsing error + * occurs. Problems can either be recoverable, indicating some data could + * not be read, or non-recoverable, stopping the iteration. + * @param startTime Optional start time in nanoseconds. Messages before this + * time will not be returned. + * @param endTime Optional end time in nanoseconds. Messages equal to or after + * this time will not be returned. + */ + LinearMessageView readMessages(const ProblemCallback& onProblem, Timestamp startTime = 0, + Timestamp endTime = MaxTime); + + /** + * @brief Returns an iterable view with `begin()` and `end()` methods for + * iterating Messages in the MCAP file. + * Uses the options from `options` to select the messages that are yielded. + */ + LinearMessageView readMessages(const ProblemCallback& onProblem, + const ReadMessageOptions& options); + + /** + * @brief Returns starting and ending byte offsets that must be read to + * iterate all messages in the given time range. If `readSummary()` has been + * successfully called and the recording contains Chunk records, this range + * will be narrowed to Chunk records that contain messages in the given time + * range. Otherwise, this range will be the entire Data section if the Data + * End record has been found or the entire file otherwise. + * + * This method is automatically used by `readMessages()`, and only needs to be + * called directly if the caller is manually constructing an iterator. + * + * @param startTime Start time in nanoseconds. + * @param endTime Optional end time in nanoseconds. + * @return Start and end byte offsets. + */ + std::pair byteRange(Timestamp startTime, + Timestamp endTime = MaxTime) const; + + /** + * @brief Returns a pointer to the IReadable data source backing this reader. + * Will return nullptr if the reader is not open. + */ + IReadable* dataSource(); + + /** + * @brief Returns the parsed Header record, if it has been encountered. + */ + const std::optional
& header() const; + /** + * @brief Returns the parsed Footer record, if it has been encountered. + */ + const std::optional