Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port to recent zenoh sync Rust API #37

Merged
merged 2 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ crate-type = ["cdylib"]
[dependencies]
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh" }
async-std = "=1.9.0"
uhlc = "0.2"
uhlc = "0.3"
futures = "0.3.12"
log = "0.4"
env_logger = "0.8.2"
Expand Down
4 changes: 3 additions & 1 deletion examples/zenoh/z_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ def eval_callback(get_request):
name = dataset[0].value.get_content()

print(' >> Replying string: "Eval from {}"'.format(name))
get_request.reply(path, 'Eval from {}'.format(name))
get_request.reply(path, '#1Eval from {}'.format(name))
get_request.reply(path, '#2Eval from {}'.format(name))
get_request.reply(path, '#3Eval from {}'.format(name))


print("Register eval for '{}'...".format(path))
Expand Down
2 changes: 1 addition & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ impl GetRequest {
fn reply(&self, path: String, value: &PyAny) -> PyResult<()> {
let p = path_of_string(path)?;
let v = zvalue_of_pyany(value)?;
task::block_on(self.r.reply(p, v));
self.r.reply(p, v);
Ok(())
}
}
Expand Down
44 changes: 23 additions & 21 deletions src/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use futures::select;
use log::warn;
use pyo3::prelude::*;
use pyo3::types::PyTuple;
use zenoh::net::Receiver;
use zenoh::ZFuture;

/// A Workspace to operate on zenoh.
///
Expand Down Expand Up @@ -81,7 +83,7 @@ impl Workspace {
fn put(&self, path: String, value: &PyAny) -> PyResult<()> {
let p = path_of_string(path)?;
let v = zvalue_of_pyany(value)?;
task::block_on(self.w.put(&p, v)).map_err(to_pyerr)
self.w.put(&p, v).wait().map_err(to_pyerr)
}

/// Delete a path and its value from zenoh.
Expand All @@ -102,7 +104,7 @@ impl Workspace {
#[text_signature = "(self, path)"]
fn delete(&self, path: String) -> PyResult<()> {
let p = path_of_string(path)?;
task::block_on(self.w.delete(&p)).map_err(to_pyerr)
self.w.delete(&p).wait().map_err(to_pyerr)
}

/// Get a selection of path/value from zenoh.
Expand All @@ -126,14 +128,12 @@ impl Workspace {
#[text_signature = "(self, selector)"]
fn get(&self, selector: String) -> PyResult<Vec<Data>> {
let s = selector_of_string(selector)?;
task::block_on(async {
let mut data_stream = self.w.get(&s).await.map_err(to_pyerr)?;
let mut result = vec![];
while let Some(d) = data_stream.next().await {
result.push(Data { d })
}
Ok(result)
})
let data_stream = self.w.get(&s).wait().map_err(to_pyerr)?;
let mut result = vec![];
while let Ok(d) = data_stream.recv() {
result.push(Data { d })
}
Ok(result)
}

/// Subscribe to changes for a selection of path/value (specified via a selector) from zenoh.
Expand Down Expand Up @@ -163,11 +163,13 @@ impl Workspace {
#[text_signature = "(self, selector, callback)"]
fn subscribe(&self, selector: String, callback: &PyAny) -> PyResult<Subscriber> {
let s = selector_of_string(selector)?;
let stream = task::block_on(self.w.subscribe(&s)).map_err(to_pyerr)?;
// Note: workaround to allow moving of stream into the task below.
let receiver = self.w.subscribe(&s).wait().map_err(to_pyerr)?;
// Note: workaround to allow moving of receiver into the task below.
// Otherwise, s is moved also, but can't because it doesn't have 'static lifetime.
let mut static_stream = unsafe {
std::mem::transmute::<zenoh::ChangeStream<'_>, zenoh::ChangeStream<'static>>(stream)
let mut static_receiver = unsafe {
std::mem::transmute::<zenoh::ChangeReceiver<'_>, zenoh::ChangeReceiver<'static>>(
receiver,
)
};

// Note: callback cannot be passed as such in task below because it's not Send
Expand All @@ -181,7 +183,7 @@ impl Workspace {
task::block_on(async move {
loop {
select!(
change = static_stream.next().fuse() => {
change = static_receiver.next().fuse() => {
// Acquire Python GIL to call the callback
let gil = Python::acquire_gil();
let py = gil.python();
Expand All @@ -192,7 +194,7 @@ impl Workspace {
}
},
_ = close_rx.recv().fuse() => {
if let Err(e) = static_stream.close().await {
if let Err(e) = static_receiver.close().await {
warn!("Error closing Subscriber: {}", e);
}
return
Expand Down Expand Up @@ -236,12 +238,12 @@ impl Workspace {
#[text_signature = "(self, path_expr, callback)"]
fn register_eval(&self, path_expr: String, callback: &PyAny) -> PyResult<Eval> {
let p = pathexpr_of_string(path_expr)?;
let stream = task::block_on(self.w.register_eval(&p)).map_err(to_pyerr)?;
let receiver = self.w.register_eval(&p).wait().map_err(to_pyerr)?;
// Note: workaround to allow moving of stream into the task below.
// Otherwise, s is moved also, but can't because it doesn't have 'static lifetime.
let mut static_stream = unsafe {
let mut static_receiver = unsafe {
std::mem::transmute::<zenoh::GetRequestStream<'_>, zenoh::GetRequestStream<'static>>(
stream,
receiver,
)
};

Expand All @@ -256,7 +258,7 @@ impl Workspace {
task::block_on(async move {
loop {
select!(
req = static_stream.next().fuse() => {
req = static_receiver.next().fuse() => {
// Acquire Python GIL to call the callback
let gil = Python::acquire_gil();
let py = gil.python();
Expand All @@ -267,7 +269,7 @@ impl Workspace {
}
},
_ = close_rx.recv().fuse() => {
if let Err(e) = static_stream.close().await {
if let Err(e) = static_receiver.close().await {
warn!("Error closing Subscriber: {}", e);
}
return
Expand Down
6 changes: 4 additions & 2 deletions src/zenoh_net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,11 @@ fn open(config: &PyDict) -> PyResult<Session> {
fn scout(whatami: ZInt, config: &PyDict, scout_duration: f64) -> PyResult<Vec<Hello>> {
task::block_on(async move {
let mut result = Vec::<Hello>::new();
let mut stream = zenoh::net::scout(whatami, pydict_to_props(config).into()).await;
let mut receiver = zenoh::net::scout(whatami, pydict_to_props(config).into())
.await
.unwrap();
let scout = async {
while let Some(h) = stream.next().await {
while let Some(h) = receiver.next().await {
result.push(Hello { h })
}
};
Expand Down
38 changes: 21 additions & 17 deletions src/zenoh_net/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use log::warn;
use pyo3::prelude::*;
use pyo3::types::{PyList, PyTuple};
use zenoh::net::{data_kind, encoding, ResourceId, ZInt};
use zenoh::ZFuture;

/// A zenoh-net session.
#[pyclass]
Expand All @@ -33,7 +34,7 @@ impl Session {
/// Close the zenoh-net Session.
fn close(&mut self) -> PyResult<()> {
let s = self.take()?;
task::block_on(s.close()).map_err(to_pyerr)
s.close().wait().map_err(to_pyerr)
}

/// Get informations about the zenoh-net Session.
Expand All @@ -49,7 +50,7 @@ impl Session {
/// >>> print("{} : {}".format(key, info[key]))
fn info(&self, py: Python) -> PyResult<PyObject> {
let s = self.as_ref()?;
let props = task::block_on(s.info());
let props = s.info().wait();
Ok(props_to_pydict(py, props.into()).to_object(py))
}

Expand Down Expand Up @@ -91,7 +92,8 @@ impl Session {
let encoding = encoding.unwrap_or(encoding::DEFAULT);
let kind = kind.unwrap_or(data_kind::DEFAULT);
let congestion_control = congestion_control.unwrap_or_default().cc;
task::block_on(s.write_ext(&k, payload.into(), encoding, kind, congestion_control))
s.write_ext(&k, payload.into(), encoding, kind, congestion_control)
.wait()
.map_err(to_pyerr)
}

Expand Down Expand Up @@ -119,7 +121,7 @@ impl Session {
fn declare_resource(&self, resource: &PyAny) -> PyResult<ResourceId> {
let s = self.as_ref()?;
let k = znreskey_of_pyany(resource)?;
task::block_on(s.declare_resource(&k)).map_err(to_pyerr)
s.declare_resource(&k).wait().map_err(to_pyerr)
}

/// Undeclare the *numerical Id/resource key* association previously declared
Expand All @@ -137,7 +139,7 @@ impl Session {
#[text_signature = "(self, rid)"]
fn undeclare_resource(&self, rid: ResourceId) -> PyResult<()> {
let s = self.as_ref()?;
task::block_on(s.undeclare_resource(rid)).map_err(to_pyerr)
s.undeclare_resource(rid).wait().map_err(to_pyerr)
}

/// Declare a Publisher for the given resource key.
Expand Down Expand Up @@ -165,7 +167,7 @@ impl Session {
fn declare_publisher(&self, resource: &PyAny) -> PyResult<Publisher> {
let s = self.as_ref()?;
let k = znreskey_of_pyany(resource)?;
let zn_pub = task::block_on(s.declare_publisher(&k)).map_err(to_pyerr)?;
let zn_pub = s.declare_publisher(&k).wait().map_err(to_pyerr)?;

// Note: this is a workaround for pyo3 not supporting lifetime in PyClass. See https://github.com/PyO3/pyo3/issues/502.
// We extend zenoh::net::Publisher's lifetime to 'static to be wrapped in Publisher PyClass
Expand Down Expand Up @@ -212,7 +214,7 @@ impl Session {
) -> PyResult<Subscriber> {
let s = self.as_ref()?;
let k = znreskey_of_pyany(resource)?;
let zn_sub = task::block_on(s.declare_subscriber(&k, &info.i)).map_err(to_pyerr)?;
let zn_sub = s.declare_subscriber(&k, &info.i).wait().map_err(to_pyerr)?;
// Note: workaround to allow moving of zn_sub into the task below.
// Otherwise, s is moved also, but can't because it doesn't have 'static lifetime.
let mut static_zn_sub = unsafe {
Expand All @@ -232,7 +234,7 @@ impl Session {
task::block_on(async move {
loop {
select!(
s = static_zn_sub.stream().next().fuse() => {
s = static_zn_sub.receiver().next().fuse() => {
// Acquire Python GIL to call the callback
let gil = Python::acquire_gil();
let py = gil.python();
Expand Down Expand Up @@ -304,7 +306,7 @@ impl Session {
) -> PyResult<Queryable> {
let s = self.as_ref()?;
let k = znreskey_of_pyany(resource)?;
let zn_quer = task::block_on(s.declare_queryable(&k, kind)).map_err(to_pyerr)?;
let zn_quer = s.declare_queryable(&k, kind).wait().map_err(to_pyerr)?;
// Note: workaround to allow moving of zn_quer into the task below.
// Otherwise, s is moved also, but can't because it doesn't have 'static lifetime.
let mut static_zn_quer = unsafe {
Expand All @@ -324,7 +326,7 @@ impl Session {
task::block_on(async move {
loop {
select!(
q = static_zn_quer.stream().next().fuse() => {
q = static_zn_quer.receiver().next().fuse() => {
// Acquire Python GIL to call the callback
let gil = Python::acquire_gil();
let py = gil.python();
Expand Down Expand Up @@ -392,13 +394,15 @@ impl Session {
) -> PyResult<()> {
let s = self.as_ref()?;
let k = znreskey_of_pyany(resource)?;
let mut zn_recv = task::block_on(s.query(
&k,
predicate,
target.unwrap_or_default().t,
consolidation.unwrap_or_default().c,
))
.map_err(to_pyerr)?;
let mut zn_recv = s
.query(
&k,
predicate,
target.unwrap_or_default().t,
consolidation.unwrap_or_default().c,
)
.wait()
.map_err(to_pyerr)?;

// Note: callback cannot be passed as such in task below because it's not Send
let cb_obj: Py<PyAny> = callback.into();
Expand Down
5 changes: 3 additions & 2 deletions src/zenoh_net/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyTuple};
use pyo3::PyObjectProtocol;
use zenoh::net::{ResourceId, ZInt};
use zenoh::ZFuture;

// zenoh.net.config (simulate the package as a class, and consts as class attributes)
/// Constants and helpers to build the configuration to pass to :func:`zenoh.net.open`.
Expand Down Expand Up @@ -663,7 +664,7 @@ impl Publisher {
/// Undeclare the publisher.
fn undeclare(&mut self) -> PyResult<()> {
match self.p.take() {
Some(p) => task::block_on(p.undeclare()).map_err(to_pyerr),
Some(p) => p.undeclare().wait().map_err(to_pyerr),
None => Ok(()),
}
}
Expand Down Expand Up @@ -768,7 +769,7 @@ impl Query {
/// :type: Sample
#[text_signature = "(self, sample)"]
fn reply(&self, sample: Sample) {
task::block_on(self.q.reply(sample.s));
self.q.reply(sample.s);
}
}

Expand Down