-
Notifications
You must be signed in to change notification settings - Fork 178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Attachment api #613
Attachment api #613
Changes from 8 commits
0359739
e10eff2
3e9eb0a
4f7b886
091c84f
2371e83
3e96a0a
62633bf
dd35540
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,11 +11,12 @@ | |
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
|
||
use alloc::{vec, vec::Vec}; | ||
use core::{ | ||
cmp::PartialEq, | ||
fmt, iter, | ||
ops::{Index, IndexMut}, | ||
ops::{Index, IndexMut, RangeBounds}, | ||
ptr, slice, | ||
}; | ||
|
||
|
@@ -112,6 +113,19 @@ impl<T> SingleOrVec<T> { | |
matches!(&self.0, SingleOrVecInner::Vec(v) if v.is_empty()) | ||
} | ||
|
||
fn vectorize(&mut self) -> &mut Vec<T> { | ||
if let SingleOrVecInner::Single(v) = &self.0 { | ||
unsafe { | ||
let v = core::ptr::read(v); | ||
core::ptr::write(&mut self.0, SingleOrVecInner::Vec(vec![v])) | ||
}; | ||
} | ||
let SingleOrVecInner::Vec(v) = &mut self.0 else { | ||
unsafe { core::hint::unreachable_unchecked() } | ||
}; | ||
v | ||
} | ||
|
||
pub fn get(&self, index: usize) -> Option<&T> { | ||
match &self.0 { | ||
SingleOrVecInner::Single(v) => (index == 0).then_some(v), | ||
|
@@ -139,6 +153,55 @@ impl<T> SingleOrVec<T> { | |
SingleOrVecInner::Vec(v) => v.last_mut(), | ||
} | ||
} | ||
pub fn drain<Range: RangeBounds<usize>>(&mut self, range: Range) -> Drain<T> { | ||
match &mut self.0 { | ||
this @ SingleOrVecInner::Single(_) if range.contains(&0) => Drain { | ||
inner: DrainInner::Single(this), | ||
}, | ||
SingleOrVecInner::Vec(vec) => Drain { | ||
inner: DrainInner::Vec(vec.drain(range)), | ||
}, | ||
_ => Drain { | ||
inner: DrainInner::Done, | ||
}, | ||
} | ||
} | ||
pub fn insert(&mut self, at: usize, value: T) { | ||
assert!(at <= self.len()); | ||
self.vectorize().insert(at, value); | ||
} | ||
} | ||
enum DrainInner<'a, T> { | ||
Vec(alloc::vec::Drain<'a, T>), | ||
Single(&'a mut SingleOrVecInner<T>), | ||
Done, | ||
} | ||
pub struct Drain<'a, T> { | ||
inner: DrainInner<'a, T>, | ||
} | ||
impl<'a, T> Iterator for Drain<'a, T> { | ||
type Item = T; | ||
|
||
fn next(&mut self) -> Option<Self::Item> { | ||
match &mut self.inner { | ||
DrainInner::Vec(drain) => drain.next(), | ||
DrainInner::Single(inner) => match unsafe { core::ptr::read(*inner) } { | ||
SingleOrVecInner::Single(value) => unsafe { | ||
core::ptr::write(*inner, SingleOrVecInner::Vec(Vec::new())); | ||
Some(value) | ||
}, | ||
SingleOrVecInner::Vec(_) => None, | ||
}, | ||
_ => None, | ||
} | ||
} | ||
} | ||
impl<'a, T> Drop for Drain<'a, T> { | ||
fn drop(&mut self) { | ||
if let DrainInner::Single(_) = self.inner { | ||
self.next(); | ||
} | ||
} | ||
} | ||
|
||
impl<T> Default for SingleOrVec<T> { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,24 +11,37 @@ | |
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
|
||
use clap::Parser; | ||
use std::convert::TryInto; | ||
use zenoh::config::Config; | ||
use zenoh::prelude::sync::*; | ||
use zenoh::publication::CongestionControl; | ||
use zenoh::sample::AttachmentBuilder; | ||
use zenoh_examples::CommonArgs; | ||
|
||
fn main() { | ||
// initiate logging | ||
env_logger::init(); | ||
let (config, size, prio, print, number) = parse_args(); | ||
let args = Args::parse(); | ||
|
||
let mut prio = Priority::default(); | ||
if let Some(p) = args.priority { | ||
prio = p.try_into().unwrap(); | ||
} | ||
|
||
let data: Value = (0usize..size) | ||
let mut payload_size = args.payload_size; | ||
if args.attachments_number != 0 { | ||
let mut att_size = 2 * args.attachments_number; | ||
att_size += 2 + (core::mem::size_of::<usize>() * 8 - att_size.leading_zeros() as usize) / 7; | ||
payload_size -= dbg!(att_size); | ||
} | ||
|
||
let data: Value = (0usize..dbg!(payload_size)) | ||
.map(|i| (i % 10) as u8) | ||
.collect::<Vec<u8>>() | ||
.into(); | ||
|
||
let session = zenoh::open(config).res().unwrap(); | ||
let session = zenoh::open(args.common).res().unwrap(); | ||
|
||
let publisher = session | ||
.declare_publisher("test/thr") | ||
|
@@ -40,10 +53,28 @@ fn main() { | |
let mut count: usize = 0; | ||
let mut start = std::time::Instant::now(); | ||
loop { | ||
publisher.put(data.clone()).res().unwrap(); | ||
let attachments = (args.attachments_number != 0).then(|| { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest to remove the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm fine to move it to separate examples. |
||
if args.attach_with_insert { | ||
let mut attachments = AttachmentBuilder::new(); | ||
for _ in 0..args.attachments_number { | ||
attachments.insert(b"", b""); | ||
} | ||
attachments.into() | ||
} else { | ||
std::iter::repeat((b"".as_slice(), b"".as_slice())) | ||
.take(args.attachments_number) | ||
.collect() | ||
} | ||
}); | ||
|
||
if print { | ||
if count < number { | ||
let mut put = publisher.put(data.clone()); | ||
if let Some(att) = attachments { | ||
put = put.with_attachment(att) | ||
} | ||
put.res().unwrap(); | ||
|
||
if args.print { | ||
if count < args.number { | ||
count += 1; | ||
} else { | ||
let thpt = count as f64 / start.elapsed().as_secs_f64(); | ||
|
@@ -57,34 +88,25 @@ fn main() { | |
|
||
#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)] | ||
struct Args { | ||
#[arg(short, long)] | ||
/// Priority for sending data | ||
#[arg(short, long)] | ||
priority: Option<u8>, | ||
#[arg(short = 't', long)] | ||
/// Print the statistics | ||
#[arg(short = 't', long)] | ||
print: bool, | ||
#[arg(short, long, default_value = "100000")] | ||
/// Number of messages in each throughput measurements | ||
#[arg(short, long, default_value = "100000")] | ||
number: usize, | ||
/// The number of attachments in the message. | ||
/// | ||
/// The attachments will be sized such that the attachments replace the payload. | ||
#[arg(long, default_value = "0")] | ||
attachments_number: usize, | ||
/// Attach through insert rather than FromIterator | ||
#[arg(long)] | ||
attach_with_insert: bool, | ||
/// Sets the size of the payload to publish | ||
payload_size: usize, | ||
#[command(flatten)] | ||
common: CommonArgs, | ||
} | ||
|
||
fn parse_args() -> (Config, usize, Priority, bool, usize) { | ||
let args = Args::parse(); | ||
|
||
let mut prio = Priority::default(); | ||
if let Some(p) = args.priority { | ||
prio = p.try_into().unwrap(); | ||
} | ||
|
||
( | ||
args.common.into(), | ||
args.payload_size, | ||
prio, | ||
args.print, | ||
args.number, | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to check if
at
less than buffer sizeThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indeed, but computing
len
here is a bit expensive... that check can be done by checking forslice_index == usize::MAX
a bit later, adding to the todo list