diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/classification.rs b/plugins/zenoh-plugin-storage-manager/src/replication/classification.rs index 57d30cad45..528b470f3a 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/classification.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/classification.rs @@ -216,6 +216,50 @@ impl Interval { result } + + /// Removes and returns the [Event] present in this `Interval` that are overridden by the + /// provided Wildcard Update. + /// + /// If the Wildcard Update should be recorded in this `Interval` then the index of the + /// `SubInterval` should also be provided as the removal can be stopped right after: all the + /// [Event]s contained in greater `SubInterval` will, by construction of the Replication Log, + /// only have greater timestamps and thus cannot be overridden by this Wildcard Update. + pub(crate) fn remove_events_overridden_by_wildcard_update( + &mut self, + prefix: Option<&OwnedKeyExpr>, + wildcard_key_expr: &OwnedKeyExpr, + wildcard_timestamp: &Timestamp, + wildcard_sub_idx: Option, + ) -> HashSet { + let mut overridden_events = HashSet::new(); + for (sub_interval_idx, sub_interval) in self.sub_intervals.iter_mut() { + let mut timestamp = None; + if let Some(wildcard_sub_idx) = wildcard_sub_idx { + if *sub_interval_idx > wildcard_sub_idx { + break; + } + + // We only provide the timestamp of the Wildcard Update if the index of the + // SubInterval (if provided) equals the index of SubInterval where the Wildcard + // Update will be stored. + if *sub_interval_idx == wildcard_sub_idx { + timestamp = Some(wildcard_timestamp); + } + } + + self.fingerprint ^= sub_interval.fingerprint; + + overridden_events.extend(sub_interval.remove_events_overridden_by_wildcard_update( + prefix, + wildcard_key_expr, + timestamp, + )); + + self.fingerprint ^= sub_interval.fingerprint; + } + + overridden_events + } } /// A `SubIntervalIdx` represents the index of a [SubInterval]. @@ -356,6 +400,34 @@ impl SubInterval { EventRemoval::NotFound } + + /// Removes and returns the [Event] present in this `SubInterval` that are overridden by the + /// provided Wildcard Update. + /// + /// The timestamp of the Wildcard Update should only be provided if the considered `SubInterval` + /// is where the Wildcard Update should be recorded. + /// It is only in that specific scenario that we are not sure that all [Event]s have a lower + /// timestamp. + fn remove_events_overridden_by_wildcard_update( + &mut self, + prefix: Option<&OwnedKeyExpr>, + wildcard_key_expr: &OwnedKeyExpr, + wildcard_timestamp: Option<&Timestamp>, + ) -> HashSet { + let overridden_events = + crate::replication::core::remove_events_overridden_by_wildcard_update( + &mut self.events, + prefix, + wildcard_key_expr, + wildcard_timestamp, + ); + + overridden_events + .iter() + .for_each(|overridden_event| self.fingerprint ^= overridden_event.fingerprint()); + + overridden_events + } } #[cfg(test)] diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs b/plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs index 64a792b6b1..50744a4ae5 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs @@ -79,6 +79,10 @@ impl Configuration { } } + pub fn prefix(&self) -> Option<&OwnedKeyExpr> { + self.prefix.as_ref() + } + /// Returns the [Fingerprint] of the `Configuration`. /// /// The fingerprint is the hash of all its fields, using the `xxhash_rust` crate. diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs index d4ebd6352f..ac78bb8a4f 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs @@ -16,16 +16,13 @@ mod aligner_query; mod aligner_reply; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, sync::Arc, time::{Duration, Instant, SystemTime}, }; use rand::Rng; -use tokio::{ - sync::{Mutex, RwLock}, - task::JoinHandle, -}; +use tokio::{sync::RwLock, task::JoinHandle}; use tracing::{debug_span, Instrument}; use zenoh::{ key_expr::{ @@ -34,13 +31,16 @@ use zenoh::{ }, query::{ConsolidationMode, Selector}, sample::Locality, + time::Timestamp, Session, }; -use zenoh_backend_traits::Storage; use self::aligner_reply::AlignmentReply; -use super::{digest::Digest, log::LogLatest}; -use crate::{replication::core::aligner_query::AlignmentQuery, storages_mgt::LatestUpdates}; +use super::{digest::Digest, log::LogLatest, Action, Event}; +use crate::{ + replication::core::aligner_query::AlignmentQuery, + storages_mgt::{LatestUpdates, StorageService}, +}; kedefine!( pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}", @@ -53,7 +53,7 @@ pub(crate) struct Replication { pub(crate) replication_log: Arc>, pub(crate) storage_key_expr: OwnedKeyExpr, pub(crate) latest_updates: Arc>, - pub(crate) storage: Arc>>, + pub(crate) storage_service: Arc, } impl Replication { @@ -579,3 +579,54 @@ impl Replication { }) } } + +pub(crate) fn remove_events_overridden_by_wildcard_update( + events: &mut HashMap, Event>, + prefix: Option<&OwnedKeyExpr>, + wildcard_ke: &OwnedKeyExpr, + wildcard_ts: Option<&Timestamp>, +) -> HashSet { + let mut overridden_events = HashSet::default(); + + events.retain(|stripped_key_expr, event| { + // We only provide the timestamp of the Wildcard Update if the Wildcard Update belongs + // in this SubInterval. + // + // Only then do we need to compare its timestamp with the timestamp of the Events + // contained in the SubInterval. + if let Some(wildcard_timestamp) = wildcard_ts { + if wildcard_timestamp < event.timestamp() { + return true; + } + } + + let full_event_key_expr = match event.action() { + Action::Put | Action::Delete => { + match crate::prefix(prefix, stripped_key_expr.as_ref()) { + Ok(full_ke) => full_ke, + Err(e) => { + tracing::error!( + "Internal error while attempting to prefix < {:?} > with < {:?} >: \ + {e:?}", + stripped_key_expr, + prefix + ); + return true; + } + } + } + Action::WildcardPut(wildcard_ke) | Action::WildcardDelete(wildcard_ke) => { + wildcard_ke.clone() + } + }; + + if wildcard_ke.includes(&full_event_key_expr) { + overridden_events.insert(event.clone()); + return false; + } + + true + }); + + overridden_events +} diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_query.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_query.rs index bb550ac8a7..a2d981c67f 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_query.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_query.rs @@ -15,14 +15,14 @@ use std::collections::{HashMap, HashSet}; use serde::{Deserialize, Serialize}; -use zenoh::{bytes::ZBytes, internal::Value, query::Query, sample::SampleKind}; +use zenoh::{bytes::ZBytes, internal::Value, key_expr::keyexpr_tree::IKeyExprTree, query::Query}; use super::aligner_reply::AlignmentReply; use crate::replication::{ classification::{IntervalIdx, SubIntervalIdx}, core::Replication, digest::DigestDiff, - log::EventMetadata, + log::{Action, EventMetadata}, }; /// The `AlignmentQuery` enumeration represents the information requested by a Replica to align @@ -271,54 +271,70 @@ impl Replication { /// Replies to the [Query] with the [EventMetadata] and [Value] identified as missing. /// - /// This method will fetch the [StoredData] from the Storage. + /// Depending on the associated action, this method will fetch the [Value] either from the + /// Storage or from the wildcard updates. pub(crate) async fn reply_event_retrieval( &self, query: &Query, event_to_retrieve: EventMetadata, ) { - let mut value = None; + let value = match &event_to_retrieve.action { + // For a Delete or WildcardDelete there is no associated `Value`. + Action::Delete | Action::WildcardDelete(_) => None, + // For a Put we need to retrieve the `Value` in the Storage. + Action::Put => { + let stored_data = { + let mut storage = self.storage_service.storage.lock().await; + match storage + .get(event_to_retrieve.stripped_key.clone(), "") + .await + { + Ok(stored_data) => stored_data, + Err(e) => { + tracing::error!( + "Failed to retrieve data associated to key < {:?} >: {e:?}", + event_to_retrieve.key_expr() + ); + return; + } + } + }; - if event_to_retrieve.action == SampleKind::Put { - let stored_data = { - let mut storage = self.storage.lock().await; - match storage - .get(event_to_retrieve.stripped_key.clone(), "") - .await - { - Ok(stored_data) => stored_data, - Err(e) => { - tracing::error!( - "Failed to retrieve data associated to key < {:?} >: {e:?}", - event_to_retrieve.key_expr() + let requested_data = stored_data + .into_iter() + .find(|data| data.timestamp == *event_to_retrieve.timestamp()); + match requested_data { + Some(data) => Some(data.value), + None => { + // NOTE: This is not necessarily an error. There is a possibility that the + // data associated with this specific key was updated between the time + // the [AlignmentQuery] was sent and when it is processed. + // + // Hence, at the time it was "valid" but it no longer is. + tracing::debug!( + "Found no data in the Storage associated to key < {:?} > with a \ + Timestamp equal to: {}", + event_to_retrieve.key_expr(), + event_to_retrieve.timestamp() ); return; } } - }; + } + // For a WildcardPut we need to retrieve the `Value` in the `StorageService`. + Action::WildcardPut(wildcard_ke) => { + let wildcard_update_guard = self.storage_service.wildcard_updates.read().await; - let requested_data = stored_data - .into_iter() - .find(|data| data.timestamp == *event_to_retrieve.timestamp()); - match requested_data { - Some(data) => { - value = Some(data.value); - } - None => { - // NOTE: This is not necessarily an error. There is a possibility that the data - // associated with this specific key was updated between the time the - // [AlignmentQuery] was sent and when it is processed. - // - // Hence, at the time it was "valid" but it no longer is. - tracing::debug!( - "Found no data in the Storage associated to key < {:?} > with a Timestamp \ - equal to: {}", - event_to_retrieve.key_expr(), - event_to_retrieve.timestamp() + if let Some(update) = wildcard_update_guard.weight_at(wildcard_ke) { + Some(update.value().clone()) + } else { + tracing::error!( + "Ignoring Wildcard Update < {wildcard_ke} >: found no associated `Update`." ); + return; } } - } + }; reply_to_query(query, AlignmentReply::Retrieval(event_to_retrieve), value).await; } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs index 5fa4346bf3..701ea559f7 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs @@ -15,18 +15,26 @@ use std::collections::{HashMap, HashSet}; use serde::{Deserialize, Serialize}; +use tokio::sync::RwLockWriteGuard; use zenoh::{ - key_expr::{format::keformat, OwnedKeyExpr}, + internal::Value, + key_expr::{format::keformat, keyexpr_tree::IKeyExprTreeMut, OwnedKeyExpr}, sample::{Sample, SampleKind}, session::ZenohId, + time::Timestamp, + Result as ZResult, }; use zenoh_backend_traits::StorageInsertionResult; -use crate::replication::{ - classification::{EventRemoval, IntervalIdx, SubIntervalIdx}, - core::{aligner_key_expr_formatter, aligner_query::AlignmentQuery, Replication}, - digest::Fingerprint, - log::EventMetadata, +use crate::{ + replication::{ + classification::{EventRemoval, IntervalIdx, SubIntervalIdx}, + core::{aligner_key_expr_formatter, aligner_query::AlignmentQuery, Replication}, + digest::Fingerprint, + log::{Action, EventMetadata}, + LogLatest, + }, + storages_mgt::service::Update, }; /// The `AlignmentReply` enumeration contains the possible information needed by a Replica to align @@ -192,9 +200,14 @@ impl Replication { let mut diff_events = Vec::default(); for replica_event in replica_events { + tracing::trace!("Checking if < {replica_event:?} > is missing"); if let Some(missing_event_metadata) = self.process_event_metadata(replica_event).await { + tracing::trace!( + "Requesting: < {:?} >", + missing_event_metadata.stripped_key + ); diff_events.push(missing_event_metadata); } } @@ -235,32 +248,33 @@ impl Replication { .read() .await .get(&replica_event.stripped_key) - .is_some_and(|latest_event| latest_event.timestamp >= replica_event.timestamp) + .is_some_and(|latest_event| latest_event.timestamp() >= replica_event.timestamp()) + { + return None; + } + + let mut replication_log_guard = self.replication_log.write().await; + if !self + .needs_further_processing(&mut replication_log_guard, &replica_event) + .await { return None; } match &replica_event.action { - SampleKind::Put => { - let replication_log_guard = self.replication_log.read().await; - if let Some(latest_event) = - replication_log_guard.lookup(&replica_event.stripped_key) - { - if latest_event.timestamp >= replica_event.timestamp { - return None; - } - } - return Some(replica_event); - } - SampleKind::Delete => { - let mut replication_log_guard = self.replication_log.write().await; + // To apply a Put or a WildcardPut we need the payload so we return here to indicate + // that we need to retrieve it from the Replica. + Action::Put | Action::WildcardPut(_) => return Some(replica_event), + + // A Delete can be applied right away, we have all the information we need. + Action::Delete => { match replication_log_guard .remove_older(&replica_event.stripped_key, &replica_event.timestamp) { EventRemoval::NotFound => {} EventRemoval::KeptNewer => return None, EventRemoval::RemovedOlder(older_event) => { - if older_event.action == SampleKind::Put { + if older_event.action() == &Action::Put { // NOTE: In some of our backend implementation, a deletion on a // non-existing key will return an error. Given that we cannot // distinguish an error from a missing key, we will assume @@ -269,6 +283,7 @@ impl Replication { // FIXME: Once the behaviour described above is fixed, check for // errors. let _ = self + .storage_service .storage .lock() .await @@ -277,12 +292,20 @@ impl Replication { } } } + } - replication_log_guard.insert_event_unchecked(replica_event.clone().into()); + Action::WildcardDelete(_) => { + self.apply_wildcard_update( + &mut replication_log_guard, + &replica_event, + Value::empty(), + ) + .await; } } - Some(replica_event) + replication_log_guard.insert_event_unchecked(replica_event.clone().into()); + None } /// Processes the [EventMetadata] and [Sample] sent by the Replica, adding it to our Storage if @@ -299,49 +322,397 @@ impl Replication { /// steps and the Replica goes straight to sending all its Replication Log and data in its /// Storage. Including for the deleted events. async fn process_event_retrieval(&self, replica_event: EventMetadata, sample: Sample) { - tracing::trace!("Processing `AlignmentReply::Retrieval`"); + tracing::trace!("Processing `AlignmentReply::Retrieval` for < {replica_event:?} >"); if self .latest_updates .read() .await .get(&replica_event.stripped_key) - .is_some_and(|latest_event| latest_event.timestamp >= replica_event.timestamp) + .is_some_and(|latest_event| latest_event.timestamp() >= replica_event.timestamp()) { return; } let mut replication_log_guard = self.replication_log.write().await; + + if !self + .needs_further_processing(&mut replication_log_guard, &replica_event) + .await + { + return; + } + + // The Event is newer than what we have and is not overridden by a Wildcard Update, we + // need to process it. + replication_log_guard.remove_older(&replica_event.stripped_key, replica_event.timestamp()); + + match &replica_event.action { + // NOTE: This code can only be called with `action` set to `Delete` or `WildcardDelete` + // on an initial alignment, in which case the Storage of the receiving Replica is empty + // => there is no need to actually call `storage.delete`. + // + // Outside of an initial alignment, the `Delete` or `WildcardDelete` actions will be + // performed at the step above, in `AlignmentReply::EventsMetadata`. + Action::Delete => {} + Action::WildcardDelete(wildcard_delete_ke) => { + self.storage_service + .register_wildcard_update( + wildcard_delete_ke.clone(), + SampleKind::Delete, + replica_event.timestamp, + zenoh::internal::Value::empty(), + ) + .await; + } + Action::Put => { + if matches!( + self.storage_service + .storage + .lock() + .await + .put( + replica_event.stripped_key.clone(), + sample.into(), + replica_event.timestamp, + ) + .await, + Ok(StorageInsertionResult::Outdated) | Err(_) + ) { + // NOTE: This is not necessarily an error: as we are not locking the + // `latest_updates` more events on the same key expression can be received + // before we have time to store this one. + // + // In that scenario the Storage should either return an error or `Outdated`. + return; + } + } + Action::WildcardPut(_) => { + self.apply_wildcard_update( + &mut replication_log_guard, + &replica_event, + sample.into(), + ) + .await; + } + } + + replication_log_guard.insert_event_unchecked(replica_event.into()); + } + + /// Returns `true` if the provided `replica_event` requires more processing. + /// + /// This method will: + /// - check if there is a Wildcard Update that overrides it or a more recent Event in the + /// Replication Log, + /// - if there is a Wildcard Update that overrides it, apply the Wildcard Update and return + /// `false` as a Wildcard Update is self-contained, + /// - if there is a more recent Event, return `false` as this Replica is up-to-date, + /// - if there is neither, return `true` as depending on where this method was called, different + /// operation(s) need to be performed. + async fn needs_further_processing( + &self, + replication_log_guard: &mut RwLockWriteGuard<'_, LogLatest>, + replica_event: &EventMetadata, + ) -> bool { + // We received an EventMetadata, we need to check if we don't have: + // 1. a Wildcard Update that overrides it, + // 2. a more recent Event on that same key expression already in the Replication Log. + let Ok(maybe_wildcard_update) = self + .is_overridden_by_wildcard_update( + replica_event.stripped_key.as_ref(), + replica_event.timestamp(), + &replica_event.action, + ) + .await + else { + tracing::error!( + "Internal error attempting to prefix < {:?} > with < {:?} >", + replica_event.stripped_key, + self.storage_service.configuration.strip_prefix + ); + return false; + }; + + let maybe_newer_event = replication_log_guard.lookup(&replica_event.stripped_key); + + match (maybe_wildcard_update, maybe_newer_event) { + // No Event in the Replication Log or Wildcard Update, we go on, we need to process it. + (None, None) => {} + // If the timestamp of the Event in the Replication Log for the same key expression has + // a greater Timestamp then we discard this EventMetadata. + (None, Some(log_event)) => { + if log_event.timestamp() >= replica_event.timestamp() { + return false; + } + } + // We have a Wildcard Update that overrides the Event -> we override it and we're done. + (Some(wildcard_update), None) => { + self.store_event_overridden_by_wildcard_update( + replication_log_guard, + replica_event.clone(), + wildcard_update, + ) + .await; + return false; + } + // We have both a Wildcard Update and an Event for the same key expression in the + // Replication Log: we need to compare timestamps and apply the one with the greater + // value. + (Some(wildcard_update), Some(log_event)) => { + if wildcard_update.timestamp() > log_event.timestamp() { + self.store_event_overridden_by_wildcard_update( + replication_log_guard, + replica_event.clone(), + wildcard_update, + ) + .await; + return false; + } else if log_event.timestamp() >= replica_event.timestamp() { + return false; + } + } + } + + true + } + + /// Applies the Wildcard Update provided in the `replica_event`, unless there is a more recent + /// one already recorded in the Replication Log. + /// + /// ⚠️ This method does NOT insert the Wildcard Update in the Replication Log. It still needs + /// to be added afterwards. This is why we only take a mutable reference on the Write lock. + /// (We do not perform this operation to avoid code duplication in methods calling this one.) + /// + /// Applying a Wildcard Update involves several steps: + /// + /// - We need to override any Event present in the latest cache and in the Replication Log. That + /// means we need to: + /// (i) remove them, + /// (ii) if needed, override or delete the data associated in the Storage, + /// (iii) update their metadata (putting the timestamp and action of the Wildcard Update), + /// (iv) re-insert them. + /// + /// - We need to register the Wildcard Update such that the Storage is aware of it and can + /// apply it on late-comers. + async fn apply_wildcard_update( + &self, + replication_log_guard: &mut RwLockWriteGuard<'_, LogLatest>, + replica_event: &EventMetadata, + value: Value, + ) { + let (wildcard_ke, wildcard_kind) = match &replica_event.action { + Action::Put | Action::Delete => return, + Action::WildcardPut(wildcard_ke) => (wildcard_ke, SampleKind::Put), + Action::WildcardDelete(wildcard_ke) => (wildcard_ke, SampleKind::Delete), + }; + + if matches!( + replication_log_guard + .remove_older(&replica_event.stripped_key, &replica_event.timestamp), + EventRemoval::KeptNewer + ) { + return; + } + + // We lock the Cache until we are done processing this Wildcard Update to not let pass Event + // that should be overridden: the Wildcard Update will only be applied once on older Events + // and that time is now. + let mut latest_updates_guard = self.latest_updates.write().await; + let mut overridden_events = + crate::replication::core::remove_events_overridden_by_wildcard_update( + &mut latest_updates_guard, + self.storage_service.configuration.strip_prefix.as_ref(), + wildcard_ke, + Some(replica_event.timestamp()), + ); + match replication_log_guard - .remove_older(&replica_event.stripped_key, &replica_event.timestamp) + .remove_events_overridden_by_wildcard_update(wildcard_ke, replica_event.timestamp()) { - EventRemoval::KeptNewer => return, - EventRemoval::RemovedOlder(_) | EventRemoval::NotFound => {} + Ok(events) => overridden_events.extend(events), + Err(e) => { + tracing::error!("Skipping Wildcard Update < {wildcard_ke} >: {e:?}"); + return; + } + }; + + for mut overridden_event in overridden_events { + let overridden_event_action = overridden_event.action(); + tracing::trace!("Overriding < {overridden_event:?} with: {wildcard_ke:?} >"); + match overridden_event_action { + // The overridden event is not a Wildcard Update: we need to re-insert in the + // Replication Log but with updated values… + Action::Delete | Action::Put => { + // But depending on the type of Wildcard Update and the action of the overridden + // event we may or may not need to delete or put data from/to the Storage. + match (overridden_event_action, wildcard_kind) { + // If the Wildcard Update is a Put, we don't need to delete first, + // the new Put will (normally) override any previous value. + (_, SampleKind::Put) => { + if matches!( + self.storage_service + .storage + .lock() + .await + .put( + overridden_event.key_expr().clone(), + value.clone(), + replica_event.timestamp, + ) + .await, + Ok(StorageInsertionResult::Outdated) | Err(_) + ) { + continue; + } + } + // If the action of the overridden event is a Put and the Wildcard Update is + // a delete, we need to remove the previous data. + (Action::Put, SampleKind::Delete) => { + if matches!( + self.storage_service + .storage + .lock() + .await + .delete( + overridden_event.key_expr().clone(), + *overridden_event.timestamp(), + ) + .await, + Ok(StorageInsertionResult::Outdated) + ) { + continue; + } + } + // A Delete Wildcard Update overriding a Delete action needs to further + // interaction with the Storage. + (_, _) => {} + } + + overridden_event.set_timestamp(replica_event.timestamp); + overridden_event.set_action(Action::Put); + replication_log_guard.insert_event_unchecked(overridden_event); + } + + // We are overriding a Wildcard Update with another Wildcard Update, there is no + // need to touch the Storage. + Action::WildcardPut(overridden_wildcard_ke) + | Action::WildcardDelete(overridden_wildcard_ke) => { + // NOTE: If a Wildcard Update overrides another Wildcard Update, there is no + // need to reinsert the previous Wildcard Update, that would lead to extra data + // stored for no valid reason, since the newer Wildcard Update will always + // "win". + // + // Example: + // 1. z_put -k "test/replication/**" -v "1" @t_0 + // 2. z_put -k "test/**" -v 42 @t_1 (t_1 > t_0) + // + // 2. overrides 1., so we remove 1. from the `wildcard_updates` structure. + let mut wildcard_updates = self.storage_service.wildcard_updates.write().await; + wildcard_updates.remove(overridden_wildcard_ke); + wildcard_updates.prune(); + } + } } - // NOTE: This code can only be called with `action` set to `delete` on an initial - // alignment, in which case the Storage of the receiving Replica is empty => there - // is no need to actually call `storage.delete`. - // - // Outside of an initial alignment, the `delete` action will be performed at the - // step above, in `AlignmentReply::EventsMetadata`. - if replica_event.action == SampleKind::Put + self.storage_service + .register_wildcard_update( + wildcard_ke.clone(), + (&replica_event.action).into(), + replica_event.timestamp, + value, + ) + .await; + } + + /// Checks if the provided `replica_event` is overridden by a Wildcard Update and, if so, + /// returns the associated [Update]. + /// + /// If there is no Wildcard Update that overrides it, `None` is returned. + /// + /// # Errors + /// + /// This method will return an error if the stripped key of the `replica_event` is set to `None` + /// and this Storage was configured without any `strip_prefix`. + /// + /// This is a "fatal" error that cannot be recovered from. + async fn is_overridden_by_wildcard_update( + &self, + stripped_key: Option<&OwnedKeyExpr>, + timestamp: &Timestamp, + action: &Action, + ) -> ZResult> { + let full_event_key_expr = match action { + Action::Put | Action::Delete => crate::prefix( + self.storage_service.configuration.strip_prefix.as_ref(), + stripped_key, + )?, + Action::WildcardPut(wildcard_ke) | Action::WildcardDelete(wildcard_ke) => { + wildcard_ke.clone() + } + }; + + Ok(self + .storage_service + .overriding_wild_update(&full_event_key_expr, timestamp) + .await) + } + + /// Stores in the Storage and/or the Replication Log the provided `replica_event` *that is being + /// overridden by the `wildcard_update`*. + /// + /// The `replica_event` was sent by a Replica during the alignment process. It is not associated + /// to any data in the Storage. + /// + /// A payload will be pushed to the Storage if the `wildcard_update` is a put. + // + // NOTE: There is no need to attempt to delete an event in the Storage if the `wildcard_update` + // is a delete. Indeed, if the wildcard update is a delete then it is impossible to have + // a previous event associated to the same key expression as, by definition of a wildcard + // update, it would have been deleted. + async fn store_event_overridden_by_wildcard_update( + &self, + replication_log_guard: &mut RwLockWriteGuard<'_, LogLatest>, + mut replica_event: EventMetadata, + wildcard_update: Update, + ) { + tracing::trace!("Overriding < {replica_event:?} > with Wildcard Update"); + let wildcard_timestamp = *wildcard_update.timestamp(); + let wildcard_action = wildcard_update.kind().into(); + + // If a Wildcard Update is overridden by another Wildcard Update, we don't have to do + // anything. + if matches!( + replica_event.action, + Action::WildcardPut(_) | Action::WildcardDelete(_) + ) { + return; + } + + if wildcard_action == Action::Put && matches!( - self.storage + self.storage_service + .storage .lock() .await .put( replica_event.stripped_key.clone(), - sample.into(), - replica_event.timestamp, + wildcard_update.into_value(), + wildcard_timestamp ) .await, Ok(StorageInsertionResult::Outdated) | Err(_) ) { + tracing::error!( + "Failed to insert Wildcard Put Update applied to < {:?} >", + replica_event.stripped_key + ); return; } - replication_log_guard.insert_event_unchecked(replica_event.into()); + replica_event.timestamp = wildcard_timestamp; + replica_event.action = wildcard_action; + replication_log_guard.insert_event(replica_event.into()); } } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/log.rs b/plugins/zenoh-plugin-storage-manager/src/replication/log.rs index 4feb996285..8499a22e18 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/log.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/log.rs @@ -25,16 +25,42 @@ use super::{ digest::{Digest, Fingerprint}, }; +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Hash)] +pub(crate) enum Action { + Put, + Delete, + WildcardPut(OwnedKeyExpr), + WildcardDelete(OwnedKeyExpr), +} + +impl From for Action { + fn from(kind: SampleKind) -> Self { + match kind { + SampleKind::Put => Action::Put, + SampleKind::Delete => Action::Delete, + } + } +} + +impl From<&Action> for SampleKind { + fn from(action: &Action) -> Self { + match action { + Action::Put | Action::WildcardPut(_) => SampleKind::Put, + Action::Delete | Action::WildcardDelete(_) => SampleKind::Delete, + } + } +} + /// The `EventMetadata` structure contains all the information needed by a replica to assess if it /// is missing an [Event] in its log. /// /// Associating the `action` allows only sending the metadata when the associate action is /// [SampleKind::Delete]. -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Hash)] pub struct EventMetadata { pub(crate) stripped_key: Option, pub(crate) timestamp: Timestamp, - pub(crate) action: SampleKind, + pub(crate) action: Action, } impl EventMetadata { @@ -52,7 +78,7 @@ impl From<&Event> for EventMetadata { Self { stripped_key: event.maybe_stripped_key.clone(), timestamp: event.timestamp, - action: event.action, + action: event.action.clone(), } } } @@ -62,12 +88,12 @@ impl From<&Event> for EventMetadata { /// /// When an `Event` is created, its [Fingerprint] is computed, using the `xxhash-rust` crate. This /// [Fingerprint] is used to construct the [Digest] associated with the replication log. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Event { - pub(crate) maybe_stripped_key: Option, - pub(crate) timestamp: Timestamp, - pub(crate) action: SampleKind, - pub(crate) fingerprint: Fingerprint, + maybe_stripped_key: Option, + timestamp: Timestamp, + action: Action, + fingerprint: Fingerprint, } impl From for Event { @@ -80,7 +106,11 @@ impl Event { /// Creates a new [Event] with the provided key expression and timestamp. /// /// This function computes the [Fingerprint] of both using the `xxhash_rust` crate. - pub fn new(key_expr: Option, timestamp: Timestamp, action: SampleKind) -> Self { + pub fn new( + key_expr: Option, + timestamp: Timestamp, + action: impl Into, + ) -> Self { let mut hasher = xxhash_rust::xxh3::Xxh3::default(); if let Some(key_expr) = &key_expr { hasher.update(key_expr.as_bytes()); @@ -91,11 +121,35 @@ impl Event { Self { maybe_stripped_key: key_expr, timestamp, - action, + action: action.into(), fingerprint: hasher.digest().into(), } } + fn compute_fingerprint(&self) -> Fingerprint { + let mut hasher = xxhash_rust::xxh3::Xxh3::default(); + if let Some(key_expr) = &self.maybe_stripped_key { + hasher.update(key_expr.as_bytes()); + } + hasher.update(&self.timestamp.get_time().0.to_le_bytes()); + hasher.update(&self.timestamp.get_id().to_le_bytes()); + + hasher.digest().into() + } + + pub fn set_timestamp(&mut self, timestamp: Timestamp) { + self.timestamp = timestamp; + self.fingerprint = self.compute_fingerprint(); + } + + pub fn set_action(&mut self, action: Action) { + self.action = action; + } + + pub fn action(&self) -> &Action { + &self.action + } + /// Returns a reference over the key expression associated with this [Event]. /// /// Note that this method can return `None` as the underlying key expression could be the @@ -401,6 +455,48 @@ impl LogLatest { hot_era_fingerprints, } } + + /// Removes and returns the [Event]s overridden by the provided Wildcard Update from the + /// Replication Log. + /// + /// The affected `Interval` and `SubInterval` will have their [Fingerprint] updated accordingly. + /// + /// # Error + /// + /// This method will return an error if the call to obtain the time classification of the + /// Wildcard Update failed. This should only happen if the Timestamp is far in the future or if + /// the internal clock of the host system is misconfigured. + pub(crate) fn remove_events_overridden_by_wildcard_update( + &mut self, + wildcard_key_expr: &OwnedKeyExpr, + wildcard_timestamp: &Timestamp, + ) -> ZResult> { + let (wildcard_interval_idx, wildcard_sub_interval_idx) = self + .configuration + .get_time_classification(wildcard_timestamp)?; + + let mut overridden_events = HashSet::new(); + + for (interval_idx, interval) in self.intervals.iter_mut() { + if *interval_idx > wildcard_interval_idx { + break; + } + + let mut wildcard_sub_idx = None; + if *interval_idx == wildcard_interval_idx { + wildcard_sub_idx = Some(wildcard_sub_interval_idx); + } + + overridden_events.extend(interval.remove_events_overridden_by_wildcard_update( + self.configuration.prefix(), + wildcard_key_expr, + wildcard_timestamp, + wildcard_sub_idx, + )); + } + + Ok(overridden_events) + } } #[cfg(test)] diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/mod.rs b/plugins/zenoh-plugin-storage-manager/src/replication/mod.rs index 5ef3bba2de..ae7b97d6ed 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/mod.rs @@ -33,5 +33,5 @@ mod digest; mod log; mod service; -pub(crate) use log::{Event, LogLatest}; +pub(crate) use log::{Action, Event, LogLatest}; pub(crate) use service::ReplicationService; diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs index e48fb4fecd..233e526072 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs @@ -49,20 +49,18 @@ impl ReplicationService { /// received. pub async fn spawn_start( zenoh_session: Arc, - storage_service: &StorageService, + storage_service: Arc, storage_key_expr: OwnedKeyExpr, replication_log: Arc>, latest_updates: Arc>, mut rx: Receiver, ) { - let storage = storage_service.storage.clone(); - let replication = Replication { zenoh_session, replication_log, storage_key_expr, latest_updates, - storage, + storage_service, }; if replication diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs index 3e5a2e1f09..94e3ef576b 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs @@ -122,15 +122,17 @@ pub(crate) async fn create_and_start_storage( // target any Storage that matches the same key expression, regardless of if they have // been configured to be replicated. tokio::task::spawn(async move { - let storage_service = StorageService::new( - zenoh_session.clone(), - config.clone(), - &name, - storage, - capability, - CacheLatest::new(latest_updates.clone(), replication_log.clone()), - ) - .await; + let storage_service = Arc::new( + StorageService::new( + zenoh_session.clone(), + config.clone(), + &name, + storage, + capability, + CacheLatest::new(latest_updates.clone(), replication_log.clone()), + ) + .await, + ); // Testing if the `replication_log` is set is equivalent to testing if the `replication` is // set: the `replication_log` is only set when the latter is. @@ -143,7 +145,7 @@ pub(crate) async fn create_and_start_storage( ReplicationService::spawn_start( zenoh_session, - &storage_service, + storage_service.clone(), config.key_expr, replication_log, latest_updates, diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs index 5b2072cdc9..a3003e1a1e 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs @@ -45,7 +45,7 @@ use zenoh_backend_traits::{ use super::LatestUpdates; use crate::{ - replication::Event, + replication::{Action, Event}, storages_mgt::{CacheLatest, StorageMessage}, }; @@ -53,20 +53,38 @@ pub const WILDCARD_UPDATES_FILENAME: &str = "wildcard_updates"; pub const TOMBSTONE_FILENAME: &str = "tombstones"; #[derive(Clone)] -struct Update { +pub(crate) struct Update { kind: SampleKind, data: StoredData, } +impl Update { + pub(crate) fn timestamp(&self) -> &Timestamp { + &self.data.timestamp + } + + pub(crate) fn kind(&self) -> SampleKind { + self.kind + } + + pub(crate) fn value(&self) -> &Value { + &self.data.value + } + + pub(crate) fn into_value(self) -> Value { + self.data.value + } +} + #[derive(Clone)] pub struct StorageService { session: Arc, - configuration: StorageConfig, + pub(crate) configuration: StorageConfig, name: String, pub(crate) storage: Arc>>, capability: Capability, tombstones: Arc>>, - wildcard_updates: Arc>>, + pub(crate) wildcard_updates: Arc>>, cache_latest: CacheLatest, } @@ -120,7 +138,10 @@ impl StorageService { storage_service } - pub(crate) async fn start_storage_queryable_subscriber(self, mut rx: Receiver) { + pub(crate) async fn start_storage_queryable_subscriber( + self: Arc, + mut rx: Receiver, + ) { // start periodic GC event let t = Timer::default(); @@ -232,7 +253,26 @@ impl StorageService { // if wildcard, update wildcard_updates if sample.key_expr().is_wild() { - self.register_wildcard_update(sample.clone()).await; + let sample_key_expr: OwnedKeyExpr = sample.key_expr().clone().into(); + self.register_wildcard_update( + sample_key_expr.clone(), + sample.kind(), + *sample_timestamp, + sample.clone(), + ) + .await; + + self.cache_latest.latest_updates.write().await.insert( + Some(sample_key_expr.clone()), + Event::new( + Some(sample_key_expr.clone()), + *sample_timestamp, + match sample.kind() { + SampleKind::Put => Action::WildcardPut(sample_key_expr), + SampleKind::Delete => Action::WildcardDelete(sample_key_expr), + }, + ), + ); } let matching_keys = if sample.key_expr().is_wild() { @@ -383,17 +423,34 @@ impl StorageService { } } - async fn register_wildcard_update(&self, sample: Sample) { + /// Registers a Wildcard Update, storing it in a dedicated in-memory structure and on disk if + /// the Storage persistence capability is set to `Durable`. + /// + /// The `key_expr` and `timestamp` cannot be extracted from the received Sample when aligning + /// and hence must be manually passed. + /// + /// # ⚠️ Cache with Replication + /// + /// It is the *responsibility of the caller* to insert a Wildcard Update event in the Cache. If + /// the Replication is enabled, depending on where this method is called, the event should + /// either be inserted in the Cache (to be later added in the Replication Log) or in the + /// Replication Log. + pub(crate) async fn register_wildcard_update( + &self, + key_expr: OwnedKeyExpr, + kind: SampleKind, + timestamp: Timestamp, + value: impl Into, + ) { + tracing::trace!("Registering Wildcard Update: < {key_expr} >"); // @TODO: change into a better store that does incremental writes - let key = sample.key_expr().clone(); let mut wildcards = self.wildcard_updates.write().await; - let timestamp = *sample.timestamp().unwrap(); wildcards.insert( - &key, + &key_expr, Update { - kind: sample.kind(), + kind, data: StoredData { - value: Value::from(sample), + value: value.into(), timestamp, }, }, @@ -420,7 +477,8 @@ impl StorageService { weight.is_some() && weight.unwrap() > timestamp } - async fn overriding_wild_update( + // Returns an [Update] if the provided key expression is overridden by a Wildcard Update. + pub(crate) async fn overriding_wild_update( &self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp,