From 55ad5fea4ed1526cd3e7ca20b72863b01bbebe71 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Fri, 19 Apr 2024 11:51:16 +0200 Subject: [PATCH 1/5] feat: add Attachment to API --- examples/z_put.py | 6 ++- examples/z_sub.py | 7 ++-- src/lib.rs | 1 + src/session.rs | 25 +++++++++-- src/value.rs | 66 +++++++++++++++++++++++++++++ zenoh/session.py | 37 +++++++++++----- zenoh/value.py | 105 ++++++++++++++++++++++++++++++++++++++++------ 7 files changed, 217 insertions(+), 30 deletions(-) diff --git a/examples/z_put.py b/examples/z_put.py index f73a9c1e..007b8dda 100644 --- a/examples/z_put.py +++ b/examples/z_put.py @@ -61,6 +61,7 @@ key = args.key value = args.value + # Zenoh code --- --- --- --- --- --- --- --- --- --- --- def main(): # initiate logging @@ -70,7 +71,7 @@ def main(): session = zenoh.open(conf) print("Putting Data ('{}': '{}')...".format(key, value)) - session.put(key, value) + session.put(key, value, attachment={'key': b'value'}) # --- Examples of put with other types: @@ -103,4 +104,5 @@ def main(): session.close() -main() \ No newline at end of file + +main() diff --git a/examples/z_sub.py b/examples/z_sub.py index 65ba6d33..1576ab5f 100644 --- a/examples/z_sub.py +++ b/examples/z_sub.py @@ -58,6 +58,7 @@ conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) key = args.key + # Zenoh code --- --- --- --- --- --- --- --- --- --- --- def main(): # initiate logging @@ -68,10 +69,8 @@ def main(): print("Declaring Subscriber on '{}'...".format(key)) - def listener(sample: Sample): print(f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{sample.payload.decode('utf-8')}')") - # WARNING, you MUST store the return value in order for the subscription to work!! # This is because if you don't, the reference counter will reach 0 and the subscription @@ -86,4 +85,6 @@ def listener(sample: Sample): # the reference counter reaches 0 # sub.undeclare() # session.close() -main() \ No newline at end of file + + +main() diff --git a/src/lib.rs b/src/lib.rs index 7f2a573f..a35b873f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,6 +74,7 @@ fn zenoh(_py: Python, m: &Bound) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/session.rs b/src/session.rs index c71d8894..a458555d 100644 --- a/src/session.rs +++ b/src/session.rs @@ -34,7 +34,7 @@ use crate::enums::{ }; use crate::keyexpr::{_KeyExpr, _Selector}; use crate::queryable::{_Query, _Queryable}; -use crate::value::{_Hello, _Reply, _Sample, _Value, _ZenohId}; +use crate::value::{_Attachment, _Hello, _Reply, _Sample, _Value, _ZenohId}; use crate::{PyAnyToValue, PyExtract, ToPyErr}; #[pyclass(subclass)] @@ -88,6 +88,11 @@ impl _Session { Err(crate::ExtractError::Other(e)) => return Err(e), _ => {} } + match kwargs.extract_item::<_Attachment>("attachment") { + Ok(attachment) => builder = builder.with_attachment(attachment.0), + Err(crate::ExtractError::Other(e)) => return Err(e), + _ => {} + } } builder.res_sync().map_err(|e| e.to_pyerr()) } @@ -115,6 +120,11 @@ impl _Session { Err(crate::ExtractError::Other(e)) => return Err(e), _ => {} } + match kwargs.extract_item::<_Attachment>("attachment") { + Ok(attachment) => builder = builder.with_attachment(attachment.0), + Err(crate::ExtractError::Other(e)) => return Err(e), + _ => {} + } } builder.res_sync().map_err(|e| e.to_pyerr()) } @@ -144,6 +154,11 @@ impl _Session { Err(crate::ExtractError::Other(e)) => return Err(e), _ => {} } + match kwargs.extract_item::<_Attachment>("attachment") { + Ok(attachment) => builder = builder.with_attachment(attachment.0), + Err(crate::ExtractError::Other(e)) => return Err(e), + _ => {} + } } builder.res_sync().map_err(|e| e.to_pyerr()) } @@ -275,8 +290,12 @@ impl _Publisher { pub fn key_expr(&self) -> _KeyExpr { _KeyExpr(self.0.key_expr().clone()) } - pub fn put(&self, value: _Value) -> PyResult<()> { - self.0.put(value).res_sync().map_err(|e| e.to_pyerr()) + pub fn put(&self, value: _Value, attachment: Option<_Attachment>) -> PyResult<()> { + let mut builder = self.0.put(value); + if let Some(attachment) = attachment { + builder = builder.with_attachment(attachment.0); + } + builder.res_sync().map_err(|e| e.to_pyerr()) } pub fn delete(&self) -> PyResult<()> { self.0.delete().res_sync().map_err(|e| e.to_pyerr()) diff --git a/src/value.rs b/src/value.rs index abeb4b40..613a825f 100644 --- a/src/value.rs +++ b/src/value.rs @@ -11,7 +11,9 @@ // ZettaScale Zenoh team, use pyo3::{prelude::*, types::PyBytes}; +use std::collections::HashMap; use uhlc::Timestamp; +use zenoh::sample::{Attachment, AttachmentBuilder}; use zenoh::{ prelude::{Encoding, KeyExpr, Sample, Value, ZenohId}, query::Reply, @@ -188,6 +190,59 @@ impl _QoS { } } +#[pyclass(subclass)] +#[derive(Clone, Debug)] +pub struct _Attachment(pub Attachment); + +#[pymethods] +impl _Attachment { + #[new] + pub fn pynew(this: Self) -> Self { + this + } + + #[staticmethod] + fn new(attachment: HashMap, Vec>) -> Self { + Self(attachment.iter().map(|(k, v)| (&k[..], &v[..])).collect()) + } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } + + fn len(&self) -> usize { + self.0.len() + } + + fn items(&self) -> HashMap, Vec> { + self.0 + .iter() + .map(|(k, v)| (k.to_vec(), v.to_vec())) + .collect() + } + + fn get(&self, key: Vec) -> Option> { + self.0.get(&key).map(|v| v.to_vec()) + } + + fn insert(&mut self, key: Vec, value: Vec) { + self.0.insert(&key, &value) + } + + fn extend(mut this: PyRefMut, attachment: HashMap, Vec>) -> PyRefMut { + this.0.extend( + attachment + .iter() + .map(|(k, v)| (&k[..], &v[..])) + .collect::(), + ); + this + } + fn as_str(&self) -> String { + format!("{:?}", self.0) + } +} + #[pyclass(subclass)] #[derive(Clone, Debug)] pub struct _Sample { @@ -196,6 +251,7 @@ pub struct _Sample { kind: _SampleKind, timestamp: Option<_Timestamp>, qos: _QoS, + attachment: Option<_Attachment>, } impl From for _Sample { fn from(sample: Sample) -> Self { @@ -205,6 +261,7 @@ impl From for _Sample { kind, timestamp, qos, + attachment, .. } = sample; _Sample { @@ -213,6 +270,7 @@ impl From for _Sample { kind: _SampleKind(kind), timestamp: timestamp.map(_Timestamp), qos: _QoS(qos), + attachment: attachment.map(_Attachment), } } } @@ -310,6 +368,10 @@ impl _Sample { pub fn timestamp(&self) -> Option<_Timestamp> { self.timestamp } + #[getter] + pub fn attachment(&self) -> Option<_Attachment> { + self.attachment.clone() + } #[staticmethod] pub fn new( key_expr: _KeyExpr, @@ -317,6 +379,7 @@ impl _Sample { qos: _QoS, kind: _SampleKind, timestamp: Option<_Timestamp>, + attachment: Option<_Attachment>, ) -> Self { _Sample { key_expr: key_expr.0, @@ -324,6 +387,7 @@ impl _Sample { qos, kind, timestamp, + attachment, } } fn __str__(&self) -> String { @@ -339,11 +403,13 @@ impl From<_Sample> for Sample { kind, timestamp, qos, + attachment, } = sample; let mut sample = Sample::new(key_expr, value); sample.kind = kind.0; sample.timestamp = timestamp.map(|t| t.0); sample.qos = qos.0; + sample.attachment = attachment.map(|a| a.0); sample } } diff --git a/zenoh/session.py b/zenoh/session.py index 02f62849..20839b19 100644 --- a/zenoh/session.py +++ b/zenoh/session.py @@ -19,7 +19,7 @@ from .config import Config from .closures import IntoHandler, Handler, Receiver from .enums import * -from .value import IntoValue, Value, Sample, Reply, ZenohId +from .value import IntoValue, Value, Sample, Reply, ZenohId, IntoAttachment, Attachment from .queryable import Queryable, Query @@ -29,9 +29,10 @@ class Publisher: def __init__(self, p: _Publisher): self._inner_ = p - def put(self, value: IntoValue, encoding: Encoding = None): - "An optimised version of ``session.put(self.key_expr, value, encoding=encoding)``" - self._inner_.put(Value(value, encoding)) + def put(self, value: IntoValue, encoding: Encoding = None, attachment: IntoAttachment = None): + "An optimised version of ``session.put(self.key_expr, value, encoding=encoding, attachment=attachment)``" + attachment = Attachment(attachment) if attachment is not None else attachment + self._inner_.put(Value(value, encoding), attachment) def delete(self): "An optimised version of ``session.delete(self.key_expr)``" @@ -100,6 +101,7 @@ class Session(_Session): Note that most applications will only need a single instance of ``Session``. You should _never_ construct one session per publisher/subscriber, as this will significantly increase the size of your Zenoh network, while preventing potential locality-based optimizations. """ + def __new__(cls, config: Union[Config, Any] = None): if config is None: return super().__new__(cls) @@ -110,7 +112,7 @@ def __new__(cls, config: Union[Config, Any] = None): def put(self, keyexpr: IntoKeyExpr, value: IntoValue, encoding=None, priority: Priority = None, congestion_control: CongestionControl = None, - sample_kind: SampleKind = None): + sample_kind: SampleKind = None, attachment: IntoAttachment = None): """ Sends a value over Zenoh. @@ -122,6 +124,7 @@ def put(self, keyexpr: IntoKeyExpr, value: IntoValue, encoding=None, :param priority: The priority to use when routing the published data :param congestion_control: The congestion control to use when routing the published data :param sample_kind: The kind of sample to send + :param attachment: The attachment to attach to the value sent :Examples: @@ -138,6 +141,8 @@ def put(self, keyexpr: IntoKeyExpr, value: IntoValue, encoding=None, kwargs['congestion_control'] = congestion_control if sample_kind is not None: kwargs['sample_kind'] = sample_kind + if attachment is not None: + kwargs['attachment'] = Attachment(attachment) return super().put(keyexpr, value, **kwargs) def config(self) -> Config: @@ -149,7 +154,8 @@ def config(self) -> Config: return super().config() def delete(self, keyexpr: IntoKeyExpr, - priority: Priority = None, congestion_control: CongestionControl = None): + priority: Priority = None, congestion_control: CongestionControl = None, + attachment: IntoAttachment = None): """ Deletes the values associated with the keys included in ``keyexpr``. @@ -159,6 +165,7 @@ def delete(self, keyexpr: IntoKeyExpr, :param keyexpr: The key expression to publish :param priority: The priority to use when routing the delete :param congestion_control: The congestion control to use when routing the delete + :param attachment: The attachment to attach to the request :Examples: @@ -172,9 +179,13 @@ def delete(self, keyexpr: IntoKeyExpr, kwargs['priority'] = priority if congestion_control is not None: kwargs['congestion_control'] = congestion_control + if attachment is not None: + kwargs['attachment'] = Attachment(attachment) return super().delete(keyexpr, **kwargs) - def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver], consolidation: QueryConsolidation = None, target: QueryTarget = None, value: IntoValue = None) -> Receiver: + def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver], + consolidation: QueryConsolidation = None, target: QueryTarget = None, value: IntoValue = None, + attachment: IntoAttachment = None) -> Receiver: """ Emits a query, which queryables with intersecting selectors will be able to reply to. @@ -187,6 +198,7 @@ def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver] :param consolidation: The consolidation to apply to replies :param target: The queryables that should be target to this query :param value: An optional value to attach to this query + :param attachment: An optional attachment to attach to this query :return: The receiver of the handler :rtype: Receiver @@ -225,6 +237,8 @@ def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver] kwargs["target"] = target if value is not None: kwargs["value"] = Value(value) + if attachment is not None: + kwargs["attachment"] = Attachment(attachment) super().get(Selector(selector), handler.closure, **kwargs) return handler.receiver @@ -276,7 +290,8 @@ def declare_queryable(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Query, An inner = super().declare_queryable(KeyExpr(keyexpr), handler.closure, **kwargs) return Queryable(inner, handler.receiver) - def declare_publisher(self, keyexpr: IntoKeyExpr, priority: Priority = None, congestion_control: CongestionControl = None): + def declare_publisher(self, keyexpr: IntoKeyExpr, priority: Priority = None, + congestion_control: CongestionControl = None): """ Declares a publisher, which may be used to send values repeatedly onto a same key expression. @@ -302,7 +317,8 @@ def declare_publisher(self, keyexpr: IntoKeyExpr, priority: Priority = None, con kwargs['congestion_control'] = congestion_control return Publisher(super().declare_publisher(KeyExpr(keyexpr), **kwargs)) - def declare_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any], reliability: Reliability = None) -> Subscriber: + def declare_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any], + reliability: Reliability = None) -> Subscriber: """ Declares a subscriber, which will receive any published sample with a key expression intersecting ``keyexpr``. @@ -342,7 +358,8 @@ def declare_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, s = super().declare_subscriber(KeyExpr(keyexpr), handler.closure, **kwargs) return Subscriber(s, handler.receiver) - def declare_pull_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any], reliability: Reliability = None) -> PullSubscriber: + def declare_pull_subscriber(self, keyexpr: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any], + reliability: Reliability = None) -> PullSubscriber: """ Declares a pull-mode subscriber, which will receive a single published sample with a key expression intersecting ``keyexpr`` any time its ``pull`` method is called. diff --git a/zenoh/value.py b/zenoh/value.py index 22ca40f2..7b5e4e77 100644 --- a/zenoh/value.py +++ b/zenoh/value.py @@ -12,36 +12,41 @@ # ZettaScale Zenoh Team, # import abc -from typing import Union, Tuple, Optional, List +from typing import Union, Tuple, Optional, List, Dict import json from .enums import Encoding, SampleKind, Priority, CongestionControl -from .zenoh import _Value, _Encoding, _Sample, _SampleKind, _Reply, _ZenohId, _Timestamp, _Hello, _QoS +from .zenoh import _Value, _Encoding, _Sample, _SampleKind, _Reply, _ZenohId, _Timestamp, _Hello, _QoS, _Attachment from .keyexpr import KeyExpr, IntoKeyExpr + class IValue: "The IValue interface exposes how to recover a value's payload in a binary-serialized format, as well as that format's encoding." + @property @abc.abstractmethod def payload(self) -> bytes: "The value itself, as an array of bytes" ... - + @property @abc.abstractmethod def encoding(self) -> Encoding: "The value's encoding" ... + IntoValue = Union[IValue, bytes, str, int, float, object] + class Value(_Value, IValue): """ A Value is a pair of a binary payload, and a mime-type-like encoding string. When constructed with ``encoding==None``, the encoding will be selected depending on the payload's type. """ - def __new__(cls, payload: IntoValue, encoding: Encoding=None): + + def __new__(cls, payload: IntoValue, encoding: Encoding = None): if encoding is None: if isinstance(payload, Value): return payload @@ -50,7 +55,7 @@ def __new__(cls, payload: IntoValue, encoding: Encoding=None): if not isinstance(payload, bytes): raise TypeError("`encoding` was passed, but `payload` is not of type `bytes`") return Value.new(payload, encoding) - + @staticmethod def autoencode(value: IntoValue) -> 'Value': "Automatically encodes the value based on its type" @@ -65,7 +70,7 @@ def autoencode(value: IntoValue) -> 'Value': if isinstance(value, float): return Value.new(f"{value}".encode(), Encoding.APP_FLOAT()) return Value.new(json.dumps(value).encode(), Encoding.APP_JSON()) - + @staticmethod def new(payload: bytes, encoding: Encoding = None) -> 'Value': return Value._upgrade_(_Value.new(payload, encoding)) @@ -94,31 +99,39 @@ def _upgrade_(inner: _Value) -> 'Value': return inner return _Value.__new__(Value, inner) + class ZenohId(_ZenohId): """A Zenoh UUID""" + @staticmethod def _upgrade_(this: _ZenohId) -> 'ZenohId': return _ZenohId.__new__(ZenohId, this) + def __str__(self) -> str: return super().__str__() + def __repr__(self) -> str: return str(self) + class Timestamp(_Timestamp): """ A timestamp taken from the Zenoh HLC (Hybrid Logical Clock). These timestamps are guaranteed to be unique, as each machine annotates its perceived time with a UUID, which is used as the least significant part of the comparison operation. """ + @staticmethod def _upgrade_(this: _Timestamp) -> 'Timestamp': return _Timestamp.__new__(Timestamp, this) + @property def get_time(self) -> int: """ Returns the time part, as generated by the Zenoh HLC in NTP64 format (See https://datatracker.ietf.org/doc/html/rfc5905#section-6). """ return super().time + @property def seconds_since_unix_epoch(self) -> float: """ @@ -129,90 +142,151 @@ def seconds_since_unix_epoch(self) -> float: """ return super().seconds_since_unix_epoch + class QoS(_QoS): """ Quality of Service settings. """ + def __new__(cls): return super().new() + @property def priority(self) -> Priority: "Priority" return Priority(super().priority) + @property def congestion_control(self) -> CongestionControl: "Congestion control" return CongestionControl(super().congestion_control) + @property def express(self) -> bool: "Express flag: if True, the message is not batched during transmission, in order to reduce latency." return super().express + @staticmethod def _upgrade_(inner: _QoS) -> 'QoS': if isinstance(inner, QoS): return inner return _QoS.__new__(QoS, inner) - + + QoS.DEFAULT = QoS() - + +IntoAttachment = Dict[Union[str, bytes], Union[str, bytes]] + + +def _into_bytes(v: Union[str, bytes]) -> bytes: + return v if isinstance(v, bytes) else v.encode() + + +class Attachment(_Attachment): + def __new__(cls, into: IntoAttachment): + return Attachment._upgrade_(super().new({_into_bytes(k): _into_bytes(v) for k, v in into.items()})) + + def get(self, key: Union[str, bytes]) -> Optional[bytes]: + return super().get(_into_bytes(key)) + + def insert(self, key: Union[str, bytes], value: Union[str, bytes]): + return super().insert(_into_bytes(key), _into_bytes(value)) + + def extend(self, into: IntoAttachment) -> 'Attachment': + return super().extend({_into_bytes(k): _into_bytes(v) for k, v in into.items()}) + + def items(self) -> Dict[bytes, bytes]: + return super().items() + + def __str__(self) -> str: + return super().as_str() + + @staticmethod + def _upgrade_(inner: _Attachment) -> 'Attachment': + if isinstance(inner, Attachment): + return inner + return _Attachment.__new__(Attachment, inner) + IntoSample = Union[_Sample, Tuple[IntoKeyExpr, IntoValue, SampleKind], Tuple[KeyExpr, IntoValue]] + + class Sample(_Sample): """ A KeyExpr-Value pair, annotated with the kind (PUT or DELETE) of publication used to emit it and a timestamp. """ - def __new__(cls, key: IntoKeyExpr, value: IntoValue, kind: SampleKind = None, qos:QoS = None, timestamp: Timestamp = None): + + def __new__(cls, key: IntoKeyExpr, value: IntoValue, kind: SampleKind = None, qos: QoS = None, + timestamp: Timestamp = None, attachment: IntoAttachment = None): kind = _SampleKind.PUT if kind is None else kind qos = QoS.DEFAULT if qos is None else qos - return Sample._upgrade_(super().new(KeyExpr(key), Value(value), qos, kind, timestamp)) + attachment = Attachment(attachment) if attachment is not None else attachment + return Sample._upgrade_(super().new(KeyExpr(key), Value(value), qos, kind, timestamp, attachment)) + @property def key_expr(self) -> KeyExpr: "The sample's key expression" return KeyExpr(super().key_expr) + @property def value(self) -> Value: "The sample's value" return Value._upgrade_(super().value) + @property def payload(self) -> bytes: "A shortcut to ``self.value.payload``" return super().payload + @property def encoding(self) -> Encoding: "A shortcut to ``self.value.encoding``" return Encoding(super().encoding) + @property def kind(self) -> SampleKind: "The sample's kind" return SampleKind(super().kind) + @property def timestamp(self) -> Optional[Timestamp]: "The sample's timestamp. May be None." ts = super().timestamp return None if ts is None else Timestamp._upgrade_(ts) + @property def qos(self) -> QoS: "Quality of service settings the sample was sent with" return QoS._upgrade_(super().qos) + + @property + def attachment(self) -> Attachment: + """The sample attachment""" + attachment = super().attachment + return Attachment._upgrade_(attachment) if attachment is not None else None + @staticmethod def _upgrade_(inner: _Sample) -> 'Sample': if isinstance(inner, Sample): return inner return _Sample.__new__(Sample, inner) + class Reply(_Reply): """ A reply to a query (``Session.get``). A single query can result in multiple replies from multiple queryables. """ + def __new__(cls, inner: _Reply): return super().__new__(cls, inner) + @property def replier_id(self) -> ZenohId: "The reply's sender's id." return ZenohId._upgrade_(super().replier_id) - + @property def is_ok(self) -> bool: """ @@ -230,6 +304,7 @@ def ok(self) -> Sample: Raises a ``ZError`` if the ``self`` is actually an ``err`` reply. """ return Sample._upgrade_(super().ok) + @property def err(self) -> Value: """ @@ -239,25 +314,31 @@ def err(self) -> Value: """ return Value._upgrade_(super().err) + class Hello(_Hello): "Represents a single Zenoh node discovered through scouting." + @property def zid(self) -> ZenohId: "The node's Zenoh UUID." zid = super().zid return None if zid is None else ZenohId._upgrade_(zid) + @property def whatami(self) -> str: "The node's type, returning either None, 'peer', 'router', or 'client'." return super().whatami + @property def locators(self) -> List[str]: "The locators through which this node may be adressed." return super().locators + @staticmethod def _upgrade_(inner: _Hello) -> 'Sample': if isinstance(inner, Hello): return inner return _Hello.__new__(Hello, inner) + def __str__(self): - return super().__str__() \ No newline at end of file + return super().__str__() From 5e79d83010650426a0125fcbfdafce1d61e441ff Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 22 Apr 2024 11:27:40 +0200 Subject: [PATCH 2/5] fix: make attachment a CLI arg in z_put example --- examples/z_put.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/examples/z_put.py b/examples/z_put.py index 007b8dda..e846ab42 100644 --- a/examples/z_put.py +++ b/examples/z_put.py @@ -44,6 +44,9 @@ default='Put from Python!', type=str, help='The value to write.') +parser.add_argument('--attach', '-a', dest='attach', + type=str, + help='The key-values to attach') parser.add_argument('--config', '-c', dest='config', metavar='FILE', type=str, @@ -60,6 +63,13 @@ conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) key = args.key value = args.value +attachment = args.attach +if attachment is not None: + attachment = { + k: v + for pair in attachment.split("&") + for (k, v) in [pair.split("=")] + } # Zenoh code --- --- --- --- --- --- --- --- --- --- --- @@ -71,7 +81,7 @@ def main(): session = zenoh.open(conf) print("Putting Data ('{}': '{}')...".format(key, value)) - session.put(key, value, attachment={'key': b'value'}) + session.put(key, value, attachment=attachment) # --- Examples of put with other types: From ce7a6d6658dd3d824f952b07330844c3053254f1 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 22 Apr 2024 15:03:17 +0200 Subject: [PATCH 3/5] Update zenoh/session.py Co-authored-by: Luca Cominardi --- zenoh/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/session.py b/zenoh/session.py index 20839b19..4aaf3322 100644 --- a/zenoh/session.py +++ b/zenoh/session.py @@ -124,7 +124,7 @@ def put(self, keyexpr: IntoKeyExpr, value: IntoValue, encoding=None, :param priority: The priority to use when routing the published data :param congestion_control: The congestion control to use when routing the published data :param sample_kind: The kind of sample to send - :param attachment: The attachment to attach to the value sent + :param attachment: The attachment to send along with the put :Examples: From 168a00a4bc7afe1ab5d666d0ef465126cdfffd86 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 22 Apr 2024 15:11:47 +0200 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Luca Cominardi --- zenoh/session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zenoh/session.py b/zenoh/session.py index 4aaf3322..f49a6644 100644 --- a/zenoh/session.py +++ b/zenoh/session.py @@ -165,7 +165,7 @@ def delete(self, keyexpr: IntoKeyExpr, :param keyexpr: The key expression to publish :param priority: The priority to use when routing the delete :param congestion_control: The congestion control to use when routing the delete - :param attachment: The attachment to attach to the request + :param attachment: The attachment to send along with the delete :Examples: @@ -198,7 +198,7 @@ def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver] :param consolidation: The consolidation to apply to replies :param target: The queryables that should be target to this query :param value: An optional value to attach to this query - :param attachment: An optional attachment to attach to this query + :param attachment: An optional attachment to send along with this query :return: The receiver of the handler :rtype: Receiver From 215121de668a8d45431e3987a0fd87560d1a20b4 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 22 Apr 2024 18:14:01 +0200 Subject: [PATCH 5/5] test: add test for attachment --- src/value.rs | 16 ++++++++++------ tests/test_session.py | 44 ++++++++++++++++++++++++++++++++++++++----- zenoh/value.py | 2 +- 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/src/value.rs b/src/value.rs index 613a825f..f992bce4 100644 --- a/src/value.rs +++ b/src/value.rs @@ -10,7 +10,10 @@ // Contributors: // ZettaScale Zenoh team, -use pyo3::{prelude::*, types::PyBytes}; +use pyo3::{ + prelude::*, + types::{PyBytes, PyDict}, +}; use std::collections::HashMap; use uhlc::Timestamp; use zenoh::sample::{Attachment, AttachmentBuilder}; @@ -214,11 +217,12 @@ impl _Attachment { self.0.len() } - fn items(&self) -> HashMap, Vec> { - self.0 - .iter() - .map(|(k, v)| (k.to_vec(), v.to_vec())) - .collect() + fn items<'py>(&self, py: Python<'py>) -> PyResult> { + let items = PyDict::new_bound(py); + for (k, v) in self.0.iter() { + items.set_item(PyBytes::new_bound(py, &k), PyBytes::new_bound(py, &v))?; + } + Ok(items) } fn get(&self, key: Vec) -> Option> { diff --git a/tests/test_session.py b/tests/test_session.py index 4833323f..c3d45310 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -1,7 +1,7 @@ import zenoh import json from zenoh import Session, Query, Sample, Priority, CongestionControl -from typing import List, Tuple +from typing import List, Tuple, Optional import time SLEEP = 1 @@ -92,9 +92,9 @@ def sub_callback(sample: Sample): nonlocal num_received nonlocal num_errors if sample.key_expr != keyexpr \ - or sample.qos.priority != Priority.DATA_HIGH() \ - or sample.qos.congestion_control != CongestionControl.BLOCK() \ - or sample.payload != msg: + or sample.qos.priority != Priority.DATA_HIGH() \ + or sample.qos.congestion_control != CongestionControl.BLOCK() \ + or sample.payload != msg: num_errors += 1 num_received += 1 @@ -124,9 +124,43 @@ def sub_callback(sample: Sample): subscriber.undeclare() +def run_session_attachment(peer01, peer02): + keyexpr = "test_attachment/session" + + last_sample: Optional[Sample] = None + + def callback(sample: Sample): + nonlocal last_sample + last_sample = sample + + print("[A][01d] Publisher on peer01 session"); + publisher = peer01.declare_publisher(keyexpr) + time.sleep(SLEEP) + + print("[A][02d] Publisher on peer01 session"); + subscriber = peer02.declare_subscriber(keyexpr, callback) + time.sleep(SLEEP) + + publisher.put("no attachment") + time.sleep(SLEEP) + assert last_sample is not None + assert last_sample.attachment is None + + publisher.put("attachment", attachment={"key1": "value1", b"key2": b"value2"}) + time.sleep(SLEEP) + assert last_sample.attachment is not None + assert last_sample.attachment.items() == {b"key1": b"value1", b"key2": b"value2"} + + print("[A][03d] Undeclare publisher on peer01 session"); + publisher.undeclare() + print("[A][04d] Undeclare subscriber on peer02 session"); + subscriber.undeclare() + + def test_session(): zenoh.init_logger() (peer01, peer02) = open_session(["tcp/127.0.0.1:17447"]) run_session_qryrep(peer01, peer02) run_session_pubsub(peer01, peer02) - close_session(peer01, peer02) \ No newline at end of file + run_session_attachment(peer01, peer02) + close_session(peer01, peer02) diff --git a/zenoh/value.py b/zenoh/value.py index 7b5e4e77..cd92c159 100644 --- a/zenoh/value.py +++ b/zenoh/value.py @@ -260,7 +260,7 @@ def qos(self) -> QoS: return QoS._upgrade_(super().qos) @property - def attachment(self) -> Attachment: + def attachment(self) -> Optional[Attachment]: """The sample attachment""" attachment = super().attachment return Attachment._upgrade_(attachment) if attachment is not None else None