From aa4bfbaf5496de32a02bc1d23779ba200974f2a0 Mon Sep 17 00:00:00 2001 From: Jose Celano Date: Fri, 12 Apr 2024 15:54:49 +0100 Subject: [PATCH] refactor: segregate command and query for announce request This changes the API of the torrent repository. The method: ``` fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata); ``` is replaced with: ``` fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer); fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option; ``` The performance is not affected. Benchmaring is still using both methods in order to simulate `announce` requests. 1. The interface is simpler (command/query segregation. 2. In the long-term: - Returning swarm metadata in the announce request could be optional. The announce request process would be faster if the tracker does not have to mantain the swarm data. This is not likely to happen becuase the scrape request needs this metadata. - New repository performance improvements could be implemented. This allow decoupling peer lists from swarm metadata. The repository internally can have two data strcutures one for the peer list and another for the swarm metatada. Both using different locks. --- .../benches/helpers/asyn.rs | 34 +++++------ .../benches/helpers/sync.rs | 22 +++++-- packages/torrent-repository/src/entry/mod.rs | 20 ++---- .../torrent-repository/src/entry/mutex_std.rs | 14 ++--- .../src/entry/mutex_tokio.rs | 12 ++-- .../torrent-repository/src/entry/single.rs | 10 +-- .../src/repository/dash_map_mutex_std.rs | 16 ++--- .../torrent-repository/src/repository/mod.rs | 10 ++- .../src/repository/rw_lock_std.rs | 10 ++- .../src/repository/rw_lock_std_mutex_std.rs | 12 +++- .../src/repository/rw_lock_std_mutex_tokio.rs | 16 ++++- .../src/repository/rw_lock_tokio.rs | 11 +++- .../src/repository/rw_lock_tokio_mutex_std.rs | 11 +++- .../repository/rw_lock_tokio_mutex_tokio.rs | 14 ++++- .../src/repository/skip_map_mutex_std.rs | 10 ++- .../torrent-repository/tests/common/repo.rs | 43 +++++++------ .../tests/common/torrent.rs | 22 +++---- .../torrent-repository/tests/entry/mod.rs | 41 +++++++------ .../tests/repository/mod.rs | 40 ++++++++---- src/core/mod.rs | 61 +++++++++++-------- src/core/services/torrent.rs | 38 ++++-------- src/servers/udp/handlers.rs | 12 +--- tests/servers/api/environment.rs | 2 +- tests/servers/http/environment.rs | 2 +- tests/servers/udp/environment.rs | 2 +- 25 files changed, 259 insertions(+), 226 deletions(-) diff --git a/packages/torrent-repository/benches/helpers/asyn.rs b/packages/torrent-repository/benches/helpers/asyn.rs index 80f70cdc2..1c6d9d915 100644 --- a/packages/torrent-repository/benches/helpers/asyn.rs +++ b/packages/torrent-repository/benches/helpers/asyn.rs @@ -18,9 +18,9 @@ where let info_hash = InfoHash([0; 20]); - torrent_repository - .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) - .await; + torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER).await; + + torrent_repository.get_swarm_metadata(&info_hash).await; } start.elapsed() @@ -37,9 +37,9 @@ where let handles = FuturesUnordered::new(); // Add the torrent/peer to the torrent repository - torrent_repository - .update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER) - .await; + torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await; + + torrent_repository.get_swarm_metadata(info_hash).await; let start = Instant::now(); @@ -47,9 +47,9 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone - .update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER) - .await; + torrent_repository_clone.upsert_peer(info_hash, &DEFAULT_PEER).await; + + torrent_repository_clone.get_swarm_metadata(info_hash).await; if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); @@ -87,9 +87,9 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone - .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) - .await; + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await; + + torrent_repository_clone.get_swarm_metadata(&info_hash).await; if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); @@ -123,9 +123,8 @@ where // Add the torrents/peers to the torrent repository for info_hash in &info_hashes { - torrent_repository - .update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER) - .await; + torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await; + torrent_repository.get_swarm_metadata(info_hash).await; } let start = Instant::now(); @@ -134,9 +133,8 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone - .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) - .await; + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await; + torrent_repository_clone.get_swarm_metadata(&info_hash).await; if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); diff --git a/packages/torrent-repository/benches/helpers/sync.rs b/packages/torrent-repository/benches/helpers/sync.rs index 0523f4141..63fccfc77 100644 --- a/packages/torrent-repository/benches/helpers/sync.rs +++ b/packages/torrent-repository/benches/helpers/sync.rs @@ -20,7 +20,9 @@ where let info_hash = InfoHash([0; 20]); - torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER); + torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER); + + torrent_repository.get_swarm_metadata(&info_hash); } start.elapsed() @@ -37,7 +39,9 @@ where let handles = FuturesUnordered::new(); // Add the torrent/peer to the torrent repository - torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER); + torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER); + + torrent_repository.get_swarm_metadata(info_hash); let start = Instant::now(); @@ -45,7 +49,9 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER); + torrent_repository_clone.upsert_peer(info_hash, &DEFAULT_PEER); + + torrent_repository_clone.get_swarm_metadata(info_hash); if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); @@ -83,7 +89,9 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER); + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER); + + torrent_repository_clone.get_swarm_metadata(&info_hash); if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); @@ -117,7 +125,8 @@ where // Add the torrents/peers to the torrent repository for info_hash in &info_hashes { - torrent_repository.update_torrent_with_peer_and_get_stats(info_hash, &DEFAULT_PEER); + torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER); + torrent_repository.get_swarm_metadata(info_hash); } let start = Instant::now(); @@ -126,7 +135,8 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER); + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER); + torrent_repository_clone.get_swarm_metadata(&info_hash); if let Some(sleep_time) = sleep { let start_time = std::time::Instant::now(); diff --git a/packages/torrent-repository/src/entry/mod.rs b/packages/torrent-repository/src/entry/mod.rs index 4c39af829..d72ff254b 100644 --- a/packages/torrent-repository/src/entry/mod.rs +++ b/packages/torrent-repository/src/entry/mod.rs @@ -15,7 +15,7 @@ pub trait Entry { /// It returns the swarm metadata (statistics) as a struct: /// /// `(seeders, completed, leechers)` - fn get_stats(&self) -> SwarmMetadata; + fn get_swarm_metadata(&self) -> SwarmMetadata; /// Returns True if Still a Valid Entry according to the Tracker Policy fn is_good(&self, policy: &TrackerPolicy) -> bool; @@ -40,10 +40,7 @@ pub trait Entry { /// /// The number of peers that have complete downloading is synchronously updated when peers are updated. /// That's the total torrent downloads counter. - fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool; - - // It preforms a combined operation of `insert_or_update_peer` and `get_stats`. - fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata); + fn upsert_peer(&mut self, peer: &peer::Peer) -> bool; /// It removes peer from the swarm that have not been updated for more than `current_cutoff` seconds fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch); @@ -51,20 +48,19 @@ pub trait Entry { #[allow(clippy::module_name_repetitions)] pub trait EntrySync { - fn get_stats(&self) -> SwarmMetadata; + fn get_swarm_metadata(&self) -> SwarmMetadata; fn is_good(&self, policy: &TrackerPolicy) -> bool; fn peers_is_empty(&self) -> bool; fn get_peers_len(&self) -> usize; fn get_peers(&self, limit: Option) -> Vec>; fn get_peers_for_client(&self, client: &SocketAddr, limit: Option) -> Vec>; - fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool; - fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata); + fn upsert_peer(&self, peer: &peer::Peer) -> bool; fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch); } #[allow(clippy::module_name_repetitions)] pub trait EntryAsync { - fn get_stats(&self) -> impl std::future::Future + Send; + fn get_swarm_metadata(&self) -> impl std::future::Future + Send; fn check_good(self, policy: &TrackerPolicy) -> impl std::future::Future + Send; fn peers_is_empty(&self) -> impl std::future::Future + Send; fn get_peers_len(&self) -> impl std::future::Future + Send; @@ -74,11 +70,7 @@ pub trait EntryAsync { client: &SocketAddr, limit: Option, ) -> impl std::future::Future>> + Send; - fn insert_or_update_peer(self, peer: &peer::Peer) -> impl std::future::Future + Send; - fn insert_or_update_peer_and_get_stats( - self, - peer: &peer::Peer, - ) -> impl std::future::Future + std::marker::Send; + fn upsert_peer(self, peer: &peer::Peer) -> impl std::future::Future + Send; fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future + Send; } diff --git a/packages/torrent-repository/src/entry/mutex_std.rs b/packages/torrent-repository/src/entry/mutex_std.rs index b4b823909..990d8ab76 100644 --- a/packages/torrent-repository/src/entry/mutex_std.rs +++ b/packages/torrent-repository/src/entry/mutex_std.rs @@ -9,8 +9,8 @@ use super::{Entry, EntrySync}; use crate::{EntryMutexStd, EntrySingle}; impl EntrySync for EntryMutexStd { - fn get_stats(&self) -> SwarmMetadata { - self.lock().expect("it should get a lock").get_stats() + fn get_swarm_metadata(&self) -> SwarmMetadata { + self.lock().expect("it should get a lock").get_swarm_metadata() } fn is_good(&self, policy: &TrackerPolicy) -> bool { @@ -33,14 +33,8 @@ impl EntrySync for EntryMutexStd { self.lock().expect("it should get lock").get_peers_for_client(client, limit) } - fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool { - self.lock().expect("it should lock the entry").insert_or_update_peer(peer) - } - - fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata) { - self.lock() - .expect("it should lock the entry") - .insert_or_update_peer_and_get_stats(peer) + fn upsert_peer(&self, peer: &peer::Peer) -> bool { + self.lock().expect("it should lock the entry").upsert_peer(peer) } fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) { diff --git a/packages/torrent-repository/src/entry/mutex_tokio.rs b/packages/torrent-repository/src/entry/mutex_tokio.rs index 34f4a4e92..c5363e51a 100644 --- a/packages/torrent-repository/src/entry/mutex_tokio.rs +++ b/packages/torrent-repository/src/entry/mutex_tokio.rs @@ -9,8 +9,8 @@ use super::{Entry, EntryAsync}; use crate::{EntryMutexTokio, EntrySingle}; impl EntryAsync for EntryMutexTokio { - async fn get_stats(&self) -> SwarmMetadata { - self.lock().await.get_stats() + async fn get_swarm_metadata(&self) -> SwarmMetadata { + self.lock().await.get_swarm_metadata() } async fn check_good(self, policy: &TrackerPolicy) -> bool { @@ -33,12 +33,8 @@ impl EntryAsync for EntryMutexTokio { self.lock().await.get_peers_for_client(client, limit) } - async fn insert_or_update_peer(self, peer: &peer::Peer) -> bool { - self.lock().await.insert_or_update_peer(peer) - } - - async fn insert_or_update_peer_and_get_stats(self, peer: &peer::Peer) -> (bool, SwarmMetadata) { - self.lock().await.insert_or_update_peer_and_get_stats(peer) + async fn upsert_peer(self, peer: &peer::Peer) -> bool { + self.lock().await.upsert_peer(peer) } async fn remove_inactive_peers(self, current_cutoff: DurationSinceUnixEpoch) { diff --git a/packages/torrent-repository/src/entry/single.rs b/packages/torrent-repository/src/entry/single.rs index c1041e9a2..a38b54023 100644 --- a/packages/torrent-repository/src/entry/single.rs +++ b/packages/torrent-repository/src/entry/single.rs @@ -12,7 +12,7 @@ use crate::EntrySingle; impl Entry for EntrySingle { #[allow(clippy::cast_possible_truncation)] - fn get_stats(&self) -> SwarmMetadata { + fn get_swarm_metadata(&self) -> SwarmMetadata { let complete: u32 = self.peers.values().filter(|peer| peer.is_seeder()).count() as u32; let incomplete: u32 = self.peers.len() as u32 - complete; @@ -70,7 +70,7 @@ impl Entry for EntrySingle { } } - fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool { + fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { let mut downloaded_stats_updated: bool = false; match peer::ReadInfo::get_event(peer) { @@ -93,12 +93,6 @@ impl Entry for EntrySingle { downloaded_stats_updated } - fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) { - let changed = self.insert_or_update_peer(peer); - let stats = self.get_stats(); - (changed, stats) - } - fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { self.peers .retain(|_, peer| peer::ReadInfo::get_updated(peer) > current_cutoff); diff --git a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs index 67c47973e..b398b09dc 100644 --- a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs @@ -23,19 +23,21 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { if let Some(entry) = self.torrents.get(info_hash) { - entry.insert_or_update_peer_and_get_stats(peer) + entry.upsert_peer(peer); } else { let _unused = self.torrents.insert(*info_hash, Arc::default()); - - match self.torrents.get(info_hash) { - Some(entry) => entry.insert_or_update_peer_and_get_stats(peer), - None => (false, SwarmMetadata::zeroed()), + if let Some(entry) = self.torrents.get(info_hash) { + entry.upsert_peer(peer); } } } + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) + } + fn get(&self, key: &InfoHash) -> Option { let maybe_entry = self.torrents.get(key); maybe_entry.map(|entry| entry.clone()) @@ -45,7 +47,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in &self.torrents { - let stats = entry.value().lock().expect("it should get a lock").get_stats(); + let stats = entry.value().lock().expect("it should get a lock").get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/mod.rs b/packages/torrent-repository/src/repository/mod.rs index c7c64c54a..f198288f8 100644 --- a/packages/torrent-repository/src/repository/mod.rs +++ b/packages/torrent-repository/src/repository/mod.rs @@ -24,7 +24,8 @@ pub trait Repository: Debug + Default + Sized + 'static { fn remove(&self, key: &InfoHash) -> Option; fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch); fn remove_peerless_torrents(&self, policy: &TrackerPolicy); - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata); + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer); + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option; } #[allow(clippy::module_name_repetitions)] @@ -36,9 +37,6 @@ pub trait RepositoryAsync: Debug + Default + Sized + 'static { fn remove(&self, key: &InfoHash) -> impl std::future::Future> + Send; fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future + Send; fn remove_peerless_torrents(&self, policy: &TrackerPolicy) -> impl std::future::Future + Send; - fn update_torrent_with_peer_and_get_stats( - &self, - info_hash: &InfoHash, - peer: &peer::Peer, - ) -> impl std::future::Future + Send; + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> impl std::future::Future + Send; + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> impl std::future::Future> + Send; } diff --git a/packages/torrent-repository/src/repository/rw_lock_std.rs b/packages/torrent-repository/src/repository/rw_lock_std.rs index e9074a271..af48428e4 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std.rs @@ -47,12 +47,16 @@ impl Repository for TorrentsRwLockStd where EntrySingle: Entry, { - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let mut db = self.get_torrents_mut(); let entry = db.entry(*info_hash).or_insert(EntrySingle::default()); - entry.insert_or_update_peer_and_get_stats(peer) + entry.upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.get(info_hash).map(|entry| entry.get_swarm_metadata()) } fn get(&self, key: &InfoHash) -> Option { @@ -64,7 +68,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in self.get_torrents().values() { - let stats = entry.get_stats(); + let stats = entry.get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs index 0b65234e3..74cdc4475 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs @@ -33,7 +33,7 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -44,7 +44,13 @@ where entry.clone() }; - entry.insert_or_update_peer_and_get_stats(peer) + entry.upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.get_torrents() + .get(info_hash) + .map(super::super::entry::EntrySync::get_swarm_metadata) } fn get(&self, key: &InfoHash) -> Option { @@ -56,7 +62,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in self.get_torrents().values() { - let stats = entry.lock().expect("it should get a lock").get_stats(); + let stats = entry.lock().expect("it should get a lock").get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs index 5394abb6a..83ac02c91 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs @@ -37,7 +37,7 @@ where EntryMutexTokio: EntryAsync, EntrySingle: Entry, { - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -48,8 +48,18 @@ where entry.clone() }; - entry.insert_or_update_peer_and_get_stats(peer).await + entry.upsert_peer(peer).await; } + + async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + let maybe_entry = self.get_torrents().get(info_hash).cloned(); + + match maybe_entry { + Some(entry) => Some(entry.get_swarm_metadata().await), + None => None, + } + } + async fn get(&self, key: &InfoHash) -> Option { let db = self.get_torrents(); db.get(key).cloned() @@ -75,7 +85,7 @@ where let entries: Vec<_> = self.get_torrents().values().cloned().collect(); for entry in entries { - let stats = entry.lock().await.get_stats(); + let stats = entry.lock().await.get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio.rs index d84074eaf..b95f1e31e 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio.rs @@ -51,13 +51,18 @@ impl RepositoryAsync for TorrentsRwLockTokio where EntrySingle: Entry, { - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let mut db = self.get_torrents_mut().await; let entry = db.entry(*info_hash).or_insert(EntrySingle::default()); - entry.insert_or_update_peer_and_get_stats(peer) + entry.upsert_peer(peer); } + + async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.get(info_hash).await.map(|entry| entry.get_swarm_metadata()) + } + async fn get(&self, key: &InfoHash) -> Option { let db = self.get_torrents().await; db.get(key).cloned() @@ -81,7 +86,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in self.get_torrents().await.values() { - let stats = entry.get_stats(); + let stats = entry.get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs index fbbc51a09..bde959940 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs @@ -35,7 +35,7 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().await.get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -46,8 +46,13 @@ where entry.clone() }; - entry.insert_or_update_peer_and_get_stats(peer) + entry.upsert_peer(peer); } + + async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.get(info_hash).await.map(|entry| entry.get_swarm_metadata()) + } + async fn get(&self, key: &InfoHash) -> Option { let db = self.get_torrents().await; db.get(key).cloned() @@ -71,7 +76,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in self.get_torrents().await.values() { - let stats = entry.get_stats(); + let stats = entry.get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs index bc7fd61e8..1d002e317 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs @@ -35,7 +35,7 @@ where EntryMutexTokio: EntryAsync, EntrySingle: Entry, { - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let maybe_entry = self.get_torrents().await.get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -46,8 +46,16 @@ where entry.clone() }; - entry.insert_or_update_peer_and_get_stats(peer).await + entry.upsert_peer(peer).await; } + + async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + match self.get(info_hash).await { + Some(entry) => Some(entry.get_swarm_metadata().await), + None => None, + } + } + async fn get(&self, key: &InfoHash) -> Option { let db = self.get_torrents().await; db.get(key).cloned() @@ -71,7 +79,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in self.get_torrents().await.values() { - let stats = entry.get_stats().await; + let stats = entry.get_swarm_metadata().await; metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs index 0c0127b15..ef3e7e478 100644 --- a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -23,9 +23,13 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); - entry.value().insert_or_update_peer_and_get_stats(peer) + entry.value().upsert_peer(peer); + } + + fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + self.torrents.get(info_hash).map(|entry| entry.value().get_swarm_metadata()) } fn get(&self, key: &InfoHash) -> Option { @@ -37,7 +41,7 @@ where let mut metrics = TorrentsMetrics::default(); for entry in &self.torrents { - let stats = entry.value().lock().expect("it should get a lock").get_stats(); + let stats = entry.value().lock().expect("it should get a lock").get_swarm_metadata(); metrics.complete += u64::from(stats.complete); metrics.downloaded += u64::from(stats.downloaded); metrics.incomplete += u64::from(stats.incomplete); diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index 5a6eddf97..7c245fe04 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -23,6 +23,32 @@ pub(crate) enum Repo { } impl Repo { + pub(crate) async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + match self { + Repo::RwLockStd(repo) => repo.upsert_peer(info_hash, peer), + Repo::RwLockStdMutexStd(repo) => repo.upsert_peer(info_hash, peer), + Repo::RwLockStdMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await, + Repo::RwLockTokio(repo) => repo.upsert_peer(info_hash, peer).await, + Repo::RwLockTokioMutexStd(repo) => repo.upsert_peer(info_hash, peer).await, + Repo::RwLockTokioMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await, + Repo::SkipMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), + Repo::DashMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), + } + } + + pub(crate) async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { + match self { + Repo::RwLockStd(repo) => repo.get_swarm_metadata(info_hash), + Repo::RwLockStdMutexStd(repo) => repo.get_swarm_metadata(info_hash), + Repo::RwLockStdMutexTokio(repo) => repo.get_swarm_metadata(info_hash).await, + Repo::RwLockTokio(repo) => repo.get_swarm_metadata(info_hash).await, + Repo::RwLockTokioMutexStd(repo) => repo.get_swarm_metadata(info_hash).await, + Repo::RwLockTokioMutexTokio(repo) => repo.get_swarm_metadata(info_hash).await, + Repo::SkipMapMutexStd(repo) => repo.get_swarm_metadata(info_hash), + Repo::DashMapMutexStd(repo) => repo.get_swarm_metadata(info_hash), + } + } + pub(crate) async fn get(&self, key: &InfoHash) -> Option { match self { Repo::RwLockStd(repo) => repo.get(key), @@ -145,23 +171,6 @@ impl Repo { } } - pub(crate) async fn update_torrent_with_peer_and_get_stats( - &self, - info_hash: &InfoHash, - peer: &peer::Peer, - ) -> (bool, SwarmMetadata) { - match self { - Repo::RwLockStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - Repo::RwLockStdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - Repo::RwLockStdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::RwLockTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::RwLockTokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::RwLockTokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await, - Repo::SkipMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - Repo::DashMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer), - } - } - pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option { match self { Repo::RwLockStd(repo) => { diff --git a/packages/torrent-repository/tests/common/torrent.rs b/packages/torrent-repository/tests/common/torrent.rs index 33264c443..c0699479e 100644 --- a/packages/torrent-repository/tests/common/torrent.rs +++ b/packages/torrent-repository/tests/common/torrent.rs @@ -17,9 +17,9 @@ pub(crate) enum Torrent { impl Torrent { pub(crate) async fn get_stats(&self) -> SwarmMetadata { match self { - Torrent::Single(entry) => entry.get_stats(), - Torrent::MutexStd(entry) => entry.get_stats(), - Torrent::MutexTokio(entry) => entry.clone().get_stats().await, + Torrent::Single(entry) => entry.get_swarm_metadata(), + Torrent::MutexStd(entry) => entry.get_swarm_metadata(), + Torrent::MutexTokio(entry) => entry.clone().get_swarm_metadata().await, } } @@ -63,19 +63,11 @@ impl Torrent { } } - pub(crate) async fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool { + pub(crate) async fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { match self { - Torrent::Single(entry) => entry.insert_or_update_peer(peer), - Torrent::MutexStd(entry) => entry.insert_or_update_peer(peer), - Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer(peer).await, - } - } - - pub(crate) async fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) { - match self { - Torrent::Single(entry) => entry.insert_or_update_peer_and_get_stats(peer), - Torrent::MutexStd(entry) => entry.insert_or_update_peer_and_get_stats(peer), - Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer_and_get_stats(peer).await, + Torrent::Single(entry) => entry.upsert_peer(peer), + Torrent::MutexStd(entry) => entry.upsert_peer(peer), + Torrent::MutexTokio(entry) => entry.clone().upsert_peer(peer).await, } } diff --git a/packages/torrent-repository/tests/entry/mod.rs b/packages/torrent-repository/tests/entry/mod.rs index c39bef636..3c564c6f8 100644 --- a/packages/torrent-repository/tests/entry/mod.rs +++ b/packages/torrent-repository/tests/entry/mod.rs @@ -62,34 +62,34 @@ async fn make(torrent: &mut Torrent, makes: &Makes) -> Vec { Makes::Empty => vec![], Makes::Started => { let peer = a_started_peer(1); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; vec![peer] } Makes::Completed => { let peer = a_completed_peer(2); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; vec![peer] } Makes::Downloaded => { let mut peer = a_started_peer(3); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; peer.event = AnnounceEvent::Completed; peer.left = NumberOfBytes(0); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; vec![peer] } Makes::Three => { let peer_1 = a_started_peer(1); - torrent.insert_or_update_peer(&peer_1).await; + torrent.upsert_peer(&peer_1).await; let peer_2 = a_completed_peer(2); - torrent.insert_or_update_peer(&peer_2).await; + torrent.upsert_peer(&peer_2).await; let mut peer_3 = a_started_peer(3); - torrent.insert_or_update_peer(&peer_3).await; + torrent.upsert_peer(&peer_3).await; peer_3.event = AnnounceEvent::Completed; peer_3.left = NumberOfBytes(0); - torrent.insert_or_update_peer(&peer_3).await; + torrent.upsert_peer(&peer_3).await; vec![peer_1, peer_2, peer_3] } } @@ -182,7 +182,7 @@ async fn it_should_update_a_peer( // Make and insert a new peer. let mut peer = a_started_peer(-1); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; // Get the Inserted Peer by Id. let peers = torrent.get_peers(None).await; @@ -195,7 +195,7 @@ async fn it_should_update_a_peer( // Announce "Completed" torrent download event. peer.event = AnnounceEvent::Completed; - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; // Get the Updated Peer by Id. let peers = torrent.get_peers(None).await; @@ -224,7 +224,7 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( let mut peer = a_started_peer(-1); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; // The started peer should be inserted. let peers = torrent.get_peers(None).await; @@ -237,7 +237,7 @@ async fn it_should_remove_a_peer_upon_stopped_announcement( // Change peer to "Stopped" and insert. peer.event = AnnounceEvent::Stopped; - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; // It should be removed now. let peers = torrent.get_peers(None).await; @@ -270,13 +270,12 @@ async fn it_should_handle_a_peer_completed_announcement_and_update_the_downloade // Announce "Completed" torrent download event. peer.event = AnnounceEvent::Completed; - let (updated, stats) = torrent.insert_or_update_peer_and_get_stats(&peer).await; + torrent.upsert_peer(&peer).await; + let stats = torrent.get_stats().await; if is_already_completed { - assert!(!updated); assert_eq!(stats.downloaded, downloaded); } else { - assert!(updated); assert_eq!(stats.downloaded, downloaded + 1); } } @@ -301,7 +300,8 @@ async fn it_should_update_a_peer_as_a_seeder( // Set Bytes Left to Zero peer.left = NumberOfBytes(0); - let (_, stats) = torrent.insert_or_update_peer_and_get_stats(&peer).await; // Add the peer + torrent.upsert_peer(&peer).await; + let stats = torrent.get_stats().await; if is_already_non_left { // it was already complete @@ -332,7 +332,8 @@ async fn it_should_update_a_peer_as_incomplete( // Set Bytes Left to no Zero peer.left = NumberOfBytes(1); - let (_, stats) = torrent.insert_or_update_peer_and_get_stats(&peer).await; // Add the peer + torrent.upsert_peer(&peer).await; + let stats = torrent.get_stats().await; if completed_already { // now it is incomplete @@ -368,7 +369,7 @@ async fn it_should_get_peers_excluding_the_client_socket( // set the address to the socket. peer.peer_addr = socket; - torrent.insert_or_update_peer(&peer).await; // Add peer + torrent.upsert_peer(&peer).await; // Add peer // It should not include the peer that has the same socket. assert!(!torrent.get_peers_for_client(&socket, None).await.contains(&peer.into())); @@ -391,7 +392,7 @@ async fn it_should_limit_the_number_of_peers_returned( for peer_number in 1..=74 + 1 { let mut peer = a_started_peer(1); peer.peer_id = peer::Id::from(peer_number); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; } let peers = torrent.get_peers(Some(TORRENT_PEERS_LIMIT)).await; @@ -422,7 +423,7 @@ async fn it_should_remove_inactive_peers_beyond_cutoff( peer.updated = now.sub(EXPIRE); - torrent.insert_or_update_peer(&peer).await; + torrent.upsert_peer(&peer).await; assert_eq!(torrent.get_peers_len().await, peers.len() + 1); diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index a6784bf57..fde34467e 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -6,6 +6,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::announce_event::AnnounceEvent; use torrust_tracker_primitives::info_hash::InfoHash; use torrust_tracker_primitives::pagination::Pagination; +use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents}; use torrust_tracker_torrent_repository::entry::Entry as _; use torrust_tracker_torrent_repository::repository::dash_map_mutex_std::XacrimonDashMap; @@ -72,14 +73,14 @@ fn default() -> Entries { #[fixture] fn started() -> Entries { let mut torrent = EntrySingle::default(); - torrent.insert_or_update_peer(&a_started_peer(1)); + torrent.upsert_peer(&a_started_peer(1)); vec![(InfoHash::default(), torrent)] } #[fixture] fn completed() -> Entries { let mut torrent = EntrySingle::default(); - torrent.insert_or_update_peer(&a_completed_peer(2)); + torrent.upsert_peer(&a_completed_peer(2)); vec![(InfoHash::default(), torrent)] } @@ -87,10 +88,10 @@ fn completed() -> Entries { fn downloaded() -> Entries { let mut torrent = EntrySingle::default(); let mut peer = a_started_peer(3); - torrent.insert_or_update_peer(&peer); + torrent.upsert_peer(&peer); peer.event = AnnounceEvent::Completed; peer.left = NumberOfBytes(0); - torrent.insert_or_update_peer(&peer); + torrent.upsert_peer(&peer); vec![(InfoHash::default(), torrent)] } @@ -98,21 +99,21 @@ fn downloaded() -> Entries { fn three() -> Entries { let mut started = EntrySingle::default(); let started_h = &mut DefaultHasher::default(); - started.insert_or_update_peer(&a_started_peer(1)); + started.upsert_peer(&a_started_peer(1)); started.hash(started_h); let mut completed = EntrySingle::default(); let completed_h = &mut DefaultHasher::default(); - completed.insert_or_update_peer(&a_completed_peer(2)); + completed.upsert_peer(&a_completed_peer(2)); completed.hash(completed_h); let mut downloaded = EntrySingle::default(); let downloaded_h = &mut DefaultHasher::default(); let mut downloaded_peer = a_started_peer(3); - downloaded.insert_or_update_peer(&downloaded_peer); + downloaded.upsert_peer(&downloaded_peer); downloaded_peer.event = AnnounceEvent::Completed; downloaded_peer.left = NumberOfBytes(0); - downloaded.insert_or_update_peer(&downloaded_peer); + downloaded.upsert_peer(&downloaded_peer); downloaded.hash(downloaded_h); vec![ @@ -128,7 +129,7 @@ fn many_out_of_order() -> Entries { for i in 0..408 { let mut entry = EntrySingle::default(); - entry.insert_or_update_peer(&a_started_peer(i)); + entry.upsert_peer(&a_started_peer(i)); entries.insert((InfoHash::from(&i), entry)); } @@ -143,7 +144,7 @@ fn many_hashed_in_order() -> Entries { for i in 0..408 { let mut entry = EntrySingle::default(); - entry.insert_or_update_peer(&a_started_peer(i)); + entry.upsert_peer(&a_started_peer(i)); let hash: &mut DefaultHasher = &mut DefaultHasher::default(); hash.write_i32(i); @@ -390,7 +391,7 @@ async fn it_should_get_metrics( let mut metrics = TorrentsMetrics::default(); for (_, torrent) in entries { - let stats = torrent.get_stats(); + let stats = torrent.get_swarm_metadata(); metrics.torrents += 1; metrics.incomplete += u64::from(stats.incomplete); @@ -537,10 +538,25 @@ async fn it_should_remove_inactive_peers( // Insert the infohash and peer into the repository // and verify there is an extra torrent entry. { - repo.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + repo.upsert_peer(&info_hash, &peer).await; assert_eq!(repo.get_metrics().await.torrents, entries.len() as u64 + 1); } + // Insert the infohash and peer into the repository + // and verify the swarm metadata was updated. + { + repo.upsert_peer(&info_hash, &peer).await; + let stats = repo.get_swarm_metadata(&info_hash).await; + assert_eq!( + stats, + Some(SwarmMetadata { + downloaded: 0, + complete: 1, + incomplete: 0 + }) + ); + } + // Verify that this new peer was inserted into the repository. { let entry = repo.get(&info_hash).await.expect("it_should_get_some"); diff --git a/src/core/mod.rs b/src/core/mod.rs index 6628426c1..83813a863 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -626,10 +626,9 @@ impl Tracker { peer.change_ip(&assign_ip_address_to_peer(remote_client_ip, self.external_ip)); debug!("After: {peer:?}"); - // we should update the torrent and get the stats before we get the peer list. - let stats = self.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + let stats = self.upsert_peer_and_get_stats(info_hash, peer).await; - let peers = self.get_torrent_peers_for_peer(info_hash, peer); + let peers = self.get_peers_for(info_hash, peer); AnnounceData { peers, @@ -660,7 +659,7 @@ impl Tracker { /// It returns the data for a `scrape` response. fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata { match self.torrents.get(info_hash) { - Some(torrent_entry) => torrent_entry.get_stats(), + Some(torrent_entry) => torrent_entry.get_swarm_metadata(), None => SwarmMetadata::default(), } } @@ -681,7 +680,7 @@ impl Tracker { Ok(()) } - fn get_torrent_peers_for_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> Vec> { + fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer) -> Vec> { match self.torrents.get(info_hash) { None => vec![], Some(entry) => entry.get_peers_for_client(&peer.peer_addr, Some(TORRENT_PEERS_LIMIT)), @@ -703,20 +702,36 @@ impl Tracker { /// needed for a `announce` request response. /// /// # Context: Tracker - pub async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> SwarmMetadata { - // code-review: consider splitting the function in two (command and query segregation). - // `update_torrent_with_peer` and `get_stats` + pub async fn upsert_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> SwarmMetadata { + let swarm_metadata_before = match self.torrents.get_swarm_metadata(info_hash) { + Some(swarm_metadata) => swarm_metadata, + None => SwarmMetadata::zeroed(), + }; - let (stats_updated, stats) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer); + self.torrents.upsert_peer(info_hash, peer); - if self.policy.persistent_torrent_completed_stat && stats_updated { - let completed = stats.downloaded; + let swarm_metadata_after = match self.torrents.get_swarm_metadata(info_hash) { + Some(swarm_metadata) => swarm_metadata, + None => SwarmMetadata::zeroed(), + }; + + if swarm_metadata_before != swarm_metadata_after { + self.persist_stats(info_hash, &swarm_metadata_after).await; + } + + swarm_metadata_after + } + + /// It stores the torrents stats into the database (if persistency is enabled). + /// + /// # Context: Tracker + async fn persist_stats(&self, info_hash: &InfoHash, swarm_metadata: &SwarmMetadata) { + if self.policy.persistent_torrent_completed_stat { + let completed = swarm_metadata.downloaded; let info_hash = *info_hash; drop(self.database.save_persistent_torrent(&info_hash, completed).await); } - - stats } /// It calculates and returns the general `Tracker` @@ -1130,7 +1145,7 @@ mod tests { let info_hash = sample_info_hash(); let peer = sample_peer(); - tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + tracker.upsert_peer_and_get_stats(&info_hash, &peer).await; let peers = tracker.get_torrent_peers(&info_hash); @@ -1144,9 +1159,9 @@ mod tests { let info_hash = sample_info_hash(); let peer = sample_peer(); - tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + tracker.upsert_peer_and_get_stats(&info_hash, &peer).await; - let peers = tracker.get_torrent_peers_for_peer(&info_hash, &peer); + let peers = tracker.get_peers_for(&info_hash, &peer); assert_eq!(peers, vec![]); } @@ -1155,9 +1170,7 @@ mod tests { async fn it_should_return_the_torrent_metrics() { let tracker = public_tracker(); - tracker - .update_torrent_with_peer_and_get_stats(&sample_info_hash(), &leecher()) - .await; + tracker.upsert_peer_and_get_stats(&sample_info_hash(), &leecher()).await; let torrent_metrics = tracker.get_torrents_metrics(); @@ -1178,9 +1191,7 @@ mod tests { let start_time = std::time::Instant::now(); for i in 0..1_000_000 { - tracker - .update_torrent_with_peer_and_get_stats(&gen_seeded_infohash(&i), &leecher()) - .await; + tracker.upsert_peer_and_get_stats(&gen_seeded_infohash(&i), &leecher()).await; } let result_a = start_time.elapsed(); @@ -1704,11 +1715,11 @@ mod tests { let mut peer = sample_peer(); peer.event = AnnounceEvent::Started; - let swarm_stats = tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + let swarm_stats = tracker.upsert_peer_and_get_stats(&info_hash, &peer).await; assert_eq!(swarm_stats.downloaded, 0); peer.event = AnnounceEvent::Completed; - let swarm_stats = tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + let swarm_stats = tracker.upsert_peer_and_get_stats(&info_hash, &peer).await; assert_eq!(swarm_stats.downloaded, 1); // Remove the newly updated torrent from memory @@ -1719,7 +1730,7 @@ mod tests { let torrent_entry = tracker.torrents.get(&info_hash).expect("it should be able to get entry"); // It persists the number of completed peers. - assert_eq!(torrent_entry.get_stats().downloaded, 1); + assert_eq!(torrent_entry.get_swarm_metadata().downloaded, 1); // It does not persist the peers assert!(torrent_entry.peers_is_empty()); diff --git a/src/core/services/torrent.rs b/src/core/services/torrent.rs index ce44af3a8..9cba5de25 100644 --- a/src/core/services/torrent.rs +++ b/src/core/services/torrent.rs @@ -50,7 +50,7 @@ pub async fn get_torrent_info(tracker: Arc, info_hash: &InfoHash) -> Op let torrent_entry = torrent_entry_option?; - let stats = torrent_entry.get_stats(); + let stats = torrent_entry.get_swarm_metadata(); let peers = torrent_entry.get_peers(None); @@ -70,7 +70,7 @@ pub async fn get_torrents_page(tracker: Arc, pagination: Option<&Pagina let mut basic_infos: Vec = vec![]; for (info_hash, torrent_entry) in tracker.torrents.get_paginated(pagination) { - let stats = torrent_entry.get_stats(); + let stats = torrent_entry.get_swarm_metadata(); basic_infos.push(BasicInfo { info_hash, @@ -88,7 +88,7 @@ pub async fn get_torrents(tracker: Arc, info_hashes: &[InfoHash]) -> Ve let mut basic_infos: Vec = vec![]; for info_hash in info_hashes { - if let Some(stats) = tracker.torrents.get(info_hash).map(|t| t.get_stats()) { + if let Some(stats) = tracker.torrents.get(info_hash).map(|t| t.get_swarm_metadata()) { basic_infos.push(BasicInfo { info_hash: *info_hash, seeders: u64::from(stats.complete), @@ -156,9 +156,7 @@ mod tests { let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash, &sample_peer()).await; let torrent_info = get_torrent_info(tracker.clone(), &info_hash).await.unwrap(); @@ -208,9 +206,7 @@ mod tests { let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash, &sample_peer()).await; let torrents = get_torrents_page(tracker.clone(), Some(&Pagination::default())).await; @@ -234,12 +230,8 @@ mod tests { let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) - .await; - tracker - .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash1, &sample_peer()).await; + tracker.upsert_peer_and_get_stats(&info_hash2, &sample_peer()).await; let offset = 0; let limit = 1; @@ -258,12 +250,8 @@ mod tests { let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) - .await; - tracker - .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash1, &sample_peer()).await; + tracker.upsert_peer_and_get_stats(&info_hash2, &sample_peer()).await; let offset = 1; let limit = 4000; @@ -288,15 +276,11 @@ mod tests { let hash1 = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash1 = InfoHash::from_str(&hash1).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash1, &sample_peer()).await; let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) - .await; + tracker.upsert_peer_and_get_stats(&info_hash2, &sample_peer()).await; let torrents = get_torrents_page(tracker.clone(), Some(&Pagination::default())).await; diff --git a/src/servers/udp/handlers.rs b/src/servers/udp/handlers.rs index 2d5038ec3..122e666a8 100644 --- a/src/servers/udp/handlers.rs +++ b/src/servers/udp/handlers.rs @@ -718,9 +718,7 @@ mod tests { .with_peer_address(SocketAddr::new(IpAddr::V6(client_ip_v6), client_port)) .into(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv6) - .await; + tracker.upsert_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv6).await; } async fn announce_a_new_peer_using_ipv4(tracker: Arc) -> Response { @@ -944,9 +942,7 @@ mod tests { .with_peer_address(SocketAddr::new(IpAddr::V4(client_ip_v4), client_port)) .into(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv4) - .await; + tracker.upsert_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv4).await; } async fn announce_a_new_peer_using_ipv6(tracker: Arc) -> Response { @@ -1119,9 +1115,7 @@ mod tests { .with_number_of_bytes_left(0) .into(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer) - .await; + tracker.upsert_peer_and_get_stats(&info_hash.0.into(), &peer).await; } fn build_scrape_request(remote_addr: &SocketAddr, info_hash: &InfoHash) -> ScrapeRequest { diff --git a/tests/servers/api/environment.rs b/tests/servers/api/environment.rs index 8d91f3ae8..dec4ccff2 100644 --- a/tests/servers/api/environment.rs +++ b/tests/servers/api/environment.rs @@ -23,7 +23,7 @@ pub struct Environment { impl Environment { /// Add a torrent to the tracker pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { - self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + self.tracker.upsert_peer_and_get_stats(info_hash, peer).await; } } diff --git a/tests/servers/http/environment.rs b/tests/servers/http/environment.rs index 5638713aa..f00da293e 100644 --- a/tests/servers/http/environment.rs +++ b/tests/servers/http/environment.rs @@ -20,7 +20,7 @@ pub struct Environment { impl Environment { /// Add a torrent to the tracker pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { - self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + self.tracker.upsert_peer_and_get_stats(info_hash, peer).await; } } diff --git a/tests/servers/udp/environment.rs b/tests/servers/udp/environment.rs index 12f4aeb9e..6ced1dbb7 100644 --- a/tests/servers/udp/environment.rs +++ b/tests/servers/udp/environment.rs @@ -20,7 +20,7 @@ impl Environment { /// Add a torrent to the tracker #[allow(dead_code)] pub async fn add_torrent(&self, info_hash: &InfoHash, peer: &peer::Peer) { - self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + self.tracker.upsert_peer_and_get_stats(info_hash, peer).await; } }