Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable most writer conformance tests for Rust #1313

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
26a1910
Support spec flags in Rust conformance test writer
Muon Jan 15, 2025
645ccf3
Rename chunkin_time() to start_chunk()
Muon Jan 15, 2025
490ff8d
Simplify chunk size check
Muon Jan 15, 2025
fc2d1b5
Use contains_right() rather than get_right().is_none()
Muon Jan 15, 2025
9eaecaf
Use clear() rather than truncate(0)
Muon Jan 15, 2025
6606780
Use mem::take() instead of mem::swap() in finish()
Muon Jan 15, 2025
8661f98
Factor out chunk_mode from WriteMode
Muon Jan 15, 2025
af11109
Compute the CRC for the DataEnd record
Muon Jan 16, 2025
444783f
Add support for disabling summary offsets
Muon Jan 16, 2025
f9e63ef
Fix parsing of data in Rust conformance writer spec parser
Muon Jan 16, 2025
0263e68
Start channel IDs from 1 as per spec
Muon Jan 16, 2025
ffff966
Add support for toggling more options in Rust writer
Muon Jan 16, 2025
98df8ac
Handle channel metadata in Rust conformance writer
Muon Jan 16, 2025
7b4ffab
Add toggles for attachment/metadata indexes in Rust writer
Muon Jan 16, 2025
02d7429
Enable conformance tests for Rust for some variants
Muon Jan 16, 2025
94e5b7f
Actually make disabling statistics work
Muon Jan 17, 2025
ff99bf0
Fix chunk and message index length calculation
Muon Jan 17, 2025
e5b8b2d
Refactor summary section generation
Muon Jan 17, 2025
ac19a1a
Enable all features except pad for Rust writer
Muon Jan 17, 2025
c4a1163
Output statistics before chunk indexes, like the others
Muon Jan 17, 2025
dad153e
Improve documentation for new writer toggles
Muon Jan 17, 2025
b64cf87
Say 'emit' instead of 'output' for writer toggles
Muon Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions rust/examples/common/conformance_writer_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ impl Record {
.as_array()
.unwrap_or_else(|| panic!("Invalid: {}", name))
.iter()
.filter_map(|v| v.as_u64())
.filter_map(|n| u8::try_from(n).ok())
.filter_map(|v| match v {
Value::String(s) => s.parse::<u8>().ok(),
Value::Number(n) => n.as_u64()?.try_into().ok(),
_ => None,
})
.collect()
}

Expand Down
47 changes: 40 additions & 7 deletions rust/examples/conformance_writer.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,46 @@
use std::{
borrow::Cow,
collections::{BTreeMap, HashMap},
env,
};
use std::{borrow::Cow, collections::HashMap, env};

#[path = "common/conformance_writer_spec.rs"]
mod conformance_writer_spec;

// Feature identifiers that may appear in a spec's `meta.variant.features` list.
// `write_file` matches each feature string against these constants and switches
// on the corresponding `mcap::WriteOptions` toggle.

/// "ch": turns on `use_chunks`.
const USE_CHUNKS: &str = "ch";
/// "mx": turns on `emit_message_indexes`.
const USE_MESSAGE_INDEX: &str = "mx";
/// "st": turns on `emit_statistics`.
const USE_STATISTICS: &str = "st";
/// "rsh": turns on `repeat_schemas`.
const USE_REPEATED_SCHEMAS: &str = "rsh";
/// "rch": turns on `repeat_channels`.
const USE_REPEATED_CHANNEL_INFOS: &str = "rch";
/// "ax": turns on `emit_attachment_indexes`.
const USE_ATTACHMENT_INDEX: &str = "ax";
/// "mdx": turns on `emit_metadata_indexes`.
const USE_METADATA_INDEX: &str = "mdx";
/// "chx": turns on `emit_chunk_indexes`.
const USE_CHUNK_INDEX: &str = "chx";
/// "sum": turns on `emit_summary_offsets`.
const USE_SUMMARY_OFFSET: &str = "sum";
/// "pad": has no arm in `write_file`'s feature match, so a spec carrying it
/// falls through to the `unimplemented!` catch-all there.
const ADD_EXTRA_DATA_TO_RECORDS: &str = "pad";

fn write_file(spec: &conformance_writer_spec::WriterSpec) {
let mut writer = mcap::WriteOptions::new()
let mut write_options = mcap::WriteOptions::new()
.compression(None)
.profile("")
.library("")
.disable_seeking(true)
.emit_summary_records(false)
.emit_summary_offsets(false)
.use_chunks(false)
.emit_message_indexes(false);

for feature in spec.meta.variant.features.iter() {
write_options = match feature.as_str() {
USE_CHUNKS => write_options.use_chunks(true),
USE_STATISTICS => write_options.emit_statistics(true),
USE_SUMMARY_OFFSET => write_options.emit_summary_offsets(true),
USE_REPEATED_CHANNEL_INFOS => write_options.repeat_channels(true),
USE_REPEATED_SCHEMAS => write_options.repeat_schemas(true),
USE_MESSAGE_INDEX => write_options.emit_message_indexes(true),
USE_ATTACHMENT_INDEX => write_options.emit_attachment_indexes(true),
USE_METADATA_INDEX => write_options.emit_metadata_indexes(true),
USE_CHUNK_INDEX => write_options.emit_chunk_indexes(true),
_ => unimplemented!("unknown or unimplemented feature: {}", feature),
}
}

let mut writer = write_options
.create(binrw::io::NoSeek::new(std::io::stdout()))
.expect("Couldn't create writer");

Expand Down Expand Up @@ -46,8 +75,9 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) {
};
let topic = record.get_field_str("topic");
let message_encoding = record.get_field_str("message_encoding");
let metadata = record.get_field_meta("metadata");
let returned_id = writer
.add_channel(output_schema_id, topic, message_encoding, &BTreeMap::new())
.add_channel(output_schema_id, topic, message_encoding, &metadata)
.expect("Couldn't write channel");
channel_ids.insert(id, returned_id);
}
Expand Down Expand Up @@ -108,6 +138,9 @@ fn write_file(spec: &conformance_writer_spec::WriterSpec) {
};
writer.write_metadata(&meta).expect("Can't write metadata");
}
"MetadataIndex" => {
// written automatically
}
"Schema" => {
let name = record.get_field_str("name");
let encoding = record.get_field_str("encoding");
Expand Down
49 changes: 40 additions & 9 deletions rust/src/chunk_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use std::io::{Cursor, Seek, Write};
/// The kind of writer that should be used for writing chunks.
///
/// This is used to select what [`ChunkSink`] should be used by the MCAP writer.
#[derive(Default)]
pub(crate) enum ChunkMode {
/// Mode specifying that chunks should be written directly to the output
#[default]
Direct,
/// Mode specifying that chunks should be buffered before writing to the output
Buffered {
Expand All @@ -17,25 +19,54 @@ pub(crate) enum ChunkMode {
///
/// If chunks are buffered they will be written to an internal buffer, which can be flushed to the
/// provided writer once the chunk is completed.
pub(crate) enum ChunkSink<W> {
Direct(W),
Buffered(W, Cursor<Vec<u8>>),
pub(crate) struct ChunkSink<W> {
/// The writer handed to `new`. Writes land here directly whenever `buffer` is `None`.
pub inner: W,
/// Staging cursor for chunk data: `Some` when built from `ChunkMode::Buffered`,
/// `None` when built from `ChunkMode::Direct`.
pub buffer: Option<Cursor<Vec<u8>>>,
}

impl<W> ChunkSink<W> {
    /// Builds a sink over `writer` for the requested `mode`.
    ///
    /// * `ChunkMode::Direct` — no staging buffer is kept.
    /// * `ChunkMode::Buffered` — the supplied `Vec` is emptied and wrapped in a
    ///   [`Cursor`] so its allocation can be reused for the new chunk.
    pub fn new(writer: W, mode: ChunkMode) -> Self {
        let buffer = match mode {
            ChunkMode::Direct => None,
            ChunkMode::Buffered { buffer: mut scratch } => {
                // Leftover bytes from a previous chunk must not leak into this one.
                scratch.clear();
                Some(Cursor::new(scratch))
            }
        };

        Self {
            inner: writer,
            buffer,
        }
    }
}

impl<W: Write> ChunkSink<W> {
fn as_mut_write(&mut self) -> &mut dyn Write {
match self {
Self::Direct(w) => w,
Self::Buffered(_, w) => w,
match &mut self.buffer {
Some(w) => w,
None => &mut self.inner,
}
}

pub fn finish(self) -> std::io::Result<(W, ChunkMode)> {
let ChunkSink { mut inner, buffer } = self;
let mode = match buffer {
Some(buffer) => {
let buffer = buffer.into_inner();
inner.write_all(&buffer)?;
ChunkMode::Buffered { buffer }
}
None => ChunkMode::Direct,
};
Ok((inner, mode))
}
}

impl<W: Seek> ChunkSink<W> {
fn as_mut_seek(&mut self) -> &mut dyn Seek {
match self {
Self::Direct(w) => w,
Self::Buffered(_, w) => w,
match &mut self.buffer {
Some(w) => w,
None => &mut self.inner,
}
}
}
Expand Down
24 changes: 21 additions & 3 deletions rust/src/io_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub struct CountingCrcWriter<W> {
count: u64,
}

impl<W: Write> CountingCrcWriter<W> {
impl<W> CountingCrcWriter<W> {
pub fn new(inner: W) -> Self {
Self {
inner,
Expand All @@ -17,6 +17,14 @@ impl<W: Write> CountingCrcWriter<W> {
}
}

/// Builds a writer around `inner` that uses the supplied `hasher`.
///
/// `count` always starts at zero, whatever state the hasher carries.
pub fn with_hasher(inner: W, hasher: Hasher) -> Self {
    let count = 0;
    Self {
        hasher,
        count,
        inner,
    }
}

/// Returns the current value of `count`.
///
/// NOTE(review): presumably the number of bytes written through this wrapper so
/// far — the `write` implementation that updates `count` is not visible here; confirm.
pub fn position(&self) -> u64 {
self.count
}
Expand All @@ -26,8 +34,8 @@ impl<W: Write> CountingCrcWriter<W> {
}

/// Consumes the writer and returns the inner writer together with the CRC hasher
pub fn finalize(self) -> (W, u32) {
(self.inner, self.hasher.finalize())
pub fn finalize(self) -> (W, Hasher) {
(self.inner, self.hasher)
}
}

Expand All @@ -43,3 +51,13 @@ impl<W: Write> Write for CountingCrcWriter<W> {
self.inner.flush()
}
}

/// Seeking is forwarded untouched to the wrapped writer.
///
/// Neither `count` nor the hasher is modified by these calls.
impl<W: Seek> Seek for CountingCrcWriter<W> {
    /// Moves the wrapped writer to `pos` and reports its new position.
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        Seek::seek(&mut self.inner, pos)
    }

    /// Reports the wrapped writer's current stream position.
    fn stream_position(&mut self) -> io::Result<u64> {
        Seek::stream_position(&mut self.inner)
    }
}
6 changes: 3 additions & 3 deletions rust/src/sans_io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,9 +904,9 @@ mod tests {
op::MESSAGE_INDEX,
op::DATA_END,
op::CHANNEL,
op::STATISTICS,
op::CHUNK_INDEX,
op::CHUNK_INDEX,
op::STATISTICS,
op::SUMMARY_OFFSET,
op::SUMMARY_OFFSET,
op::SUMMARY_OFFSET,
Expand Down Expand Up @@ -988,9 +988,9 @@ mod tests {
op::MESSAGE_INDEX,
op::DATA_END,
op::CHANNEL,
op::STATISTICS,
op::CHUNK_INDEX,
op::CHUNK_INDEX,
op::STATISTICS,
op::SUMMARY_OFFSET,
op::SUMMARY_OFFSET,
op::SUMMARY_OFFSET,
Expand Down Expand Up @@ -1034,9 +1034,9 @@ mod tests {
op::MESSAGE_INDEX,
op::DATA_END,
op::CHANNEL,
op::STATISTICS,
op::CHUNK_INDEX,
op::CHUNK_INDEX,
op::STATISTICS,
op::SUMMARY_OFFSET,
op::SUMMARY_OFFSET,
op::SUMMARY_OFFSET,
Expand Down
Loading
Loading