docs(storage-manager): improve documentation of Replication #1750

Merged: 1 commit, Feb 3, 2025
@@ -156,6 +156,8 @@ impl Interval {

/// Returns a [HashMap] of the index and [Fingerprint] of all the [SubInterval]s contained in
/// this [Interval].
//
// This is a convenience method used to compute the Digest and an AlignmentReply.
pub(crate) fn sub_intervals_fingerprints(&self) -> HashMap<SubIntervalIdx, Fingerprint> {
self.sub_intervals
.iter()
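The hunk above truncates the body of `sub_intervals_fingerprints`. A self-contained sketch of how such a method could look follows; the `SubIntervalIdx`, `Fingerprint`, and `SubInterval` definitions here are simplified stand-ins for the crate's actual types, not the real ones.

```rust
use std::collections::{BTreeMap, HashMap};

// Simplified stand-ins for the crate's actual types.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
struct SubIntervalIdx(u64);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Fingerprint(u64);

struct SubInterval {
    fingerprint: Fingerprint,
}

struct Interval {
    sub_intervals: BTreeMap<SubIntervalIdx, SubInterval>,
}

impl Interval {
    /// Returns a [HashMap] of the index and [Fingerprint] of all the
    /// [SubInterval]s contained in this [Interval].
    fn sub_intervals_fingerprints(&self) -> HashMap<SubIntervalIdx, Fingerprint> {
        self.sub_intervals
            .iter()
            .map(|(idx, sub_interval)| (*idx, sub_interval.fingerprint))
            .collect()
    }
}
```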
@@ -174,7 +176,7 @@ impl Interval {
/// As its name indicates, this method DOES NOT CHECK if there is another [Event] associated to
/// the same key expression (regardless of its [Timestamp]).
///
/// This uniqueness property (i.e. there should only be a single [Event] in the replication Log
/// This uniqueness property (i.e. there should only be a single [Event] in the Replication Log
/// for a given key expression) cannot be enforced at the [Interval] level. Hence, this method
/// assumes the check has already been performed and thus does not do redundant work.
pub(crate) fn insert_unchecked(&mut self, sub_interval_idx: SubIntervalIdx, event: Event) {
@@ -220,6 +222,9 @@ impl Interval {
result
}

/// Removes and returns, if found, the [Event] having the provided [EventMetadata].
///
/// The fingerprint of the Interval will be updated accordingly.
pub(crate) fn remove_event(
&mut self,
sub_interval_idx: &SubIntervalIdx,
@@ -422,6 +427,9 @@ impl SubInterval {
}
}

/// Removes and returns, if found, the [Event] having the same [EventMetadata].
///
/// The Fingerprint of the SubInterval is updated accordingly.
fn remove_event(&mut self, event_to_remove: &EventMetadata) -> Option<Event> {
let removed_event = self.events.remove(&event_to_remove.log_key());
if let Some(event) = &removed_event {
@@ -438,6 +446,8 @@ impl SubInterval {
/// is where the Wildcard Update should be recorded.
/// It is only in that specific scenario that we are not sure that all [Event]s have a lower
/// timestamp.
///
/// The Fingerprint of the [SubInterval] is updated accordingly.
fn remove_events_overridden_by_wildcard_update(
&mut self,
prefix: Option<&OwnedKeyExpr>,
@@ -79,6 +79,11 @@ impl Configuration {
}
}

/// Returns the `prefix`, if one is set, that is stripped before keys are stored in the Storage.
///
/// This corresponds to the `strip_prefix` configuration parameter of the Storage.
///
/// TODO Rename this field and method to `strip_prefix` for consistency.
pub fn prefix(&self) -> Option<&OwnedKeyExpr> {
self.prefix.as_ref()
}
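To illustrate the `strip_prefix` behaviour described above, here is a hedged, hypothetical sketch; the function name and the exact stripping rules are assumptions for illustration, not the storage manager's actual implementation.

```rust
/// Hypothetical illustration of the `strip_prefix` configuration parameter:
/// the prefix is removed from a key expression before it is stored.
fn stored_key<'a>(key: &'a str, strip_prefix: Option<&str>) -> &'a str {
    match strip_prefix {
        // If the key starts with the prefix, strip it (and any leading '/');
        // otherwise keep the key untouched.
        Some(prefix) => key
            .strip_prefix(prefix)
            .map(|stripped| stripped.trim_start_matches('/'))
            .unwrap_or(key),
        None => key,
    }
}
```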
@@ -120,8 +125,8 @@ impl Configuration {
Ok(IntervalIdx(last_elapsed_interval as u64))
}

/// Returns the index of the lowest interval contained in the *hot* era, assuming that the
/// highest interval contained in the *hot* era is the one provided.
/// Returns the index of the lowest interval contained in the *Hot* Era, assuming that the
/// highest interval contained in the *Hot* Era is the one provided.
///
/// # Example
///
@@ -139,8 +144,8 @@ impl Configuration {
(*hot_era_upper_bound - self.hot + 1).into()
}
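The arithmetic above can be checked with a small sketch; the `Configuration` here is a pared-down stand-in carrying only the `hot` field.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IntervalIdx(u64);

struct Configuration {
    /// Number of intervals in the Hot Era.
    hot: u64,
}

impl Configuration {
    /// Lowest interval of the Hot Era, given the highest one: with `hot`
    /// intervals ending at `upper`, the era spans `upper - hot + 1 ..= upper`.
    fn hot_era_lower_bound(&self, hot_era_upper_bound: IntervalIdx) -> IntervalIdx {
        IntervalIdx(hot_era_upper_bound.0 - self.hot + 1)
    }
}
```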

/// Returns the index of the lowest interval contained in the *warm* era, assuming that the
/// highest interval contained in the *hot* era is the one provided.
/// Returns the index of the lowest interval contained in the *Warm* Era, assuming that the
/// highest interval contained in the *Hot* Era is the one provided.
///
/// ⚠️ Note that, even though this method computes the lower bound of the WARM era, the index
/// provided is the upper bound of the HOT era.
@@ -162,7 +167,7 @@ impl Configuration {
(*hot_era_upper_bound - self.hot - self.warm + 1).into()
}
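Similarly, the Warm Era lower bound can be sketched and checked; note that although it is the Warm Era's lower bound, the computation starts from the *Hot* Era's upper bound and subtracts both `hot` and `warm`. Types here are simplified stand-ins.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct IntervalIdx(u64);

struct Configuration {
    hot: u64,
    warm: u64,
}

impl Configuration {
    /// Lowest interval of the Warm Era, computed from the Hot Era upper
    /// bound: the Warm Era sits just below the Hot Era, so its lower bound
    /// is `upper - hot - warm + 1`.
    fn warm_era_lower_bound(&self, hot_era_upper_bound: IntervalIdx) -> IntervalIdx {
        IntervalIdx(hot_era_upper_bound.0 - self.hot - self.warm + 1)
    }
}
```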

/// Returns the time classification — [Interval] and [SubInterval] — of the provided
/// Returns the time classification — i.e. [Interval] and [SubInterval] — of the provided
/// [Timestamp].
///
/// # Errors
39 changes: 24 additions & 15 deletions plugins/zenoh-plugin-storage-manager/src/replication/core.rs
@@ -68,11 +68,8 @@ impl Replication {
///
/// # Replica discovery
///
/// To discover a Replica, this method will create a Digest subscriber, wait to receive a
/// *valid* Digest and, upon reception, ask that Replica for all its entries.
///
/// To avoid waiting indefinitely (in case there are no other Replica on the network), the
/// subscriber will wait for, at most, the duration of two Intervals.
/// To discover a Replica, this method will craft a specific [AlignmentQuery] using the
/// [Discovery] variant.
pub(crate) async fn initial_alignment(&self) {
let ke_all_replicas = match keformat!(
aligner_key_expr_formatter::formatter(),
@@ -160,7 +157,7 @@ impl Replication {
}
};

// We have no control over when a replica is going to be started. The purpose is here
// We have no control over when a replica is going to be started. The purpose here
// is to try to align its publications and make it so that they happen more or less
// at every interval (+ δ).
let duration_until_next_interval = {
@@ -278,9 +275,9 @@ impl Replication {

/// Spawns a task that subscribes to the [Digest] published by other Replicas.
///
/// Upon reception of a [Digest], it is compared with the local Replication Log. If this
/// comparison generates a [DigestDiff], the Aligner of the Replica that generated the [Digest]
/// that was processed is queried to start an alignment.
/// Upon reception of a [Digest], the local Digest is retrieved and both are compared. If this
/// comparison generates a [DigestDiff], the Aligner of the remote Replica is queried to start
/// an alignment.
///
/// [DigestDiff]: super::digest::DigestDiff
pub(crate) fn spawn_digest_subscriber(&self) -> JoinHandle<()> {
@@ -411,10 +408,10 @@ impl Replication {

/// Spawns a task that handles alignment queries.
///
/// An alignment query will always come from a Replica. Hence, as multiple Replicas could query
/// at the same time, for each received query a new task is spawned. This newly spawned task is
/// responsible for fetching in the Replication Log or in the Storage the relevant information
/// to send to the Replica such that it can align its own Storage.
/// An alignment query will always come from a remote Replica. As multiple remote Replicas could
/// query at the same time, a new task is spawned for each received query. This newly spawned
/// task is responsible for fetching in the Replication Log or in the Storage the relevant
/// information to send to the remote Replica such that it can align its own Storage.
pub(crate) fn spawn_aligner_queryable(&self) -> JoinHandle<()> {
let replication = self.clone();

@@ -471,7 +468,7 @@ impl Replication {
})
}

/// Spawns a new task to query the Aligner of the Replica which potentially has data this
/// Spawns a new task to query the Aligner of the remote Replica which potentially has data this
/// Storage is missing.
///
/// This method will:
@@ -481,7 +478,7 @@ impl Replication {
/// 3. Process all replies.
///
/// Note that the processing of a reply can trigger a new query (requesting additional
/// information), spawning a new task.
/// information), consequently spawning a new task.
///
/// This process is stateless and all the required information is carried in the query / reply.
pub(crate) fn spawn_query_replica_aligner(
@@ -580,6 +577,18 @@ impl Replication {
}
}

/// This function will search through the `events` structure and remove all event(s) that are
/// "impacted" by the wildcard.
///
/// An event should be removed if:
/// 1. The key expression of the wildcard, `wildcard_ke`, contains its key expression.
/// 2. The timestamp of the event is older than the timestamp of the wildcard.
/// 3. Their respective actions are "compatible": in particular, a Wildcard Put cannot "resuscitate"
/// a deleted key expression. See the comments within this function for other special cases that
/// need to be taken into consideration.
///
/// NOTE: This function is used to process both the `latest_updates` structure and the Replication
/// Log. Given that their structures are identical, the code was factored out and put here.
pub(crate) fn remove_events_overridden_by_wildcard_update(
events: &mut HashMap<LogLatestKey, Event>,
prefix: Option<&OwnedKeyExpr>,
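The removal conditions listed above for `remove_events_overridden_by_wildcard_update` can be sketched as a predicate. All names here are hypothetical: the real code matches full Zenoh key expressions and handles the action-compatibility special cases (e.g. a Wildcard Put not resuscitating a deleted key), which this simplified prefix check does not.

```rust
/// Simplified sketch of the "overridden by a Wildcard Update" test: the
/// wildcard key expression must include the event's key expression and the
/// wildcard's timestamp must be strictly newer.
fn is_overridden_by_wildcard(
    event_ke: &str,
    event_timestamp: u64,
    wildcard_ke: &str,
    wildcard_timestamp: u64,
) -> bool {
    // Stand-in for Zenoh key-expression inclusion: a trailing "**" matches
    // any suffix. Real key-expression semantics are richer than this.
    let ke_included = match wildcard_ke.strip_suffix("**") {
        Some(prefix) => event_ke.starts_with(prefix),
        None => event_ke == wildcard_ke,
    };
    ke_included && event_timestamp < wildcard_timestamp
}
```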
@@ -44,13 +44,23 @@ use crate::replication::{
///
/// A divergence in the Hot era will directly let the Replica assess which [SubInterval]s it needs,
/// hence directly skipping to the `SubIntervals` variant.
///
/// The `Discovery` and `All` variants are used to perform the initial alignment. After receiving a
/// `Discovery` Query, a Replica will reply with its Zenoh ID. The Replica that replied first will
/// then receive an `All` Query to transfer all its content.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum AlignmentQuery {
/// Ask Replicas for their Zenoh ID to perform an initial alignment.
Discovery,
/// Retrieve all the content of a Replica.
All,
/// First alignment Query after detecting a potential misalignment.
Diff(DigestDiff),
/// Request the Fingerprint(s) of the Sub-Interval(s) contained in the provided Interval(s).
Intervals(HashSet<IntervalIdx>),
/// Request the EventMetadata contained in the provided Sub-Interval(s).
SubIntervals(HashMap<IntervalIdx, HashSet<SubIntervalIdx>>),
/// Request the Payload associated with the provided EventMetadata.
Events(Vec<EventMetadata>),
}
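The protocol stages these variants encode can be illustrated with a simplified mirror of the enum; the payload types here are placeholders, whereas the actual crate uses `DigestDiff`, `HashSet`, `HashMap`, and `EventMetadata`.

```rust
/// Simplified mirror of `AlignmentQuery`, with placeholder payloads.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum AlignmentQuery {
    Discovery,
    All,
    Diff(u64),                          // stands in for DigestDiff
    Intervals(Vec<u64>),                // interval indexes
    SubIntervals(Vec<(u64, Vec<u64>)>), // interval -> sub-interval indexes
    Events(Vec<String>),                // stands in for EventMetadata
}

/// During the initial alignment a Replica first discovers a peer, then asks
/// the first peer that replied for all of its content.
fn initial_alignment_steps() -> Vec<AlignmentQuery> {
    vec![AlignmentQuery::Discovery, AlignmentQuery::All]
}
```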

@@ -161,20 +171,19 @@ impl Replication {
}
}

/// Replies to the provided [Query] with a hash map containing the index of the [Interval] in
/// the Cold era and their [Fingerprint].
/// Replies to the provided [Query] with a hash map containing the index of the [Interval]s in
/// the Cold Era and their [Fingerprint]s.
///
/// The Replica will use this response to assess which [Interval]s differ.
///
/// # Temporality
///
/// There is no guarantee that the Replica indicating a difference in the Cold era is "aligned":
/// it is possible that its Cold era is either ahead or late (i.e. it has more or less
/// Interval(s) in its Replication Log in the Cold era).
/// There is no guarantee that the Replica indicating a difference in the Cold Era is aligned:
/// it is possible that its Cold Era contains a different number of Intervals.
///
/// We believe this is not important: the Replication Log does not separate the Intervals based
/// on their era so performing this comparison will still be relevant — even if an Interval is
/// in the Cold era on one end and in the Warm era in the other.
/// on their Era so performing this comparison will still be relevant — even if an Interval is
/// in the Cold Era on one end and in the Warm Era in the other.
pub(crate) async fn reply_cold_era(&self, query: &Query) {
let log = self.replication_log.read().await;
let configuration = log.configuration();
@@ -200,7 +209,7 @@ impl Replication {
reply_to_query(query, reply, None).await;
}

/// Replies to the [Query] with a structure containing, for each interval index present in the
/// Replies to the [Query] with a structure containing, for each Interval index present in the
/// `different_intervals`, all the [SubInterval]s' [Fingerprint]s.
///
/// The Replica will use this structure to assess which [SubInterval]s differ.
@@ -230,24 +239,6 @@ impl Replication {
///
/// The Replica will use this structure to assess which [Event]s (and their associated payloads)
/// are missing in its Replication Log and connected Storage.
///
/// # TODO Performance improvement
///
/// Although the Replica we are answering has to find if, for each provided [EventMetadata],
/// there is a more recent one, it does not need to go through all its Replication Log. It only
/// needs, for each [EventMetadata], to go through the Intervals that are greater than the one
/// it is contained in.
///
/// The rationale is that the Intervals are already sorted in increasing order, so if no Event,
/// for the same key expression, can be found in any greater Interval, then by definition the
/// Replication Log does not contain a more recent Event.
///
/// That would require the following changes:
/// - Change the `sub_intervals` field of the `Interval` structure to a BTreeMap.
/// - In the `reply_events_metadata` method (just below), send out a `HashMap<IntervalIdx,
/// HashMap<SubIntervalIdx, HashSet<EventMetadata>>>` instead of a `Vec<EventMetadata>`.
/// - In the `process_alignment_reply` method, implement the searching algorithm described
/// above.
pub(crate) async fn reply_events_metadata(
&self,
query: &Query,
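The optimisation sketched in the TODO above, scanning only the Intervals strictly greater than the one containing the `EventMetadata`, can be illustrated with `BTreeMap::range`. The data layout here is simplified to `interval index -> (key, timestamp)` pairs and the function name is hypothetical.

```rust
use std::collections::BTreeMap;

/// Returns `true` if any Interval strictly greater than `interval_idx`
/// contains an Event for `key`. With Intervals sorted in increasing order,
/// finding no greater Interval containing `key` means, by definition, that
/// the log holds no more recent Event for that key.
fn has_more_recent_event(
    log: &BTreeMap<u64, Vec<(String, u64)>>,
    interval_idx: u64,
    key: &str,
) -> bool {
    log.range(interval_idx + 1..)
        .any(|(_, events)| events.iter().any(|(event_key, _)| event_key == key))
}
```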