Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove obsolete zenoh::internal::Value #1577

Merged
merged 3 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions plugins/zenoh-backend-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ use std::collections::{hash_map::Entry, HashMap};

use async_trait::async_trait;
use tokio::sync::RwLock;
use zenoh::{internal::Value, key_expr::OwnedKeyExpr, time::Timestamp, Result as ZResult};
use zenoh::{
bytes::{Encoding, ZBytes},
key_expr::OwnedKeyExpr,
time::Timestamp,
Result as ZResult,
};
use zenoh_backend_traits::{
config::{StorageConfig, VolumeConfig},
Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume,
Expand Down Expand Up @@ -77,17 +82,26 @@ impl Storage for ExampleStorage {
async fn put(
&mut self,
key: Option<OwnedKeyExpr>,
value: Value,
payload: ZBytes,
encoding: Encoding,
timestamp: Timestamp,
) -> ZResult<StorageInsertionResult> {
let mut map = self.map.write().await;
match map.entry(key) {
Entry::Occupied(mut e) => {
e.insert(StoredData { value, timestamp });
e.insert(StoredData {
payload,
encoding,
timestamp,
});
return Ok(StorageInsertionResult::Replaced);
}
Entry::Vacant(e) => {
e.insert(StoredData { value, timestamp });
e.insert(StoredData {
payload,
encoding,
timestamp,
});
return Ok(StorageInsertionResult::Inserted);
}
}
Expand Down
12 changes: 7 additions & 5 deletions plugins/zenoh-backend-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//! ```
//! use std::sync::Arc;
//! use async_trait::async_trait;
//! use zenoh::{key_expr::OwnedKeyExpr, time::Timestamp, internal::Value};
//! use zenoh::{key_expr::OwnedKeyExpr, time::Timestamp, bytes::{ZBytes, Encoding}};
//! use zenoh_backend_traits::*;
//! use zenoh_backend_traits::config::*;
//!
Expand Down Expand Up @@ -87,7 +87,7 @@
//! self.config.to_json_value()
//! }
//!
//! async fn put(&mut self, key: Option<OwnedKeyExpr>, value: Value, timestamp: Timestamp) -> zenoh::Result<StorageInsertionResult> {
//! async fn put(&mut self, key: Option<OwnedKeyExpr>, payload: ZBytes, encoding: Encoding, timestamp: Timestamp) -> zenoh::Result<StorageInsertionResult> {
//! // the key will be None if it exactly matched with the strip_prefix
//! // create a storage specific special structure to store it
//! // Store the data with timestamp
Expand Down Expand Up @@ -123,7 +123,7 @@
use async_trait::async_trait;
use const_format::concatcp;
use zenoh::{
internal::Value,
bytes::{Encoding, ZBytes},
key_expr::{keyexpr, OwnedKeyExpr},
time::Timestamp,
Result as ZResult,
Expand Down Expand Up @@ -176,7 +176,8 @@ pub enum StorageInsertionResult {

#[derive(Debug, Clone)]
pub struct StoredData {
pub value: Value,
pub payload: ZBytes,
pub encoding: Encoding,
pub timestamp: Timestamp,
}

Expand Down Expand Up @@ -227,7 +228,8 @@ pub trait Storage: Send + Sync {
async fn put(
&mut self,
key: Option<OwnedKeyExpr>,
value: Value,
payload: ZBytes,
encoding: Encoding,
timestamp: Timestamp,
) -> ZResult<StorageInsertionResult>;

Expand Down
22 changes: 18 additions & 4 deletions plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use tokio::sync::RwLock;
use zenoh::{internal::Value, key_expr::OwnedKeyExpr, time::Timestamp, Result as ZResult};
use zenoh::{
bytes::{Encoding, ZBytes},
key_expr::OwnedKeyExpr,
time::Timestamp,
Result as ZResult,
};
use zenoh_backend_traits::{
config::{StorageConfig, VolumeConfig},
*,
Expand Down Expand Up @@ -92,18 +97,27 @@ impl Storage for MemoryStorage {
async fn put(
&mut self,
key: Option<OwnedKeyExpr>,
value: Value,
payload: ZBytes,
encoding: Encoding,
timestamp: Timestamp,
) -> ZResult<StorageInsertionResult> {
tracing::trace!("put for {:?}", key);
let mut map = self.map.write().await;
match map.entry(key) {
std::collections::hash_map::Entry::Occupied(mut e) => {
e.insert(StoredData { value, timestamp });
e.insert(StoredData {
payload,
encoding,
timestamp,
});
return Ok(StorageInsertionResult::Replaced);
}
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(StoredData { value, timestamp });
e.insert(StoredData {
payload,
encoding,
timestamp,
});
return Ok(StorageInsertionResult::Inserted);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};
use zenoh::{bytes::ZBytes, internal::Value, key_expr::keyexpr_tree::IKeyExprTree, query::Query};
use zenoh::{
bytes::{Encoding, ZBytes},
key_expr::keyexpr_tree::IKeyExprTree,
query::Query,
};

use super::aligner_reply::AlignmentReply;
use crate::replication::{
Expand Down Expand Up @@ -304,7 +308,7 @@ impl Replication {
.into_iter()
.find(|data| data.timestamp == *event_to_retrieve.timestamp());
match requested_data {
Some(data) => Some(data.value),
Some(data) => Some((data.payload, data.encoding)),
None => {
// NOTE: This is not necessarily an error. There is a possibility that the
// data associated with this specific key was updated between the time
Expand All @@ -326,7 +330,7 @@ impl Replication {
let wildcard_puts_guard = self.storage_service.wildcard_puts.read().await;

if let Some(update) = wildcard_puts_guard.weight_at(wildcard_ke) {
Some(update.value().clone())
Some((update.payload().clone(), update.encoding().clone()))
} else {
tracing::error!(
"Ignoring Wildcard Update < {wildcard_ke} >: found no associated `Update`."
Expand All @@ -340,9 +344,9 @@ impl Replication {
}
}

/// Replies to a Query, adding the [AlignmentReply] as an attachment and, if provided, the [Value]
/// as the payload (not forgetting to set the Encoding!).
async fn reply_to_query(query: &Query, reply: AlignmentReply, value: Option<Value>) {
/// Replies to a Query, adding the [AlignmentReply] as an attachment and, if provided, the payload
/// with the corresponding [zenoh::bytes::Encoding].
async fn reply_to_query(query: &Query, reply: AlignmentReply, value: Option<(ZBytes, Encoding)>) {
let attachment = match bincode::serialize(&reply) {
Ok(attachment) => attachment,
Err(e) => {
Expand All @@ -353,8 +357,8 @@ async fn reply_to_query(query: &Query, reply: AlignmentReply, value: Option<Valu

let reply_fut = if let Some(value) = value {
query
.reply(query.key_expr(), value.payload)
.encoding(value.encoding)
.reply(query.key_expr(), value.0)
.encoding(value.1)
.attachment(attachment)
} else {
query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLockWriteGuard;
use zenoh::{
internal::Value,
bytes::{Encoding, ZBytes},
key_expr::{format::keformat, keyexpr_tree::IKeyExprTreeMut, OwnedKeyExpr},
sample::{Sample, SampleKind},
sample::{Sample, SampleFields, SampleKind},
session::ZenohId,
Result as ZResult,
};
use zenoh_backend_traits::StorageInsertionResult;
use zenoh_backend_traits::{StorageInsertionResult, StoredData};

use crate::{
replication::{
Expand Down Expand Up @@ -295,7 +295,8 @@ impl Replication {
self.apply_wildcard_update(
&mut replication_log_guard,
&replica_event,
Value::empty(),
ZBytes::default(),
Encoding::default(),
)
.await;
}
Expand Down Expand Up @@ -358,19 +359,24 @@ impl Replication {
wildcard_delete_ke.clone(),
SampleKind::Delete,
replica_event.timestamp,
zenoh::internal::Value::empty(),
ZBytes::default(),
Encoding::default(),
)
.await;
}
Action::Put => {
let SampleFields {
payload, encoding, ..
} = sample.into();
if matches!(
self.storage_service
.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
payload,
encoding,
replica_event.timestamp,
)
.await,
Expand All @@ -385,10 +391,14 @@ impl Replication {
}
}
Action::WildcardPut(_) => {
let SampleFields {
payload, encoding, ..
} = sample.into();
self.apply_wildcard_update(
&mut replication_log_guard,
&replica_event,
sample.into(),
payload,
encoding,
)
.await;
}
Expand Down Expand Up @@ -544,7 +554,8 @@ impl Replication {
&self,
replication_log_guard: &mut RwLockWriteGuard<'_, LogLatest>,
replica_event: &EventMetadata,
value: Value,
payload: ZBytes,
encoding: Encoding,
) {
let (wildcard_ke, wildcard_kind) = match &replica_event.action {
Action::Put | Action::Delete => unreachable!(),
Expand Down Expand Up @@ -603,7 +614,8 @@ impl Replication {
.await
.put(
overridden_event.key_expr().clone(),
value.clone(),
payload.clone(),
encoding.clone(),
replica_event.timestamp,
)
.await,
Expand Down Expand Up @@ -664,7 +676,8 @@ impl Replication {
wildcard_ke.clone(),
(&replica_event.action).into(),
replica_event.timestamp,
value,
payload,
encoding,
)
.await;
}
Expand Down Expand Up @@ -724,7 +737,12 @@ impl Replication {
wildcard_ke: OwnedKeyExpr,
wildcard_update: Update,
) {
let wildcard_timestamp = *wildcard_update.timestamp();
let kind = wildcard_update.kind();
let StoredData {
payload,
encoding,
timestamp,
} = wildcard_update.into();

// A Wildcard Update overrides another Wildcard Update, we have nothing to do.
if matches!(
Expand All @@ -736,26 +754,27 @@ impl Replication {

tracing::trace!(
"Overriding < {replica_event:?} > with < {wildcard_ke} {} >",
wildcard_timestamp
timestamp
);

// Generate the action that will be used to override the metadata of the Event. We do it
// now to avoid having to clone the `wildcard_update` because we move it below.
let wildcard_action = match wildcard_update.kind() {
let wildcard_action = match kind {
SampleKind::Put => Action::WildcardPut(wildcard_ke),
SampleKind::Delete => Action::WildcardDelete(wildcard_ke),
};

if wildcard_update.kind() == SampleKind::Put
if kind == SampleKind::Put
&& matches!(
self.storage_service
.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
wildcard_update.into_value(),
wildcard_timestamp
payload,
encoding,
timestamp
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
Expand All @@ -772,7 +791,7 @@ impl Replication {
// `timestamp_last_non_wildcard_update`.
let mut event: Event = replica_event.into();
// Then update the Event with the values of the Wildcard Update.
event.set_timestamp_and_action(wildcard_timestamp, wildcard_action);
event.set_timestamp_and_action(timestamp, wildcard_action);

replication_log_guard.insert_event(event);
}
Expand Down
Loading