Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attachment api #613

Merged
merged 9 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 94 additions & 4 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
ZSlice,
};
use alloc::{sync::Arc, vec::Vec};
use core::{cmp, iter, mem, num::NonZeroUsize, ptr, slice};
use core::{cmp, iter, mem, num::NonZeroUsize, ops::RangeBounds, ptr};
use zenoh_collections::SingleOrVec;

fn get_mut_unchecked<T>(arc: &mut Arc<T>) -> &mut T {
Expand Down Expand Up @@ -55,6 +55,85 @@ impl ZBuf {
self.slices.push(zslice);
}
}

pub fn splice<Range: RangeBounds<usize>>(&mut self, erased: Range, replacement: &[u8]) {
let start = match erased.start_bound() {
core::ops::Bound::Included(n) => *n,
core::ops::Bound::Excluded(n) => n + 1,
core::ops::Bound::Unbounded => 0,
};
let end = match erased.end_bound() {
core::ops::Bound::Included(n) => n + 1,
core::ops::Bound::Excluded(n) => *n,
core::ops::Bound::Unbounded => self.len(),
};
if start != end {
self.remove(start, end);
}
self.insert(start, replacement);
}
fn remove(&mut self, mut start: usize, mut end: usize) {
assert!(start <= end);
assert!(end <= self.len());
let mut start_slice_idx = 0;
let mut start_idx_in_start_slice = 0;
let mut end_slice_idx = 0;
let mut end_idx_in_end_slice = 0;
for (i, slice) in self.slices.as_mut().iter_mut().enumerate() {
if slice.len() > start {
start_slice_idx = i;
start_idx_in_start_slice = start;
}
if slice.len() >= end {
end_slice_idx = i;
end_idx_in_end_slice = end;
break;
}
start -= slice.len();
end -= slice.len();
}
let start_slice = &mut self.slices.as_mut()[start_slice_idx];
start_slice.end = start_slice.start + start_idx_in_start_slice;
let drain_start = start_slice_idx + (start_slice.start < start_slice.end) as usize;
let end_slice = &mut self.slices.as_mut()[end_slice_idx];
end_slice.start += end_idx_in_end_slice;
let drain_end = end_slice_idx + (end_slice.start >= end_slice.end) as usize;
self.slices.drain(drain_start..drain_end);
}
fn insert(&mut self, mut at: usize, slice: &[u8]) {
if slice.is_empty() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to check if at less than buffer size

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, but computing len here is a bit expensive... that check can be done by checking for slice_index == usize::MAX a bit later, adding to the todo list

return;
}
let old_at = at;
let mut slice_index = usize::MAX;
for (i, slice) in self.slices.as_ref().iter().enumerate() {
if at < slice.len() {
slice_index = i;
break;
}
if let Some(new_at) = at.checked_sub(slice.len()) {
at = new_at
} else {
panic!(
"Out of bounds insert attempted: at={old_at}, len={}",
self.len()
)
}
}
if at != 0 {
let split = &self.slices.as_ref()[slice_index];
let (l, r) = (
split.subslice(0, at).unwrap(),
split.subslice(at, split.len()).unwrap(),
);
self.slices.drain(slice_index..(slice_index + 1));
self.slices.insert(slice_index, l);
self.slices.insert(slice_index + 1, Vec::from(slice).into());
self.slices.insert(slice_index + 2, r);
} else {
self.slices.insert(slice_index, Vec::from(slice).into())
}
}
}

// Buffer
Expand All @@ -70,7 +149,7 @@ impl Buffer for ZBuf {

// SplitBuffer
impl SplitBuffer for ZBuf {
type Slices<'a> = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;
type Slices<'a> = iter::Map<core::slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
self.slices.as_ref().iter().map(ZSlice::as_slice)
Expand Down Expand Up @@ -380,9 +459,20 @@ impl<'a> HasWriter for &'a mut ZBuf {
type Writer = ZBufWriter<'a>;

fn writer(self) -> Self::Writer {
let mut cache = None;
if let Some(ZSlice { buf, end, .. }) = self.slices.last_mut() {
// Verify the ZSlice is actually a Vec<u8>
if let Some(b) = buf.as_any().downcast_ref::<Vec<u8>>() {
// Check for the length
if *end == b.len() {
cache = Some(unsafe { Arc::from_raw(Arc::into_raw(buf.clone()).cast()) })
}
}
}

ZBufWriter {
inner: self,
cache: Arc::new(Vec::new()),
cache: cache.unwrap_or_else(|| Arc::new(Vec::new())),
}
}
}
Expand Down Expand Up @@ -433,7 +523,7 @@ impl Writer for ZBufWriter<'_> {
}

fn write_u8(&mut self, byte: u8) -> Result<(), DidntWrite> {
self.write_exact(slice::from_ref(&byte))
self.write_exact(core::slice::from_ref(&byte))
}

fn remaining(&self) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output {
let NetworkMessage { body } = x;
let NetworkMessage { body, .. } = x;

match body {
NetworkBody::Push(b) => self.write(&mut *writer, b),
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/src/scouting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &ScoutingMessage) -> Self::Output {
let ScoutingMessage { body } = x;
let ScoutingMessage { body, .. } = x;

match body {
ScoutingBody::Scout(s) => self.write(&mut *writer, s),
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output {
let TransportMessage { body } = x;
let TransportMessage { body, .. } = x;

match body {
TransportBody::Frame(b) => self.write(&mut *writer, b),
Expand Down
65 changes: 64 additions & 1 deletion commons/zenoh-collections/src/single_or_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use alloc::{vec, vec::Vec};
use core::{
cmp::PartialEq,
fmt, iter,
ops::{Index, IndexMut},
ops::{Index, IndexMut, RangeBounds},
ptr, slice,
};

Expand Down Expand Up @@ -112,6 +113,19 @@ impl<T> SingleOrVec<T> {
matches!(&self.0, SingleOrVecInner::Vec(v) if v.is_empty())
}

fn vectorize(&mut self) -> &mut Vec<T> {
if let SingleOrVecInner::Single(v) = &self.0 {
unsafe {
let v = core::ptr::read(v);
core::ptr::write(&mut self.0, SingleOrVecInner::Vec(vec![v]))
};
}
let SingleOrVecInner::Vec(v) = &mut self.0 else {
unsafe { core::hint::unreachable_unchecked() }
};
v
}

pub fn get(&self, index: usize) -> Option<&T> {
match &self.0 {
SingleOrVecInner::Single(v) => (index == 0).then_some(v),
Expand Down Expand Up @@ -139,6 +153,55 @@ impl<T> SingleOrVec<T> {
SingleOrVecInner::Vec(v) => v.last_mut(),
}
}
pub fn drain<Range: RangeBounds<usize>>(&mut self, range: Range) -> Drain<T> {
match &mut self.0 {
this @ SingleOrVecInner::Single(_) if range.contains(&0) => Drain {
inner: DrainInner::Single(this),
},
SingleOrVecInner::Vec(vec) => Drain {
inner: DrainInner::Vec(vec.drain(range)),
},
_ => Drain {
inner: DrainInner::Done,
},
}
}
pub fn insert(&mut self, at: usize, value: T) {
assert!(at <= self.len());
self.vectorize().insert(at, value);
}
}
enum DrainInner<'a, T> {
Vec(alloc::vec::Drain<'a, T>),
Single(&'a mut SingleOrVecInner<T>),
Done,
}
pub struct Drain<'a, T> {
inner: DrainInner<'a, T>,
}
impl<'a, T> Iterator for Drain<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
match &mut self.inner {
DrainInner::Vec(drain) => drain.next(),
DrainInner::Single(inner) => match unsafe { core::ptr::read(*inner) } {
SingleOrVecInner::Single(value) => unsafe {
core::ptr::write(*inner, SingleOrVecInner::Vec(Vec::new()));
Some(value)
},
SingleOrVecInner::Vec(_) => None,
},
_ => None,
}
}
}
impl<'a, T> Drop for Drain<'a, T> {
fn drop(&mut self) {
if let DrainInner::Single(_) = self.inner {
self.next();
}
}
}

impl<T> Default for SingleOrVec<T> {
Expand Down
4 changes: 3 additions & 1 deletion commons/zenoh-protocol/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ pub mod ext {
}
}

/// 7 6 5 4 3 2 1 0
/// ```text
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// % num elems %
/// +-------+-+-+---+
Expand All @@ -266,6 +267,7 @@ pub mod ext {
/// ~ val: <u8;z32> ~
/// +---------------+
/// ... -- N times (key, value) tuples
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AttachmentType<const ID: u8> {
pub buffer: ZBuf,
Expand Down
76 changes: 49 additions & 27 deletions examples/examples/z_pub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,37 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use clap::Parser;
use std::convert::TryInto;
use zenoh::config::Config;
use zenoh::prelude::sync::*;
use zenoh::publication::CongestionControl;
use zenoh::sample::AttachmentBuilder;
use zenoh_examples::CommonArgs;

fn main() {
// initiate logging
env_logger::init();
let (config, size, prio, print, number) = parse_args();
let args = Args::parse();

let mut prio = Priority::default();
if let Some(p) = args.priority {
prio = p.try_into().unwrap();
}

let data: Value = (0usize..size)
let mut payload_size = args.payload_size;
if args.attachments_number != 0 {
let mut att_size = 2 * args.attachments_number;
att_size += 2 + (core::mem::size_of::<usize>() * 8 - att_size.leading_zeros() as usize) / 7;
payload_size -= dbg!(att_size);
}

let data: Value = (0usize..dbg!(payload_size))
.map(|i| (i % 10) as u8)
.collect::<Vec<u8>>()
.into();

let session = zenoh::open(config).res().unwrap();
let session = zenoh::open(args.common).res().unwrap();

let publisher = session
.declare_publisher("test/thr")
Expand All @@ -40,10 +53,28 @@ fn main() {
let mut count: usize = 0;
let mut start = std::time::Instant::now();
loop {
publisher.put(data.clone()).res().unwrap();
let attachments = (args.attachments_number != 0).then(|| {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to remove the attachments from the z_pub_thr and to have it only on the z_pub/z_sub.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

z_sub example should also show how to use the attachment.

Copy link
Member

@sashacmc sashacmc Dec 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Mallets in zenoh-c I decided to move attachment usage to separate sample to don't overload main simple z_pub/z_sub samples.
Maybe we should do the same here too?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine to move it to separate examples.

if args.attach_with_insert {
let mut attachments = AttachmentBuilder::new();
for _ in 0..args.attachments_number {
attachments.insert(b"", b"");
}
attachments.into()
} else {
std::iter::repeat((b"".as_slice(), b"".as_slice()))
.take(args.attachments_number)
.collect()
}
});

if print {
if count < number {
let mut put = publisher.put(data.clone());
if let Some(att) = attachments {
put = put.with_attachment(att)
}
put.res().unwrap();

if args.print {
if count < args.number {
count += 1;
} else {
let thpt = count as f64 / start.elapsed().as_secs_f64();
Expand All @@ -57,34 +88,25 @@ fn main() {

#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long)]
/// Priority for sending data
#[arg(short, long)]
priority: Option<u8>,
#[arg(short = 't', long)]
/// Print the statistics
#[arg(short = 't', long)]
print: bool,
#[arg(short, long, default_value = "100000")]
/// Number of messages in each throughput measurements
#[arg(short, long, default_value = "100000")]
number: usize,
/// The number of attachments in the message.
///
/// The attachments will be sized such that the attachments replace the payload.
#[arg(long, default_value = "0")]
attachments_number: usize,
/// Attach through insert rather than FromIterator
#[arg(long)]
attach_with_insert: bool,
/// Sets the size of the payload to publish
payload_size: usize,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, usize, Priority, bool, usize) {
let args = Args::parse();

let mut prio = Priority::default();
if let Some(p) = args.priority {
prio = p.try_into().unwrap();
}

(
args.common.into(),
args.payload_size,
prio,
args.print,
args.number,
)
}
Loading
Loading