Skip to content

Commit

Permalink
update docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
james-rms committed Jan 6, 2025
1 parent 476408f commit d588217
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 50 deletions.
86 changes: 41 additions & 45 deletions rust/src/sans_io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,14 +751,13 @@ mod tests {
use std::collections::BTreeMap;
use std::io::Read;

fn basic_chunked_file(compression: Option<Compression>) -> Vec<u8> {
fn basic_chunked_file(compression: Option<Compression>) -> McapResult<Vec<u8>> {
let mut buf = std::io::Cursor::new(Vec::new());
{
let mut writer = crate::WriteOptions::new()
.compression(compression)
.chunk_size(None)
.create(&mut buf)
.expect("could not construct writer");
.create(&mut buf)?;
let channel = std::sync::Arc::new(crate::Channel {
id: 0,
topic: "chat".to_owned(),
Expand All @@ -767,22 +766,20 @@ mod tests {
metadata: BTreeMap::new(),
});
for n in 0..3 {
writer
.write(&crate::Message {
channel: channel.clone(),
sequence: n,
log_time: n as u64,
publish_time: n as u64,
data: (&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]).into(),
})
.expect("could not construct channel");
writer.write(&crate::Message {
channel: channel.clone(),
sequence: n,
log_time: n as u64,
publish_time: n as u64,
data: (&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]).into(),
})?;
if n == 1 {
writer.flush().expect("failed to flush");
writer.flush()?;
}
}
writer.finish().expect("failed to finish");
writer.finish()?;
}
buf.into_inner()
Ok(buf.into_inner())
}

#[test]
Expand Down Expand Up @@ -845,50 +842,50 @@ mod tests {
}

#[test]
fn test_file_data_validation() {
fn test_file_data_validation() -> McapResult<()> {
let mut reader = LinearReader::new_with_options(
LinearReaderOptions::default()
.with_validate_data_section_crc(true)
.with_validate_summary_section_crc(true),
);
let mut cursor = std::io::Cursor::new(basic_chunked_file(None));
let mut cursor = std::io::Cursor::new(basic_chunked_file(None)?);
let mut opcodes: Vec<u8> = Vec::new();
let mut iter_count = 0;
while let Some(action) = reader.next_action() {
match action.expect("failed to get next action") {
match action? {
ReadAction::NeedMore(n) => {
let written = cursor
.read(reader.insert(n))
.expect("failed to read from buffer");
let written = cursor.read(reader.insert(n))?;
reader.set_written(written);
}
ReadAction::GetRecord { data, opcode } => {
opcodes.push(opcode);
parse_record(opcode, data).expect("failed to parse record");
parse_record(opcode, data)?;
}
}
iter_count += 1;
// guard against infinite loop
assert!(iter_count < 10000);
}
Ok(())
}

fn test_chunked(compression: Option<Compression>, options: LinearReaderOptions) {
fn test_chunked(
compression: Option<Compression>,
options: LinearReaderOptions,
) -> McapResult<()> {
let mut reader = LinearReader::new_with_options(options);
let mut cursor = std::io::Cursor::new(basic_chunked_file(compression));
let mut cursor = std::io::Cursor::new(basic_chunked_file(compression)?);
let mut opcodes: Vec<u8> = Vec::new();
let mut iter_count = 0;
while let Some(action) = reader.next_action() {
match action.expect("failed to get next action") {
match action? {
ReadAction::NeedMore(n) => {
let written = cursor
.read(reader.insert(n))
.expect("failed to read from buffer");
let written = cursor.read(reader.insert(n))?;
reader.set_written(written);
}
ReadAction::GetRecord { data, opcode } => {
opcodes.push(opcode);
parse_record(opcode, data).expect("failed to parse record");
parse_record(opcode, data)?;
}
}
iter_count += 1;
Expand Down Expand Up @@ -916,6 +913,7 @@ mod tests {
op::FOOTER
]
);
Ok(())
}
use paste::paste;

Expand All @@ -924,7 +922,7 @@ mod tests {
$(
paste! {
#[test]
fn [ <test_chunked_ $name> ]() {
fn [ <test_chunked_ $name> ]() -> McapResult<()> {
test_chunked($compression, $options)
}
}
Expand All @@ -946,12 +944,12 @@ mod tests {
}

#[test]
fn test_no_magic() {
fn test_no_magic() -> McapResult<()> {
for options in [
LinearReaderOptions::default().with_skip_start_magic(true),
LinearReaderOptions::default().with_skip_end_magic(true),
] {
let mcap = basic_chunked_file(None);
let mcap = basic_chunked_file(None)?;
let input = if options.skip_start_magic {
&mcap[8..]
} else if options.skip_end_magic {
Expand All @@ -964,16 +962,14 @@ mod tests {
let mut opcodes: Vec<u8> = Vec::new();
let mut iter_count = 0;
while let Some(action) = reader.next_action() {
match action.expect("failed to get next action") {
match action? {
ReadAction::NeedMore(n) => {
let written = cursor
.read(reader.insert(n))
.expect("failed to read from buffer");
let written = cursor.read(reader.insert(n))?;
reader.set_written(written);
}
ReadAction::GetRecord { data, opcode } => {
opcodes.push(opcode);
parse_record(opcode, data).expect("failed to parse record");
parse_record(opcode, data)?;
}
}
iter_count += 1;
Expand Down Expand Up @@ -1002,27 +998,26 @@ mod tests {
]
);
}
Ok(())
}

#[test]
fn test_emit_chunks() {
let mcap = basic_chunked_file(None);
fn test_emit_chunks() -> McapResult<()> {
let mcap = basic_chunked_file(None)?;
let mut reader =
LinearReader::new_with_options(LinearReaderOptions::default().with_emit_chunks(true));
let mut cursor = std::io::Cursor::new(mcap);
let mut opcodes: Vec<u8> = Vec::new();
let mut iter_count = 0;
while let Some(action) = reader.next_action() {
match action.expect("failed to get next action") {
match action? {
ReadAction::NeedMore(n) => {
let written = cursor
.read(reader.insert(n))
.expect("failed to read from buffer");
let written = cursor.read(reader.insert(n))?;
reader.set_written(written);
}
ReadAction::GetRecord { data, opcode } => {
opcodes.push(opcode);
parse_record(opcode, data).expect("failed to parse record");
parse_record(opcode, data)?;
}
}
iter_count += 1;
Expand All @@ -1048,6 +1043,7 @@ mod tests {
op::FOOTER
]
);
Ok(())
}

// Ensures that the internal buffer for the linear reader gets compacted regularly and does not
Expand All @@ -1063,12 +1059,12 @@ mod tests {
.chunk_size(None)
.create(&mut cursor)?;
let channel = std::sync::Arc::new(crate::Channel {
id: 0,
topic: "chat".to_owned(),
schema: None,
message_encoding: "json".to_owned(),
metadata: BTreeMap::new(),
});
writer.add_channel(&channel)?;
for n in 0..3 {
writer.write(&crate::Message {
channel: channel.clone(),
Expand Down
25 changes: 20 additions & 5 deletions rust/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,15 @@ impl<W: Write + Seek> Writer<W> {
})
}

// Adds a schema, returning its ID. If a schema with the same content has been added already,
// its ID is returned.
/// Adds a schema, returning its ID. If a schema with the same content has been added already,
/// its ID is returned.
///
/// * `name`: an identifier for the schema.
/// * `encoding`: Describes the schema format. The [well-known schema
/// encodings](https://mcap.dev/spec/registry#well-known-schema-encodings) are preferred. An
/// empty string indicates no schema is available.
/// * `data`: The serialized schema content. If `encoding` is an empty string, `data` should
/// have zero length.
pub fn add_schema(&mut self, name: &str, encoding: &str, data: &[u8]) -> McapResult<u16> {
if let Some(&id) = self.schemas.get_by_left(&SchemaContent {
name: name.into(),
Expand Down Expand Up @@ -309,9 +316,14 @@ impl<W: Write + Seek> Writer<W> {
/// Adds a channel, returning its ID. If a channel with equivalent content was added previously,
/// its ID is returned.
///
/// Provide a schema_id returned from [`Self::add_schema`], or 0 if the channel has no schema.
/// Useful with subequent calls to [`write_to_known_channel()`](Self::write_to_known_channel).
///
/// Useful with subequent calls to [`write_to_known_channel()`](Self::write_to_known_channel)
/// * `schema_id`: a schema_id returned from [`Self::add_schema`], or 0 if the channel has no
/// schema.
/// * `topic`: The topic name.
/// * `message_encoding`: Encoding for messages on this channel. The [well-known message
/// encodings](https://mcap.dev/spec/registry#well-known-message-encodings) are preferred.
/// * `metadata`: Metadata about this channel.
pub fn add_channel(
&mut self,
schema_id: u16,
Expand Down Expand Up @@ -580,7 +592,8 @@ impl<W: Write + Seek> Writer<W> {
Ok(())
}

/// Write an attachment to the MCAP file
/// Write an attachment to the MCAP file. This finishes any current chunk before writing the
/// attachment.
pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> {
let header = records::AttachmentHeader {
log_time: attachment.log_time,
Expand All @@ -596,6 +609,8 @@ impl<W: Write + Seek> Writer<W> {
Ok(())
}

/// Write a [Metadata](https://mcap.dev/spec#metadata-op0x0c) record to the MCAP file. This
/// finishes any current chunk before writing the metadata.
pub fn write_metadata(&mut self, metadata: &Metadata) -> McapResult<()> {
let w = self.finish_chunk()?;
let offset = w.stream_position()?;
Expand Down

0 comments on commit d588217

Please sign in to comment.