Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge/main into dev/1.0.0 #1279

Merged
merged 24 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ce43f58
Add NOTE for LowLatency transport. (#1088)
evshary Jun 5, 2024
c279982
fix: Improve debug messages in `zenoh-transport` (#1090)
fuzzypixelz Jun 5, 2024
0942a69
Improve pipeline backoff (#1097)
Mallets Jun 10, 2024
de78221
Add typos check to CI (#1065)
kilpkonn Jun 10, 2024
528b87a
Start link tx_task before notifying router (#1098)
Mallets Jun 10, 2024
d8e66de
Fix typos (#1110)
Mallets Jun 10, 2024
9d09742
bump quinn & rustls (#1086)
JLerxky Jun 11, 2024
ed6c636
Fix interface name scanning when listening on IP unspecified for TCP/…
Mallets Jun 12, 2024
8160b01
Enable releasing from any branch (#1136)
fuzzypixelz Jun 13, 2024
7adad94
Fix cargo clippy (#1145)
Mallets Jun 14, 2024
93f93d2
Release tables locks before propagating subscribers and queryables de…
OlivierHecart Jun 17, 2024
2500e5a
feat: make `TerminatableTask` terminate itself when dropped (#1151)
YuanYuYuan Jun 20, 2024
869ace6
Fix bug in keyexpr::includes leading to call get_unchecked on empty a…
OlivierHecart Jul 2, 2024
b93ca84
REST plugin uses unbounded flume channels for queries (#1213)
OlivierHecart Jul 3, 2024
b3e42ce
fix: typo in selector.rs (#1228)
diogomatsubara Jul 8, 2024
0a969cb
fix: zenohd --cfg (#1263)
YuanYuYuan Jul 25, 2024
65f7f88
Fix failover brokering bug reacting to linkstate changes (#1272)
OlivierHecart Jul 26, 2024
3b9e824
Code format
OlivierHecart Jul 26, 2024
664915a
Fix clippy warnings
OlivierHecart Jul 26, 2024
021f7c6
Code format
OlivierHecart Jul 26, 2024
e587aa9
Fix Clippy errors from Rust 1.80 (#1273)
fuzzypixelz Jul 26, 2024
2d88c7b
Update Cargo.toml (#1277)
kydos Jul 29, 2024
821eb06
Merge main into dev/1.0.0
Mallets Jul 29, 2024
08b7f17
Merge ci.yaml
Mallets Jul 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ authors = [
edition = "2021"
license = "EPL-2.0 OR Apache-2.0"
categories = ["network-programming"]
description = "Zenoh: Zero Overhead Pub/sub, Store/Query and Compute."
description = "Zenoh: The Zero Overhead Pub/Sub/Query Protocol."

# DEFAULT-FEATURES NOTE: Be careful with default-features and additivity!
# (https://github.com/rust-lang/cargo/issues/11329)
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-keyexpr/src/key_expr/borrowed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ impl keyexpr {
/// For instance, if `self` is `"a/**/c/*" and `prefix` is `a/b/c` then:
/// - the `prefix` matches `"a/**/c"` leading to a result of `"*"` when stripped from `self`
/// - the `prefix` matches `"a/**"` leading to a result of `"**/c/*"` when stripped from `self`
///
/// So the result is `["*", "**/c/*"]`.
/// If `prefix` cannot match the beginning of `self`, an empty list is reuturned.
///
Expand Down
26 changes: 17 additions & 9 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,16 @@ impl TaskController {
}

pub struct TerminatableTask {
handle: JoinHandle<()>,
handle: Option<JoinHandle<()>>,
token: CancellationToken,
}

impl Drop for TerminatableTask {
fn drop(&mut self) {
self.terminate(std::time::Duration::from_secs(10));
}
}

impl TerminatableTask {
pub fn create_cancellation_token() -> CancellationToken {
CancellationToken::new()
Expand All @@ -146,7 +152,7 @@ impl TerminatableTask {
T: Send + 'static,
{
TerminatableTask {
handle: rt.spawn(future.map(|_f| ())),
handle: Some(rt.spawn(future.map(|_f| ()))),
token,
}
}
Expand All @@ -167,24 +173,26 @@ impl TerminatableTask {
};

TerminatableTask {
handle: rt.spawn(task),
handle: Some(rt.spawn(task)),
token,
}
}

/// Attempts to terminate the task.
/// Returns true if task completed / aborted within timeout duration, false otherwise.
pub fn terminate(self, timeout: Duration) -> bool {
pub fn terminate(&mut self, timeout: Duration) -> bool {
ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait()
}

/// Async version of [`TerminatableTask::terminate()`].
pub async fn terminate_async(self, timeout: Duration) -> bool {
pub async fn terminate_async(&mut self, timeout: Duration) -> bool {
self.token.cancel();
if tokio::time::timeout(timeout, self.handle).await.is_err() {
tracing::error!("Failed to terminate the task");
return false;
};
if let Some(handle) = self.handle.take() {
if tokio::time::timeout(timeout, handle).await.is_err() {
tracing::error!("Failed to terminate the task");
return false;
};
}
true
}
}
3 changes: 2 additions & 1 deletion plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
.state()
.0
.get(Selector::borrowed(&key_expr, &parameters))
.consolidation(consolidation);
.consolidation(consolidation)
.with(flume::unbounded());
if !body.is_empty() {
let encoding: Encoding = req
.content_type()
Expand Down
1 change: 0 additions & 1 deletion zenoh-ext/src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//

//! To manage groups and group memberships

use std::{
collections::HashMap,
convert::TryInto,
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl<'a> PublicationCache<'a> {
let PublicationCache {
_queryable,
local_sub,
task,
mut task,
} = self;
_queryable.undeclare().await?;
local_sub.undeclare().await?;
Expand Down
1 change: 0 additions & 1 deletion zenoh/src/api/key_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{
convert::{TryFrom, TryInto},
future::{IntoFuture, Ready},
Expand Down
1 change: 0 additions & 1 deletion zenoh/src/net/routing/hat/client/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ fn propagate_simple_queryable(
.local_qabls
.insert(res.clone(), (id, info));
let key_expr = Resource::decl_key(res, &mut dst_face);
println!("Decled key = {key_expr:?}");
send_declare(
&dst_face.primitives,
RoutingContext::with_expr(
Expand Down
3 changes: 1 addition & 2 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ struct HatTables {

impl Drop for HatTables {
fn drop(&mut self) {
if self.linkstatepeers_trees_task.is_some() {
let task = self.linkstatepeers_trees_task.take().unwrap();
if let Some(mut task) = self.linkstatepeers_trees_task.take() {
task.terminate(Duration::from_secs(10));
}
}
Expand Down
10 changes: 5 additions & 5 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,10 @@ struct HatTables {

impl Drop for HatTables {
fn drop(&mut self) {
if self.linkstatepeers_trees_task.is_some() {
let task = self.linkstatepeers_trees_task.take().unwrap();
if let Some(mut task) = self.linkstatepeers_trees_task.take() {
task.terminate(Duration::from_secs(10));
}
if self.routers_trees_task.is_some() {
let task = self.routers_trees_task.take().unwrap();
if let Some(mut task) = self.routers_trees_task.take() {
task.terminate(Duration::from_secs(10));
}
}
Expand Down Expand Up @@ -253,7 +251,9 @@ impl HatTables {
.as_ref()
.map(|net| {
let links = net.get_links(peer1);
HatTables::failover_brokering_to(links, peer2)
let res = HatTables::failover_brokering_to(links, peer2);
tracing::trace!("failover_brokering {} {} : {}", peer1, peer2, res);
res
})
.unwrap_or(false)
}
Expand Down
5 changes: 5 additions & 0 deletions zenohd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ fn config_from_args(args: &Args) -> Config {
}
Err(e) => tracing::warn!("Couldn't perform configuration {}: {}", json, e),
}
} else {
panic!(
"--cfg accepts KEY:VALUE pairs. {} is not a valid KEY:VALUE pair.",
json
)
}
}
tracing::debug!("Config: {:?}", &config);
Expand Down