diff --git a/src/main.rs b/src/main.rs index 01121052a..47896ff43 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use log::info; +use torrust_tracker::tracker::statistics::StatsTracker; use torrust_tracker::tracker::tracker::TorrentTracker; use torrust_tracker::{logging, setup, static_time, Configuration}; @@ -19,8 +20,11 @@ async fn main() { } }; + // Initialize stats tracker + let stats_tracker = StatsTracker::new_running_instance(); + // Initialize Torrust tracker - let tracker = match TorrentTracker::new(config.clone()) { + let tracker = match TorrentTracker::new(config.clone(), Box::new(stats_tracker)) { Ok(tracker) => Arc::new(tracker), Err(error) => { panic!("{}", error) diff --git a/src/tracker/statistics.rs b/src/tracker/statistics.rs index 85a2dbae9..cf801e1df 100644 --- a/src/tracker/statistics.rs +++ b/src/tracker/statistics.rs @@ -1,12 +1,12 @@ +use async_trait::async_trait; use std::sync::Arc; - use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, RwLock, RwLockReadGuard}; const CHANNEL_BUFFER_SIZE: usize = 65_535; -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum TrackerStatisticsEvent { Tcp4Announce, Tcp4Scrape, @@ -61,6 +61,12 @@ pub struct StatsTracker { } impl StatsTracker { + pub fn new_running_instance() -> Self { + let mut stats_tracker = Self::new(); + stats_tracker.run_worker(); + stats_tracker + } + pub fn new() -> Self { Self { channel_sender: None, @@ -68,18 +74,6 @@ impl StatsTracker { } } - pub async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { - self.stats.read().await - } - - pub async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>> { - if let Some(tx) = &self.channel_sender { - Some(tx.send(event).await) - } else { - None - } - } - pub fn run_worker(&mut self) { let (tx, mut rx) = mpsc::channel::(CHANNEL_BUFFER_SIZE); @@ -134,3 +128,35 @@ impl StatsTracker { }); } } + +#[async_trait] +pub trait TrackerStatisticsEventSender: Sync + Send { + async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>>; +} + +#[async_trait] +impl TrackerStatisticsEventSender for StatsTracker { + async fn send_event(&self, event: TrackerStatisticsEvent) -> Option>> { + if let Some(tx) = &self.channel_sender { + Some(tx.send(event).await) + } else { + None + } + } +} + +#[async_trait] +pub trait TrackerStatisticsRepository: Sync + Send { + async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics>; +} + +#[async_trait] +impl TrackerStatisticsRepository for StatsTracker { + async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { + self.stats.read().await + } +} + +pub trait TrackerStatsService: TrackerStatisticsEventSender + TrackerStatisticsRepository {} + +impl TrackerStatsService for StatsTracker {} diff --git a/src/tracker/torrent.rs b/src/tracker/torrent.rs index 7404f63af..f12f0a622 100644 --- a/src/tracker/torrent.rs +++ b/src/tracker/torrent.rs @@ -8,7 +8,7 @@ use crate::peer::TorrentPeer; use crate::protocol::clock::{DefaultClock, TimeNow}; use crate::{PeerId, MAX_SCRAPE_TORRENTS}; -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct TorrentEntry { #[serde(skip)] pub peers: std::collections::BTreeMap, diff --git a/src/tracker/tracker.rs b/src/tracker/tracker.rs index 9a242e41a..5499eebeb 100644 --- a/src/tracker/tracker.rs +++ b/src/tracker/tracker.rs @@ -12,7 +12,7 @@ use crate::databases::database::Database; use crate::mode::TrackerMode; use crate::peer::TorrentPeer; use crate::protocol::common::InfoHash; -use crate::statistics::{StatsTracker, TrackerStatistics, TrackerStatisticsEvent}; +use crate::statistics::{TrackerStatistics, TrackerStatisticsEvent, TrackerStatsService}; use crate::tracker::key; use crate::tracker::key::AuthKey; use crate::tracker::torrent::{TorrentEntry, TorrentError, TorrentStats}; @@ -24,19 +24,13 @@ pub struct TorrentTracker { keys: RwLock>, whitelist: RwLock>, torrents: RwLock>, - stats_tracker: StatsTracker, + stats_tracker: Box, database: Box, } impl TorrentTracker { - pub fn new(config: Arc) -> Result { + pub fn new(config: Arc, stats_tracker: Box) -> Result { let database = database::connect_database(&config.db_driver, &config.db_path)?; - let mut stats_tracker = StatsTracker::new(); - - // starts a thread for updating tracker stats - if config.tracker_usage_statistics { - stats_tracker.run_worker(); - } Ok(TorrentTracker { config: config.clone(), @@ -96,11 +90,20 @@ impl TorrentTracker { // Adding torrents is not relevant to public trackers. pub async fn add_torrent_to_whitelist(&self, info_hash: &InfoHash) -> Result<(), database::Error> { - self.database.add_info_hash_to_whitelist(info_hash.clone()).await?; - self.whitelist.write().await.insert(info_hash.clone()); + self.add_torrent_to_database_whitelist(info_hash).await?; + self.add_torrent_to_memory_whitelist(info_hash).await; Ok(()) } + async fn add_torrent_to_database_whitelist(&self, info_hash: &InfoHash) -> Result<(), database::Error> { + self.database.add_info_hash_to_whitelist(*info_hash).await?; + Ok(()) + } + + pub async fn add_torrent_to_memory_whitelist(&self, info_hash: &InfoHash) -> bool { + self.whitelist.write().await.insert(*info_hash) + } + // Removing torrents is not relevant to public trackers. pub async fn remove_torrent_from_whitelist(&self, info_hash: &InfoHash) -> Result<(), database::Error> { self.database.remove_info_hash_from_whitelist(info_hash.clone()).await?; @@ -177,6 +180,7 @@ impl TorrentTracker { Ok(()) } + /// Get all torrent peers for a given torrent filtering out the peer with the client address pub async fn get_torrent_peers(&self, info_hash: &InfoHash, client_addr: &SocketAddr) -> Vec { let read_lock = self.torrents.read().await; @@ -186,6 +190,16 @@ impl TorrentTracker { } } + /// Get all torrent peers for a given torrent + pub async fn get_all_torrent_peers(&self, info_hash: &InfoHash) -> Vec { + let read_lock = self.torrents.read().await; + + match read_lock.get(info_hash) { + None => vec![], + Some(entry) => entry.get_peers(None).into_iter().cloned().collect(), + } + } + pub async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &TorrentPeer) -> TorrentStats { let mut torrents = self.torrents.write().await; diff --git a/src/udp/handlers.rs b/src/udp/handlers.rs index 907dac0bc..3c4074eae 100644 --- a/src/udp/handlers.rs +++ b/src/udp/handlers.rs @@ -236,3 +236,1051 @@ fn handle_error(e: ServerError, transaction_id: TransactionId) -> Response { message: message.into(), }) } + +#[cfg(test)] +mod tests { + use std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + sync::Arc, + }; + + use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; + use async_trait::async_trait; + use tokio::sync::{mpsc::error::SendError, RwLock, RwLockReadGuard}; + + use crate::{ + mode::TrackerMode, + peer::TorrentPeer, + protocol::clock::{DefaultClock, Time}, + statistics::{ + StatsTracker, TrackerStatistics, TrackerStatisticsEvent, TrackerStatisticsEventSender, TrackerStatisticsRepository, + TrackerStatsService, + }, + tracker::tracker::TorrentTracker, + Configuration, PeerId, + }; + + fn default_tracker_config() -> Arc { + Arc::new(Configuration::default()) + } + + fn initialized_public_tracker() -> Arc { + let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Public).into()); + Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_running_instance())).unwrap()) + } + + fn initialized_private_tracker() -> Arc { + let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Private).into()); + Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_running_instance())).unwrap()) + } + + fn initialized_whitelisted_tracker() -> Arc { + let configuration = Arc::new(TrackerConfigurationBuilder::default().with_mode(TrackerMode::Listed).into()); + Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_running_instance())).unwrap()) + } + + fn sample_ipv4_remote_addr() -> SocketAddr { + sample_ipv4_socket_address() + } + + fn sample_ipv6_remote_addr() -> SocketAddr { + sample_ipv6_socket_address() + } + + fn sample_ipv4_socket_address() -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080) + } + + fn sample_ipv6_socket_address() -> SocketAddr { + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 8080) + } + + struct TorrentPeerBuilder { + peer: TorrentPeer, + } + + impl TorrentPeerBuilder { + pub fn default() -> TorrentPeerBuilder { + let default_peer = TorrentPeer { + peer_id: PeerId([255u8; 20]), + peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080), + updated: DefaultClock::now(), + uploaded: NumberOfBytes(0), + downloaded: NumberOfBytes(0), + left: NumberOfBytes(0), + event: AnnounceEvent::Started, + }; + TorrentPeerBuilder { peer: default_peer } + } + + pub fn with_peer_id(mut self, peer_id: PeerId) -> Self { + self.peer.peer_id = peer_id; + self + } + + pub fn with_peer_addr(mut self, peer_addr: SocketAddr) -> Self { + self.peer.peer_addr = peer_addr; + self + } + + pub fn with_bytes_left(mut self, left: i64) -> Self { + self.peer.left = NumberOfBytes(left); + self + } + + pub fn into(self) -> TorrentPeer { + self.peer + } + } + + struct TrackerStatsServiceMock { + stats: Arc>, + expected_event: Option, + } + + impl TrackerStatsServiceMock { + fn new() -> Self { + Self { + stats: Arc::new(RwLock::new(TrackerStatistics::new())), + expected_event: None, + } + } + + fn should_throw_event(&mut self, expected_event: TrackerStatisticsEvent) { + self.expected_event = Some(expected_event); + } + } + + #[async_trait] + impl TrackerStatisticsEventSender for TrackerStatsServiceMock { + async fn send_event(&self, _event: TrackerStatisticsEvent) -> Option>> { + if self.expected_event.is_some() { + assert_eq!(_event, *self.expected_event.as_ref().unwrap()); + } + None + } + } + + #[async_trait] + impl TrackerStatisticsRepository for TrackerStatsServiceMock { + async fn get_stats(&self) -> RwLockReadGuard<'_, TrackerStatistics> { + self.stats.read().await + } + } + + impl TrackerStatsService for TrackerStatsServiceMock {} + + struct TrackerConfigurationBuilder { + configuration: Configuration, + } + + impl TrackerConfigurationBuilder { + pub fn default() -> TrackerConfigurationBuilder { + let default_configuration = Configuration::default(); + TrackerConfigurationBuilder { + configuration: default_configuration, + } + } + + pub fn with_external_ip(mut self, external_ip: &str) -> Self { + self.configuration.external_ip = Some(external_ip.to_owned()); + self + } + + pub fn with_mode(mut self, mode: TrackerMode) -> Self { + self.configuration.mode = mode; + self + } + + pub fn into(self) -> Configuration { + self.configuration + } + } + + mod connect_request { + + use std::sync::Arc; + + use crate::{ + protocol::utils::get_connection_id, + statistics::TrackerStatisticsEvent, + tracker::tracker::TorrentTracker, + udp::{ + handle_connect, + handlers::tests::{initialized_public_tracker, sample_ipv4_remote_addr}, + }, + }; + use aquatic_udp_protocol::{ConnectRequest, ConnectResponse, Response, TransactionId}; + + use super::{default_tracker_config, sample_ipv4_socket_address, sample_ipv6_remote_addr, TrackerStatsServiceMock}; + + fn sample_connect_request() -> ConnectRequest { + ConnectRequest { + transaction_id: TransactionId(0i32), + } + } + + #[tokio::test] + async fn a_connect_response_should_contain_the_same_transaction_id_as_the_connect_request() { + let request = ConnectRequest { + transaction_id: TransactionId(0i32), + }; + + let response = handle_connect(sample_ipv4_remote_addr(), &request, initialized_public_tracker()) + .await + .unwrap(); + + assert_eq!( + response, + Response::Connect(ConnectResponse { + connection_id: get_connection_id(&sample_ipv4_remote_addr()), + transaction_id: request.transaction_id + }) + ); + } + + #[tokio::test] + async fn a_connect_response_should_contain_a_new_connection_id() { + let request = ConnectRequest { + transaction_id: TransactionId(0i32), + }; + + let response = handle_connect(sample_ipv4_remote_addr(), &request, initialized_public_tracker()) + .await + .unwrap(); + + assert_eq!( + response, + Response::Connect(ConnectResponse { + connection_id: get_connection_id(&sample_ipv4_remote_addr()), + transaction_id: request.transaction_id + }) + ); + } + + #[tokio::test] + async fn it_should_send_the_upd4_connect_event_when_a_client_tries_to_connect_using_a_ip4_socket_address() { + let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + + let client_socket_address = sample_ipv4_socket_address(); + tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Connect); + + let torrent_tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + handle_connect(client_socket_address, &sample_connect_request(), torrent_tracker) + .await + .unwrap(); + } + + #[tokio::test] + async fn it_should_send_the_upd6_connect_event_when_a_client_tries_to_connect_using_a_ip6_socket_address() { + let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + + tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Connect); + + let torrent_tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + handle_connect(sample_ipv6_remote_addr(), &sample_connect_request(), torrent_tracker) + .await + .unwrap(); + } + } + + mod announce_request { + + use std::net::Ipv4Addr; + + use aquatic_udp_protocol::{ + AnnounceEvent, AnnounceRequest, NumberOfBytes, NumberOfPeers, PeerId as AquaticPeerId, PeerKey, Port, TransactionId, + }; + + use crate::{protocol::utils::get_connection_id, udp::handlers::tests::sample_ipv4_remote_addr}; + + struct AnnounceRequestBuilder { + request: AnnounceRequest, + } + + impl AnnounceRequestBuilder { + pub fn default() -> AnnounceRequestBuilder { + let client_ip = Ipv4Addr::new(126, 0, 0, 1); + let client_port = 8080; + let info_hash_aquatic = aquatic_udp_protocol::InfoHash([0u8; 20]); + + let default_request = AnnounceRequest { + connection_id: get_connection_id(&sample_ipv4_remote_addr()), + transaction_id: TransactionId(0i32), + info_hash: info_hash_aquatic, + peer_id: AquaticPeerId([255u8; 20]), + bytes_downloaded: NumberOfBytes(0i64), + bytes_uploaded: NumberOfBytes(0i64), + bytes_left: NumberOfBytes(0i64), + event: AnnounceEvent::Started, + ip_address: Some(client_ip), + key: PeerKey(0u32), + peers_wanted: NumberOfPeers(1i32), + port: Port(client_port), + }; + AnnounceRequestBuilder { + request: default_request, + } + } + + pub fn with_info_hash(mut self, info_hash: aquatic_udp_protocol::InfoHash) -> Self { + self.request.info_hash = info_hash; + self + } + + pub fn with_peer_id(mut self, peer_id: AquaticPeerId) -> Self { + self.request.peer_id = peer_id; + self + } + + pub fn with_ip_address(mut self, ip_address: Ipv4Addr) -> Self { + self.request.ip_address = Some(ip_address); + self + } + + pub fn with_port(mut self, port: u16) -> Self { + self.request.port = Port(port); + self + } + + pub fn into(self) -> AnnounceRequest { + self.request + } + } + + mod using_ipv4 { + + use std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + sync::Arc, + }; + + use aquatic_udp_protocol::{ + AnnounceInterval, AnnounceResponse, InfoHash as AquaticInfoHash, NumberOfPeers, PeerId as AquaticPeerId, + Response, ResponsePeer, + }; + + use crate::{ + statistics::TrackerStatisticsEvent, + tracker::tracker::TorrentTracker, + udp::{ + handle_announce, + handlers::tests::{ + announce_request::AnnounceRequestBuilder, default_tracker_config, initialized_public_tracker, + sample_ipv4_socket_address, TorrentPeerBuilder, TrackerStatsServiceMock, + }, + }, + PeerId, + }; + + #[tokio::test] + async fn an_announced_peer_should_be_added_to_the_tracker() { + let tracker = initialized_public_tracker(); + + let client_ip = Ipv4Addr::new(126, 0, 0, 1); + let client_port = 8080; + let info_hash = AquaticInfoHash([0u8; 20]); + let peer_id = AquaticPeerId([255u8; 20]); + + let request = AnnounceRequestBuilder::default() + .with_info_hash(info_hash) + .with_peer_id(peer_id) + .with_ip_address(client_ip) + .with_port(client_port) + .into(); + + let remote_addr = SocketAddr::new(IpAddr::V4(client_ip), client_port); + handle_announce(remote_addr, &request, tracker.clone()).await.unwrap(); + + let peers = tracker.get_all_torrent_peers(&info_hash.0.into()).await; + + let expected_peer = TorrentPeerBuilder::default() + .with_peer_id(PeerId(peer_id.0)) + .with_peer_addr(SocketAddr::new(IpAddr::V4(client_ip), client_port)) + .into(); + + assert_eq!(peers[0], expected_peer); + } + + #[tokio::test] + async fn the_announced_peer_should_not_be_included_in_the_response() { + let request = AnnounceRequestBuilder::default().into(); + let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080); + + let response = handle_announce(remote_addr, &request, initialized_public_tracker()) + .await + .unwrap(); + + let empty_peer_vector: Vec> = vec![]; + assert_eq!( + response, + Response::from(AnnounceResponse { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval(120i32), + leechers: NumberOfPeers(0i32), + seeders: NumberOfPeers(1i32), + peers: empty_peer_vector + }) + ); + } + + #[tokio::test] + async fn the_tracker_should_always_use_the_remote_client_ip_but_not_the_port_in_the_udp_request_header_instead_of_the_peer_address_in_the_announce_request( + ) { + // From the BEP 15 (https://www.bittorrent.org/beps/bep_0015.html): + // "Do note that most trackers will only honor the IP address field under limited circumstances." + + let tracker = initialized_public_tracker(); + + let info_hash = AquaticInfoHash([0u8; 20]); + let peer_id = AquaticPeerId([255u8; 20]); + let client_port = 8080; + + let remote_client_ip = Ipv4Addr::new(126, 0, 0, 1); + let remote_client_port = 8081; + let peer_address = Ipv4Addr::new(126, 0, 0, 2); + + let request = AnnounceRequestBuilder::default() + .with_info_hash(info_hash) + .with_peer_id(peer_id) + .with_ip_address(peer_address) + .with_port(client_port) + .into(); + + let remote_addr = SocketAddr::new(IpAddr::V4(remote_client_ip), remote_client_port); + handle_announce(remote_addr, &request, tracker.clone()).await.unwrap(); + + let peers = tracker.get_all_torrent_peers(&info_hash.0.into()).await; + + assert_eq!(peers[0].peer_addr, SocketAddr::new(IpAddr::V4(remote_client_ip), client_port)); + } + + async fn add_a_torrent_peer_using_ipv6(tracker: Arc) { + let info_hash = AquaticInfoHash([0u8; 20]); + + let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); + let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); + let client_port = 8080; + let peer_id = AquaticPeerId([255u8; 20]); + + let peer_using_ipv6 = TorrentPeerBuilder::default() + .with_peer_id(PeerId(peer_id.0)) + .with_peer_addr(SocketAddr::new(IpAddr::V6(client_ip_v6), client_port)) + .into(); + + tracker + .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv6) + .await; + } + + async fn announce_a_new_peer_using_ipv4(tracker: Arc) -> Response { + let request = AnnounceRequestBuilder::default().into(); + let remote_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, 1)), 8080); + let response = handle_announce(remote_addr, &request, tracker.clone()).await.unwrap(); + response + } + + #[tokio::test] + async fn when_the_announce_request_comes_from_a_client_using_ipv4_the_response_should_not_include_peers_using_ipv6() { + let tracker = initialized_public_tracker(); + + add_a_torrent_peer_using_ipv6(tracker.clone()).await; + + let response = announce_a_new_peer_using_ipv4(tracker.clone()).await; + + // The response should not contain the peer using IPV6 + let peers: Option>> = match response { + Response::AnnounceIpv6(announce_response) => Some(announce_response.peers), + _ => None, + }; + let no_ipv6_peers = peers.is_none(); + assert!(no_ipv6_peers); + } + + #[tokio::test] + async fn should_send_the_upd4_announce_event() { + let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + + tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Announce); + + let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + handle_announce( + sample_ipv4_socket_address(), + &AnnounceRequestBuilder::default().into(), + tracker.clone(), + ) + .await + .unwrap(); + } + + mod from_a_loopback_ip { + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + use aquatic_udp_protocol::{InfoHash as AquaticInfoHash, PeerId as AquaticPeerId}; + + use crate::{ + udp::{ + handle_announce, + handlers::tests::{ + announce_request::AnnounceRequestBuilder, initialized_public_tracker, TorrentPeerBuilder, + }, + }, + PeerId, + }; + + #[tokio::test] + async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration_if_defined() { + let tracker = initialized_public_tracker(); + + let client_ip = Ipv4Addr::new(127, 0, 0, 1); + let client_port = 8080; + let info_hash = AquaticInfoHash([0u8; 20]); + let peer_id = AquaticPeerId([255u8; 20]); + + let request = AnnounceRequestBuilder::default() + .with_info_hash(info_hash) + .with_peer_id(peer_id) + .with_ip_address(client_ip) + .with_port(client_port) + .into(); + + let remote_addr = SocketAddr::new(IpAddr::V4(client_ip), client_port); + handle_announce(remote_addr, &request, tracker.clone()).await.unwrap(); + + let peers = tracker.get_all_torrent_peers(&info_hash.0.into()).await; + + let external_ip_in_tracker_configuration = + tracker.config.external_ip.clone().unwrap().parse::().unwrap(); + + let expected_peer = TorrentPeerBuilder::default() + .with_peer_id(PeerId(peer_id.0)) + .with_peer_addr(SocketAddr::new(IpAddr::V4(external_ip_in_tracker_configuration), client_port)) + .into(); + + assert_eq!(peers[0], expected_peer); + } + } + } + + mod using_ipv6 { + + use std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + sync::Arc, + }; + + use aquatic_udp_protocol::{ + AnnounceInterval, AnnounceResponse, InfoHash as AquaticInfoHash, NumberOfPeers, PeerId as AquaticPeerId, + Response, ResponsePeer, + }; + + use crate::{ + statistics::TrackerStatisticsEvent, + tracker::tracker::TorrentTracker, + udp::{ + handle_announce, + handlers::tests::{ + announce_request::AnnounceRequestBuilder, default_tracker_config, initialized_public_tracker, + sample_ipv6_remote_addr, TorrentPeerBuilder, TrackerStatsServiceMock, + }, + }, + PeerId, + }; + + #[tokio::test] + async fn an_announced_peer_should_be_added_to_the_tracker() { + let tracker = initialized_public_tracker(); + + let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); + let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); + let client_port = 8080; + let info_hash = AquaticInfoHash([0u8; 20]); + let peer_id = AquaticPeerId([255u8; 20]); + + let request = AnnounceRequestBuilder::default() + .with_info_hash(info_hash) + .with_peer_id(peer_id) + .with_ip_address(client_ip_v4) + .with_port(client_port) + .into(); + + let remote_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), client_port); + handle_announce(remote_addr, &request, tracker.clone()).await.unwrap(); + + let peers = tracker.get_all_torrent_peers(&info_hash.0.into()).await; + + let expected_peer = TorrentPeerBuilder::default() + .with_peer_id(PeerId(peer_id.0)) + .with_peer_addr(SocketAddr::new(IpAddr::V6(client_ip_v6), client_port)) + .into(); + + assert_eq!(peers[0], expected_peer); + } + + #[tokio::test] + async fn the_announced_peer_should_not_be_included_in_the_response() { + let request = AnnounceRequestBuilder::default().into(); + let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); + let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); + let remote_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), 8080); + + let response = handle_announce(remote_addr, &request, initialized_public_tracker()) + .await + .unwrap(); + + let empty_peer_vector: Vec> = vec![]; + assert_eq!( + response, + Response::from(AnnounceResponse { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval(120i32), + leechers: NumberOfPeers(0i32), + seeders: NumberOfPeers(1i32), + peers: empty_peer_vector + }) + ); + } + + #[tokio::test] + async fn the_tracker_should_always_use_the_remote_client_ip_but_not_the_port_in_the_udp_request_header_instead_of_the_peer_address_in_the_announce_request( + ) { + // From the BEP 15 (https://www.bittorrent.org/beps/bep_0015.html): + // "Do note that most trackers will only honor the IP address field under limited circumstances." + + let tracker = initialized_public_tracker(); + + let info_hash = AquaticInfoHash([0u8; 20]); + let peer_id = AquaticPeerId([255u8; 20]); + let client_port = 8080; + + let remote_client_ip = "::100".parse().unwrap(); // IPV4 ::0.0.1.0 -> IPV6 = ::100 = ::ffff:0:100 = 0:0:0:0:0:ffff:0:0100 + let remote_client_port = 8081; + let peer_address = "126.0.0.1".parse().unwrap(); + + let request = AnnounceRequestBuilder::default() + .with_info_hash(info_hash) + .with_peer_id(peer_id) + .with_ip_address(peer_address) + .with_port(client_port) + .into(); + + let remote_addr = SocketAddr::new(IpAddr::V6(remote_client_ip), remote_client_port); + handle_announce(remote_addr, &request, tracker.clone()).await.unwrap(); + + let peers = tracker.get_all_torrent_peers(&info_hash.0.into()).await; + + // When using IPv6 the tracker converts the remote client ip into a IPv4 address + assert_eq!(peers[0].peer_addr, SocketAddr::new(IpAddr::V6(remote_client_ip), client_port)); + } + + async fn add_a_torrent_peer_using_ipv4(tracker: Arc) { + let info_hash = AquaticInfoHash([0u8; 20]); + + let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); + let client_port = 8080; + let peer_id = AquaticPeerId([255u8; 20]); + + let peer_using_ipv4 = TorrentPeerBuilder::default() + .with_peer_id(PeerId(peer_id.0)) + .with_peer_addr(SocketAddr::new(IpAddr::V4(client_ip_v4), client_port)) + .into(); + + tracker + .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv4) + .await; + } + + async fn announce_a_new_peer_using_ipv6(tracker: Arc) -> Response { + let client_ip_v4 = Ipv4Addr::new(126, 0, 0, 1); + let client_ip_v6 = client_ip_v4.to_ipv6_compatible(); + let client_port = 8080; + let remote_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), client_port); + let request = AnnounceRequestBuilder::default().into(); + let response = handle_announce(remote_addr, &request, tracker.clone()).await.unwrap(); + response + } + + #[tokio::test] + async fn when_the_announce_request_comes_from_a_client_using_ipv6_the_response_should_not_include_peers_using_ipv4() { + let tracker = initialized_public_tracker(); + + add_a_torrent_peer_using_ipv4(tracker.clone()).await; + + let response = announce_a_new_peer_using_ipv6(tracker.clone()).await; + + // The response should not contain the peer using IPV4 + let peers: Option>> = match response { + Response::AnnounceIpv4(announce_response) => Some(announce_response.peers), + _ => None, + }; + let no_ipv4_peers = peers.is_none(); + assert!(no_ipv4_peers); + } + + #[tokio::test] + async fn should_send_the_upd6_announce_event() { + let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + + tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Announce); + + let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + handle_announce( + sample_ipv6_remote_addr(), + &AnnounceRequestBuilder::default().into(), + tracker.clone(), + ) + .await + .unwrap(); + } + + mod from_a_loopback_ip { + use std::{ + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, + sync::Arc, + }; + + use aquatic_udp_protocol::{InfoHash as AquaticInfoHash, PeerId as AquaticPeerId}; + + use crate::{ + statistics::StatsTracker, + tracker::tracker::TorrentTracker, + udp::{ + handle_announce, + handlers::tests::{announce_request::AnnounceRequestBuilder, TrackerConfigurationBuilder}, + }, + }; + + #[tokio::test] + async fn the_peer_ip_should_be_changed_to_the_external_ip_in_the_tracker_configuration() { + let configuration = Arc::new(TrackerConfigurationBuilder::default().with_external_ip("::126.0.0.1").into()); + let tracker = + Arc::new(TorrentTracker::new(configuration, Box::new(StatsTracker::new_running_instance())).unwrap()); + + let loopback_ipv4 = Ipv4Addr::new(127, 0, 0, 1); + let loopback_ipv6 = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1); + + let client_ip_v4 = loopback_ipv4; + let client_ip_v6 = loopback_ipv6; + let client_port = 8080; + + let info_hash = AquaticInfoHash([0u8; 20]); + let peer_id = AquaticPeerId([255u8; 20]); + + let request = AnnounceRequestBuilder::default() + .with_info_hash(info_hash) + .with_peer_id(peer_id) + .with_ip_address(client_ip_v4) + .with_port(client_port) + .into(); + + let remote_addr = SocketAddr::new(IpAddr::V6(client_ip_v6), client_port); + handle_announce(remote_addr, &request, tracker.clone()).await.unwrap(); + + let peers = tracker.get_all_torrent_peers(&info_hash.0.into()).await; + + let _external_ip_in_tracker_configuration = + tracker.config.external_ip.clone().unwrap().parse::().unwrap(); + + // There's a special type of IPv6 addresses that provide compatibility with IPv4. + // The last 32 bits of these addresses represent an IPv4, and are represented like this: + // 1111:2222:3333:4444:5555:6666:1.2.3.4 + // + // ::127.0.0.1 is the IPV6 representation for the IPV4 address 127.0.0.1. + assert_eq!(Ok(peers[0].peer_addr.ip()), "::126.0.0.1".parse()); + } + } + } + } + + mod scrape_request { + use std::{net::SocketAddr, sync::Arc}; + + use aquatic_udp_protocol::{ + InfoHash, NumberOfDownloads, NumberOfPeers, Response, ScrapeRequest, ScrapeResponse, TorrentScrapeStatistics, + TransactionId, + }; + + use crate::{ + protocol::utils::get_connection_id, + tracker::tracker::TorrentTracker, + udp::{ + handle_scrape, + handlers::tests::{initialized_public_tracker, sample_ipv4_remote_addr}, + }, + PeerId, + }; + + use super::TorrentPeerBuilder; + + fn zeroed_torrent_statistics() -> TorrentScrapeStatistics { + TorrentScrapeStatistics { + seeders: NumberOfPeers(0), + completed: NumberOfDownloads(0), + leechers: NumberOfPeers(0), + } + } + + #[tokio::test] + async fn should_return_no_stats_when_the_tracker_does_not_have_any_torrent() { + let remote_addr = sample_ipv4_remote_addr(); + + let info_hash = InfoHash([0u8; 20]); + let info_hashes = vec![info_hash]; + + let request = ScrapeRequest { + connection_id: get_connection_id(&remote_addr), + transaction_id: TransactionId(0i32), + info_hashes, + }; + + let response = handle_scrape(remote_addr, &request, initialized_public_tracker()) + .await + .unwrap(); + + let expected_torrent_stats = vec![zeroed_torrent_statistics()]; + + assert_eq!( + response, + Response::from(ScrapeResponse { + transaction_id: request.transaction_id, + torrent_stats: expected_torrent_stats + }) + ); + } + + async fn add_a_seeder(tracker: Arc, remote_addr: &SocketAddr, info_hash: &InfoHash) { + let peer_id = PeerId([255u8; 20]); + + let peer = TorrentPeerBuilder::default() + .with_peer_id(PeerId(peer_id.0)) + .with_peer_addr(*remote_addr) + .with_bytes_left(0) + .into(); + + tracker + .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer) + .await; + } + + fn build_scrape_request(remote_addr: &SocketAddr, info_hash: &InfoHash) -> ScrapeRequest { + let info_hashes = vec![*info_hash]; + + ScrapeRequest { + connection_id: get_connection_id(remote_addr), + transaction_id: TransactionId(0i32), + info_hashes, + } + } + + async fn add_a_sample_seeder_and_scrape(tracker: Arc) -> Response { + let remote_addr = sample_ipv4_remote_addr(); + let info_hash = InfoHash([0u8; 20]); + + add_a_seeder(tracker.clone(), &remote_addr, &info_hash).await; + + let request = build_scrape_request(&remote_addr, &info_hash); + + handle_scrape(remote_addr, &request, tracker.clone()).await.unwrap() + } + + fn match_scrape_response(response: Response) -> Option { + match response { + Response::Scrape(scrape_response) => Some(scrape_response), + _ => None, + } + } + + mod with_a_public_tracker { + use aquatic_udp_protocol::{NumberOfDownloads, NumberOfPeers, TorrentScrapeStatistics}; + + use crate::udp::handlers::tests::{ + initialized_public_tracker, + scrape_request::{add_a_sample_seeder_and_scrape, match_scrape_response}, + }; + + #[tokio::test] + async fn should_return_torrent_statistics_when_the_tracker_has_the_requested_torrent() { + let tracker = initialized_public_tracker(); + + let torrent_stats = match_scrape_response(add_a_sample_seeder_and_scrape(tracker.clone()).await); + + let expected_torrent_stats = vec![TorrentScrapeStatistics { + seeders: NumberOfPeers(1), + completed: NumberOfDownloads(0), + leechers: NumberOfPeers(0), + }]; + + assert_eq!(torrent_stats.unwrap().torrent_stats, expected_torrent_stats); + } + } + + mod with_a_private_tracker { + + use aquatic_udp_protocol::InfoHash; + + use crate::udp::{ + handle_scrape, + handlers::tests::{ + initialized_private_tracker, sample_ipv4_remote_addr, + scrape_request::{ + add_a_sample_seeder_and_scrape, build_scrape_request, match_scrape_response, zeroed_torrent_statistics, + }, + }, + }; + + #[tokio::test] + async fn should_return_zeroed_statistics_when_the_tracker_does_not_have_the_requested_torrent() { + let tracker = initialized_private_tracker(); + + let remote_addr = sample_ipv4_remote_addr(); + let non_existing_info_hash = InfoHash([0u8; 20]); + + let request = build_scrape_request(&remote_addr, &non_existing_info_hash); + + let torrent_stats = + match_scrape_response(handle_scrape(remote_addr, &request, tracker.clone()).await.unwrap()).unwrap(); + + let expected_torrent_stats = vec![zeroed_torrent_statistics()]; + + assert_eq!(torrent_stats.torrent_stats, expected_torrent_stats); + } + + #[tokio::test] + async fn should_return_zeroed_statistics_when_the_tracker_has_the_requested_torrent_because_authenticated_requests_are_not_supported_in_udp_tracker( + ) { + let tracker = initialized_private_tracker(); + + let torrent_stats = match_scrape_response(add_a_sample_seeder_and_scrape(tracker.clone()).await).unwrap(); + + let expected_torrent_stats = vec![zeroed_torrent_statistics()]; + + assert_eq!(torrent_stats.torrent_stats, expected_torrent_stats); + } + } + + mod with_a_whitelisted_tracker { + use aquatic_udp_protocol::{InfoHash, NumberOfDownloads, NumberOfPeers, TorrentScrapeStatistics}; + + use crate::udp::{ + handle_scrape, + handlers::tests::{ + initialized_whitelisted_tracker, sample_ipv4_remote_addr, + scrape_request::{add_a_seeder, build_scrape_request, match_scrape_response, zeroed_torrent_statistics}, + }, + }; + + #[tokio::test] + async fn should_return_the_torrent_statistics_when_the_requested_torrent_is_whitelisted() { + let tracker = initialized_whitelisted_tracker(); + + let remote_addr = sample_ipv4_remote_addr(); + let info_hash = InfoHash([0u8; 20]); + + add_a_seeder(tracker.clone(), &remote_addr, &info_hash).await; + + tracker.add_torrent_to_memory_whitelist(&info_hash.0.into()).await; + + let request = build_scrape_request(&remote_addr, &info_hash); + + let torrent_stats = + match_scrape_response(handle_scrape(remote_addr, &request, tracker.clone()).await.unwrap()).unwrap(); + + let expected_torrent_stats = vec![TorrentScrapeStatistics { + seeders: NumberOfPeers(1), + completed: NumberOfDownloads(0), + leechers: NumberOfPeers(0), + }]; + + assert_eq!(torrent_stats.torrent_stats, expected_torrent_stats); + } + + #[tokio::test] + async fn should_return_zeroed_statistics_when_the_requested_torrent_is_not_whitelisted() { + let tracker = initialized_whitelisted_tracker(); + + let remote_addr = sample_ipv4_remote_addr(); + let info_hash = InfoHash([0u8; 20]); + + add_a_seeder(tracker.clone(), &remote_addr, &info_hash).await; + + let request = build_scrape_request(&remote_addr, &info_hash); + + let torrent_stats = + match_scrape_response(handle_scrape(remote_addr, &request, tracker.clone()).await.unwrap()).unwrap(); + + let expected_torrent_stats = vec![zeroed_torrent_statistics()]; + + assert_eq!(torrent_stats.torrent_stats, expected_torrent_stats); + } + } + + fn sample_scrape_request(remote_addr: &SocketAddr) -> ScrapeRequest { + let info_hash = InfoHash([0u8; 20]); + let info_hashes = vec![info_hash]; + + ScrapeRequest { + connection_id: get_connection_id(remote_addr), + transaction_id: TransactionId(0i32), + info_hashes, + } + } + + mod using_ipv4 { + use std::sync::Arc; + + use crate::{ + statistics::TrackerStatisticsEvent, + tracker::tracker::TorrentTracker, + udp::handlers::{ + handle_scrape, + tests::{default_tracker_config, sample_ipv4_remote_addr, TrackerStatsServiceMock}, + }, + }; + + use super::sample_scrape_request; + + #[tokio::test] + async fn should_send_the_upd4_scrape_event() { + let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + + tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp4Scrape); + + let remote_addr = sample_ipv4_remote_addr(); + let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + + handle_scrape(remote_addr, &sample_scrape_request(&remote_addr), tracker.clone()) + .await + .unwrap(); + } + } + + mod using_ipv6 { + use std::sync::Arc; + + use crate::{ + statistics::TrackerStatisticsEvent, + tracker::tracker::TorrentTracker, + udp::handlers::{ + handle_scrape, + tests::{default_tracker_config, sample_ipv6_remote_addr, TrackerStatsServiceMock}, + }, + }; + + use super::sample_scrape_request; + + #[tokio::test] + async fn should_send_the_upd6_scrape_event() { + let mut tracker_stats_service = Box::new(TrackerStatsServiceMock::new()); + + tracker_stats_service.should_throw_event(TrackerStatisticsEvent::Udp6Scrape); + + let remote_addr = sample_ipv6_remote_addr(); + let tracker = Arc::new(TorrentTracker::new(default_tracker_config(), tracker_stats_service).unwrap()); + + handle_scrape(remote_addr, &sample_scrape_request(&remote_addr), tracker.clone()) + .await + .unwrap(); + } + } + } +}