refactor(storage-manager): replication log lookup #1566

Merged
merged 2 commits into from
Oct 28, 2024
Original file line number Diff line number Diff line change
@@ -39,6 +39,21 @@ pub(crate) enum EventRemoval {
RemovedOlder(Event),
}

/// The `EventLookup` enumeration lists the possible outcomes when searching for an [Event] (with a
/// Timestamp) in the Replication Log.
///
/// The Timestamp allows comparing the [Event] that was found, establishing if it is Older, Newer
/// or Identical.
///
/// The Newer or Identical cases were merged as this enumeration was designed with the
/// `LogLatest::lookup_newer` method in mind, which does not need to distinguish them.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum EventLookup<'a> {
NotFound,
NewerOrIdentical(&'a Event),
Older,
}

/// An `IntervalIdx` represents the index of an `Interval`.
///
/// It is a thin wrapper around a `u64`.
@@ -126,8 +141,8 @@ impl Interval {
}

/// Returns an iterator over the [SubInterval]s contained in this `Interval`.
pub(crate) fn sub_intervals(&self) -> impl Iterator<Item = &SubInterval> {
self.sub_intervals.values()
pub(crate) fn sub_intervals(&self) -> impl Iterator<Item = (&SubIntervalIdx, &SubInterval)> {
self.sub_intervals.iter()
}
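
The signature change above swaps `values()` for `iter()` so that callers also receive each sub-interval's index. A minimal sketch of the effect on callers, assuming the sub-intervals are stored in a `BTreeMap` keyed by a plain `u64` (the real container and the `SubIntervalIdx` wrapper are not visible in this diff; `events_from` and `all_events` are hypothetical helpers):

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins; the real `SubIntervalIdx` and `SubInterval` are richer.
type SubIntervalIdx = u64;

struct SubInterval {
    events: Vec<&'static str>,
}

struct Interval {
    // Assumption: an ordered map keyed by sub-interval index.
    sub_intervals: BTreeMap<SubIntervalIdx, SubInterval>,
}

impl Interval {
    // Yields (index, sub-interval) pairs instead of only the values.
    fn sub_intervals(&self) -> impl Iterator<Item = (&SubIntervalIdx, &SubInterval)> {
        self.sub_intervals.iter()
    }
}

// A caller that needs the index can filter on it.
fn events_from(interval: &Interval, min_idx: SubIntervalIdx) -> Vec<&'static str> {
    interval
        .sub_intervals()
        .filter(|(idx, _)| **idx >= min_idx)
        .flat_map(|(_, sub)| sub.events.iter().copied())
        .collect()
}

// A caller that does not need the index ignores it with `_`.
fn all_events(interval: &Interval) -> Vec<&'static str> {
    interval
        .sub_intervals()
        .flat_map(|(_, sub)| sub.events.iter().copied())
        .collect()
}
```

Existing callers only need to destructure the tuple, which is what the `for_each(|(_, sub_interval)| ...)` change elsewhere in this PR does.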

/// Returns, if one exists, a reference over the [SubInterval] matching the provided
@@ -139,17 +154,6 @@ impl Interval {
self.sub_intervals.get(sub_interval_idx)
}

/// Lookup the provided key expression and return, if found, its associated [Event].
pub(crate) fn lookup(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
for sub_interval in self.sub_intervals.values() {
if let Some(event) = sub_interval.lookup(event_to_lookup) {
return Some(event);
}
}

None
}

/// Returns a [HashMap] of the index and [Fingerprint] of all the [SubInterval]s contained in
/// this [Interval].
pub(crate) fn sub_intervals_fingerprints(&self) -> HashMap<SubIntervalIdx, Fingerprint> {
@@ -397,8 +401,25 @@ impl SubInterval {
EventRemoval::NotFound
}

fn lookup(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
self.events.get(&event_to_lookup.log_key())
/// Looks up the key expression of the provided `event_to_lookup` in this [SubInterval] and,
/// depending on its timestamp and if an [Event] has been found, returns the [EventLookup].
///
/// If the Event in the Replication Log has the same or a greater timestamp than
/// `event_to_lookup` then `NewerOrIdentical` is returned. If its timestamp is lower then
/// `Older` is returned.
///
/// If this SubInterval contains no Event with the same key expression, `NotFound` is returned.
pub(crate) fn lookup(&self, event_to_lookup: &EventMetadata) -> EventLookup {
match self.events.get(&event_to_lookup.log_key()) {
Some(event) => {
if event.timestamp >= event_to_lookup.timestamp {
EventLookup::NewerOrIdentical(event)
} else {
EventLookup::Older
Contributor

Are we sure it will never be useful to get the event in the EventLookup::Older case?
IMO, semantically the event exists, so we should return it and leave it to the calling logic to use it or not (unless this has a significant impact on performance).

Contributor Author

I understand your point of view. However, that code is only called from the Replication and, for now, there is no case where the metadata of an older event is needed, so adding it brings no value (YAGNI).
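
To make the trade-off discussed in this thread concrete, here is a hedged sketch that places the merged design next to the reviewer's suggestion. `EventLookupWithOlder`, `classify`, `classify_with_older` and the simplified `Event` (with a `u64` timestamp) are hypothetical and only serve the illustration; they are not part of this PR.

```rust
// Simplified stand-in for the real `Event`; the timestamp is a plain `u64` here.
#[derive(Debug, PartialEq, Eq)]
struct Event {
    timestamp: u64,
}

// Design merged in this PR: `Older` carries no payload.
#[derive(Debug, PartialEq, Eq)]
enum EventLookup<'a> {
    NotFound,
    NewerOrIdentical(&'a Event),
    Older,
}

// Hypothetical variant suggested in review: `Older` also exposes the event.
#[derive(Debug, PartialEq, Eq)]
enum EventLookupWithOlder<'a> {
    NotFound,
    NewerOrIdentical(&'a Event),
    Older(&'a Event),
}

// Mirrors the comparison done in `SubInterval::lookup`.
fn classify<'a>(found: Option<&'a Event>, lookup_ts: u64) -> EventLookup<'a> {
    match found {
        Some(event) if event.timestamp >= lookup_ts => EventLookup::NewerOrIdentical(event),
        Some(_) => EventLookup::Older,
        None => EventLookup::NotFound,
    }
}

// Same comparison, but the older event is handed back to the caller.
fn classify_with_older<'a>(
    found: Option<&'a Event>,
    lookup_ts: u64,
) -> EventLookupWithOlder<'a> {
    match found {
        Some(event) if event.timestamp >= lookup_ts => {
            EventLookupWithOlder::NewerOrIdentical(event)
        }
        Some(event) => EventLookupWithOlder::Older(event),
        None => EventLookupWithOlder::NotFound,
    }
}
```

Both variants perform the same single comparison; the difference is only whether the caller can reach the older event without a second lookup.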

}
}
None => EventLookup::NotFound,
}
}

fn remove_event(&mut self, event_to_remove: &EventMetadata) -> Option<Event> {
Original file line number Diff line number Diff line change
@@ -107,7 +107,7 @@ impl Replication {
.intervals
.get(&interval_idx)
{
interval.sub_intervals().for_each(|sub_interval| {
interval.sub_intervals().for_each(|(_, sub_interval)| {
events_to_retrieve.extend(sub_interval.events().map(Into::into));
});
}
Original file line number Diff line number Diff line change
@@ -425,16 +425,7 @@ impl Replication {
return false;
};

let maybe_newer_event = replication_log_guard
.lookup(replica_event)
.and_then(|log_event| {
if log_event.timestamp < replica_event.timestamp {
None
} else {
Some(log_event)
}
})
.cloned();
let maybe_newer_event = replication_log_guard.lookup_newer(replica_event);

match (maybe_wildcard_update, maybe_newer_event) {
// No Event in the Replication Log or Wildcard Update, we go on, we need to process it.
@@ -491,8 +482,9 @@ impl Replication {
.await;
}

let log_event_metadata = log_event.into();
if let Some(mut removed_event) =
replication_log_guard.remove_event(&(&log_event).into())
replication_log_guard.remove_event(&log_event_metadata)
{
removed_event.set_timestamp_and_action(
replica_event.timestamp,
49 changes: 43 additions & 6 deletions plugins/zenoh-plugin-storage-manager/src/replication/log.rs
@@ -23,7 +23,7 @@ use zenoh::{key_expr::OwnedKeyExpr, sample::SampleKind, time::Timestamp, Result
use zenoh_backend_traits::config::ReplicaConfig;

use super::{
classification::{EventRemoval, Interval, IntervalIdx},
classification::{EventLookup, EventRemoval, Interval, IntervalIdx},
configuration::Configuration,
digest::{Digest, Fingerprint},
};
@@ -364,15 +364,52 @@ impl LogLatest {
&self.configuration
}

/// Lookup the provided key expression and, if found, return its associated [Event].
pub fn lookup(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
/// Returns the [Event] from the Replication Log with a newer Timestamp and the same key
/// expression, or None if no such Event exists in the Replication Log.
///
/// To speed up this lookup, only the intervals that contain Events with a more recent Timestamp
/// are visited.
///
/// # ⚠️ Caveat
///
/// A `None` return value does not mean that the Replication Log contains no Event with the
/// same key expression: it may still contain one, for instance an Event with an older
/// Timestamp.
pub fn lookup_newer(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
if !self.bloom_filter_event.check(&event_to_lookup.log_key()) {
return None;
}

for interval in self.intervals.values().rev() {
if let Some(event) = interval.lookup(event_to_lookup) {
return Some(event);
let Ok((event_interval_idx, event_sub_interval_idx)) = self
.configuration
.get_time_classification(&event_to_lookup.timestamp)
else {
tracing::error!(
"Fatal error: failed to compute the time classification of < {event_to_lookup:?} >"
);
return None;
};

for (interval_idx, interval) in self
.intervals
.iter()
.filter(|(&idx, _)| idx >= event_interval_idx)
{
for (_, sub_interval) in interval.sub_intervals().filter(|(&sub_idx, _)| {
// All sub-intervals must be visited except in one Interval: the Interval in which
// the `event_to_lookup` belongs. In this specific Interval, only the SubIntervals
// with a greater index should be visited (lower index means lower timestamp).
if *interval_idx == event_interval_idx {
return sub_idx >= event_sub_interval_idx;
}

true
}) {
match sub_interval.lookup(event_to_lookup) {
EventLookup::NotFound => continue,
EventLookup::NewerOrIdentical(event) => return Some(event),
EventLookup::Older => return None,
}
}
}
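
The traversal in `lookup_newer` can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the plugin's implementation: the log is a hypothetical nested `BTreeMap` that stores only a `u64` timestamp per key, `time_classification` is an invented stand-in for `Configuration::get_time_classification`, the bloom filter check is omitted, and `BTreeMap::range` is used in place of the `iter().filter(...)` calls of the PR.

```rust
use std::collections::{BTreeMap, HashMap};

// Hypothetical, simplified log: interval idx -> sub-interval idx -> key -> event timestamp.
type Log = BTreeMap<u64, BTreeMap<u64, HashMap<String, u64>>>;

// Assumed classification for this sketch only: 10 time units per sub-interval and
// 10 sub-intervals per interval. The real computation lives in `Configuration`.
fn time_classification(timestamp: u64) -> (u64, u64) {
    (timestamp / 100, (timestamp / 10) % 10)
}

// Returns the timestamp of the logged event for `key` if it is newer than or identical
// to `timestamp`, visiting only the (sub-)intervals that can hold such an event.
fn lookup_newer(log: &Log, key: &str, timestamp: u64) -> Option<u64> {
    let (event_interval_idx, event_sub_idx) = time_classification(timestamp);

    // `range(idx..)` skips every interval that is strictly older.
    for (interval_idx, sub_intervals) in log.range(event_interval_idx..) {
        // Only in the interval the event belongs to are the older sub-intervals skipped.
        let first_sub_idx = if *interval_idx == event_interval_idx {
            event_sub_idx
        } else {
            0
        };

        for (_, events) in sub_intervals.range(first_sub_idx..) {
            match events.get(key) {
                None => continue,
                Some(&logged) if logged >= timestamp => return Some(logged),
                // Same key but an older timestamp: nothing newer to report.
                Some(_) => return None,
            }
        }
    }

    None
}
```

With an entry for key `b` at timestamp 50, `lookup_newer(&log, "b", 60)` returns `None` even though the log contains `b`, which is the caveat documented on the method: older intervals are never visited.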

11 changes: 7 additions & 4 deletions plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
@@ -508,10 +508,13 @@ impl StorageService {
}

if let Some(replication_log) = &self.cache_latest.replication_log {
if let Some(event) = replication_log.read().await.lookup(new_event) {
if new_event.timestamp <= event.timestamp {
return None;
}
if replication_log
.read()
.await
.lookup_newer(new_event)
.is_some()
{
return None;
}
} else {
let mut storage = self.storage.lock().await;