Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow UDP clients to limit peers in response #1035

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 167 additions & 21 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ pub mod torrent;

pub mod peer_tests;

use std::cmp::max;
use std::collections::HashMap;
use std::net::IpAddr;
use std::panic::Location;
Expand Down Expand Up @@ -520,6 +521,48 @@ pub struct AnnounceData {
pub policy: AnnouncePolicy,
}

/// How many peers the peer announcing wants in the announce response.
#[derive(Clone, Debug, PartialEq, Default)]
pub enum PeersWanted {
/// The peer wants as many peers as possible in the announce response.
#[default]
All,
/// The peer only wants a certain amount of peers in the announce response.
Only { amount: usize },
}

impl PeersWanted {
#[must_use]
pub fn only(limit: u32) -> Self {
let amount: usize = match limit.try_into() {
Ok(amount) => amount,
Err(_) => TORRENT_PEERS_LIMIT,
};

Self::Only { amount }
}

fn limit(&self) -> usize {
match self {
PeersWanted::All => TORRENT_PEERS_LIMIT,
PeersWanted::Only { amount } => *amount,
}
}
}

impl From<i32> for PeersWanted {
fn from(value: i32) -> Self {
if value > 0 {
match value.try_into() {
Ok(peers_wanted) => Self::Only { amount: peers_wanted },
Err(_) => Self::All,
}
} else {
Self::All
}
}
}

/// Structure that holds the data returned by the `scrape` request.
#[derive(Debug, PartialEq, Default)]
pub struct ScrapeData {
Expand Down Expand Up @@ -639,7 +682,13 @@ impl Tracker {
/// # Context: Tracker
///
/// BEP 03: [The `BitTorrent` Protocol Specification](https://www.bittorrent.org/beps/bep_0003.html).
pub fn announce(&self, info_hash: &InfoHash, peer: &mut peer::Peer, remote_client_ip: &IpAddr) -> AnnounceData {
pub fn announce(
&self,
info_hash: &InfoHash,
peer: &mut peer::Peer,
remote_client_ip: &IpAddr,
peers_wanted: &PeersWanted,
) -> AnnounceData {
// code-review: maybe instead of mutating the peer we could just return
// a tuple with the new peer and the announce data: (Peer, AnnounceData).
// It could even be a different struct: `StoredPeer` or `PublicPeer`.
Expand All @@ -661,7 +710,7 @@ impl Tracker {

let stats = self.upsert_peer_and_get_stats(info_hash, peer);

let peers = self.get_peers_for(info_hash, peer);
let peers = self.get_peers_for(info_hash, peer, peers_wanted.limit());

AnnounceData {
peers,
Expand Down Expand Up @@ -713,16 +762,21 @@ impl Tracker {
Ok(())
}

fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer) -> Vec<Arc<peer::Peer>> {
/// # Context: Tracker
///
/// Get torrent peers for a given torrent and client.
///
/// It filters out the client making the request.
fn get_peers_for(&self, info_hash: &InfoHash, peer: &peer::Peer, limit: usize) -> Vec<Arc<peer::Peer>> {
match self.torrents.get(info_hash) {
None => vec![],
Some(entry) => entry.get_peers_for_client(&peer.peer_addr, Some(TORRENT_PEERS_LIMIT)),
Some(entry) => entry.get_peers_for_client(&peer.peer_addr, Some(max(limit, TORRENT_PEERS_LIMIT))),
}
}

/// # Context: Tracker
///
/// Get all torrent peers for a given torrent
/// Get torrent peers for a given torrent.
pub fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec<Arc<peer::Peer>> {
match self.torrents.get(info_hash) {
None => vec![],
Expand Down Expand Up @@ -1199,6 +1253,7 @@ mod tests {
use std::sync::Arc;

use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes, PeerId};
use torrust_tracker_configuration::TORRENT_PEERS_LIMIT;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::DurationSinceUnixEpoch;
use torrust_tracker_test_helpers::configuration;
Expand Down Expand Up @@ -1328,7 +1383,7 @@ mod tests {
}

#[tokio::test]
async fn it_should_return_all_the_peers_for_a_given_torrent() {
async fn it_should_return_the_peers_for_a_given_torrent() {
let tracker = public_tracker();

let info_hash = sample_info_hash();
Expand All @@ -1341,20 +1396,93 @@ mod tests {
assert_eq!(peers, vec![Arc::new(peer)]);
}

/// It generates a peer id from a number where the number is the last
/// part of the peer ID. For example, for `12` it returns
/// `-qB00000000000000012`.
fn numeric_peer_id(two_digits_value: i32) -> PeerId {
// Format idx as a string with leading zeros, ensuring it has exactly 2 digits
let idx_str = format!("{two_digits_value:02}");

// Create the base part of the peer ID.
let base = b"-qB00000000000000000";

// Concatenate the base with idx bytes, ensuring the total length is 20 bytes.
let mut peer_id_bytes = [0u8; 20];
peer_id_bytes[..base.len()].copy_from_slice(base);
peer_id_bytes[base.len() - idx_str.len()..].copy_from_slice(idx_str.as_bytes());

PeerId(peer_id_bytes)
}

#[tokio::test]
async fn it_should_return_74_peers_at_the_most_for_a_given_torrent() {
let tracker = public_tracker();

let info_hash = sample_info_hash();

for idx in 1..=75 {
let peer = Peer {
peer_id: numeric_peer_id(idx),
peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, idx.try_into().unwrap())), 8080),
updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0),
uploaded: NumberOfBytes::new(0),
downloaded: NumberOfBytes::new(0),
left: NumberOfBytes::new(0), // No bytes left to download
event: AnnounceEvent::Completed,
};

tracker.upsert_peer_and_get_stats(&info_hash, &peer);
}

let peers = tracker.get_torrent_peers(&info_hash);

assert_eq!(peers.len(), 74);
}

#[tokio::test]
async fn it_should_return_all_the_peers_for_a_given_torrent_excluding_a_given_peer() {
async fn it_should_return_the_peers_for_a_given_torrent_excluding_a_given_peer() {
let tracker = public_tracker();

let info_hash = sample_info_hash();
let peer = sample_peer();

tracker.upsert_peer_and_get_stats(&info_hash, &peer);

let peers = tracker.get_peers_for(&info_hash, &peer);
let peers = tracker.get_peers_for(&info_hash, &peer, TORRENT_PEERS_LIMIT);

assert_eq!(peers, vec![]);
}

#[tokio::test]
async fn it_should_return_74_peers_at_the_most_for_a_given_torrent_when_it_filters_out_a_given_peer() {
let tracker = public_tracker();

let info_hash = sample_info_hash();

let excluded_peer = sample_peer();

tracker.upsert_peer_and_get_stats(&info_hash, &excluded_peer);

// Add 74 peers
for idx in 2..=75 {
let peer = Peer {
peer_id: numeric_peer_id(idx),
peer_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(126, 0, 0, idx.try_into().unwrap())), 8080),
updated: DurationSinceUnixEpoch::new(1_669_397_478_934, 0),
uploaded: NumberOfBytes::new(0),
downloaded: NumberOfBytes::new(0),
left: NumberOfBytes::new(0), // No bytes left to download
event: AnnounceEvent::Completed,
};

tracker.upsert_peer_and_get_stats(&info_hash, &peer);
}

let peers = tracker.get_peers_for(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT);

assert_eq!(peers.len(), 74);
}

#[tokio::test]
async fn it_should_return_the_torrent_metrics() {
let tracker = public_tracker();
Expand Down Expand Up @@ -1409,6 +1537,7 @@ mod tests {
use crate::core::tests::the_tracker::{
peer_ip, public_tracker, sample_info_hash, sample_peer, sample_peer_1, sample_peer_2,
};
use crate::core::PeersWanted;

mod should_assign_the_ip_to_the_peer {

Expand Down Expand Up @@ -1514,7 +1643,7 @@ mod tests {

let mut peer = sample_peer();

let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip());
let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::All);

assert_eq!(announce_data.peers, vec![]);
}
Expand All @@ -1524,10 +1653,15 @@ mod tests {
let tracker = public_tracker();

let mut previously_announced_peer = sample_peer_1();
tracker.announce(&sample_info_hash(), &mut previously_announced_peer, &peer_ip());
tracker.announce(
&sample_info_hash(),
&mut previously_announced_peer,
&peer_ip(),
&PeersWanted::All,
);

let mut peer = sample_peer_2();
let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip());
let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::All);

assert_eq!(announce_data.peers, vec![Arc::new(previously_announced_peer)]);
}
Expand All @@ -1537,14 +1671,15 @@ mod tests {
use crate::core::tests::the_tracker::{
completed_peer, leecher, peer_ip, public_tracker, sample_info_hash, seeder, started_peer,
};
use crate::core::PeersWanted;

#[tokio::test]
async fn when_the_peer_is_a_seeder() {
let tracker = public_tracker();

let mut peer = seeder();

let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip());
let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::All);

assert_eq!(announce_data.stats.complete, 1);
}
Expand All @@ -1555,7 +1690,7 @@ mod tests {

let mut peer = leecher();

let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip());
let announce_data = tracker.announce(&sample_info_hash(), &mut peer, &peer_ip(), &PeersWanted::All);

assert_eq!(announce_data.stats.incomplete, 1);
}
Expand All @@ -1566,10 +1701,11 @@ mod tests {

// We have to announce with "started" event because peer does not count if peer was not previously known
let mut started_peer = started_peer();
tracker.announce(&sample_info_hash(), &mut started_peer, &peer_ip());
tracker.announce(&sample_info_hash(), &mut started_peer, &peer_ip(), &PeersWanted::All);

let mut completed_peer = completed_peer();
let announce_data = tracker.announce(&sample_info_hash(), &mut completed_peer, &peer_ip());
let announce_data =
tracker.announce(&sample_info_hash(), &mut completed_peer, &peer_ip(), &PeersWanted::All);

assert_eq!(announce_data.stats.downloaded, 1);
}
Expand All @@ -1583,7 +1719,7 @@ mod tests {
use torrust_tracker_primitives::info_hash::InfoHash;

use crate::core::tests::the_tracker::{complete_peer, incomplete_peer, public_tracker};
use crate::core::{ScrapeData, SwarmMetadata};
use crate::core::{PeersWanted, ScrapeData, SwarmMetadata};

#[tokio::test]
async fn it_should_return_a_zeroed_swarm_metadata_for_the_requested_file_if_the_tracker_does_not_have_that_torrent(
Expand All @@ -1609,11 +1745,21 @@ mod tests {

// Announce a "complete" peer for the torrent
let mut complete_peer = complete_peer();
tracker.announce(&info_hash, &mut complete_peer, &IpAddr::V4(Ipv4Addr::new(126, 0, 0, 10)));
tracker.announce(
&info_hash,
&mut complete_peer,
&IpAddr::V4(Ipv4Addr::new(126, 0, 0, 10)),
&PeersWanted::All,
);

// Announce an "incomplete" peer for the torrent
let mut incomplete_peer = incomplete_peer();
tracker.announce(&info_hash, &mut incomplete_peer, &IpAddr::V4(Ipv4Addr::new(126, 0, 0, 11)));
tracker.announce(
&info_hash,
&mut incomplete_peer,
&IpAddr::V4(Ipv4Addr::new(126, 0, 0, 11)),
&PeersWanted::All,
);

// Scrape
let scrape_data = tracker.scrape(&vec![info_hash]).await;
Expand Down Expand Up @@ -1740,7 +1886,7 @@ mod tests {
use crate::core::tests::the_tracker::{
complete_peer, incomplete_peer, peer_ip, sample_info_hash, whitelisted_tracker,
};
use crate::core::ScrapeData;
use crate::core::{PeersWanted, ScrapeData};

#[test]
fn it_should_be_able_to_build_a_zeroed_scrape_data_for_a_list_of_info_hashes() {
Expand All @@ -1761,11 +1907,11 @@ mod tests {
let info_hash = "3b245504cf5f11bbdbe1201cea6a6bf45aee1bc0".parse::<InfoHash>().unwrap();

let mut peer = incomplete_peer();
tracker.announce(&info_hash, &mut peer, &peer_ip());
tracker.announce(&info_hash, &mut peer, &peer_ip(), &PeersWanted::All);

// Announce twice to force non zeroed swarm metadata
let mut peer = complete_peer();
tracker.announce(&info_hash, &mut peer, &peer_ip());
tracker.announce(&info_hash, &mut peer, &peer_ip(), &PeersWanted::All);

let scrape_data = tracker.scrape(&vec![info_hash]).await;

Expand Down
3 changes: 2 additions & 1 deletion src/servers/http/v1/extractors/announce_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ mod tests {

#[test]
fn it_should_extract_the_announce_request_from_the_url_query_params() {
let raw_query = "info_hash=%3B%24U%04%CF%5F%11%BB%DB%E1%20%1C%EAjk%F4Z%EE%1B%C0&peer_addr=2.137.87.41&downloaded=0&uploaded=0&peer_id=-qB00000000000000001&port=17548&left=0&event=completed&compact=0";
let raw_query = "info_hash=%3B%24U%04%CF%5F%11%BB%DB%E1%20%1C%EAjk%F4Z%EE%1B%C0&peer_addr=2.137.87.41&downloaded=0&uploaded=0&peer_id=-qB00000000000000001&port=17548&left=0&event=completed&compact=0&numwant=50";

let announce = extract_announce_from(Some(raw_query)).unwrap();

Expand All @@ -126,6 +126,7 @@ mod tests {
left: Some(NumberOfBytes::new(0)),
event: Some(Event::Completed),
compact: Some(Compact::NotAccepted),
numwant: Some(50),
}
);
}
Expand Down
9 changes: 7 additions & 2 deletions src/servers/http/v1/handlers/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use torrust_tracker_clock::clock::Time;
use torrust_tracker_primitives::peer;

use crate::core::auth::Key;
use crate::core::{AnnounceData, Tracker};
use crate::core::{AnnounceData, PeersWanted, Tracker};
use crate::servers::http::v1::extractors::announce_request::ExtractRequest;
use crate::servers::http::v1::extractors::authentication_key::Extract as ExtractKey;
use crate::servers::http::v1::extractors::client_ip_sources::Extract as ExtractClientIpSources;
Expand Down Expand Up @@ -110,8 +110,12 @@ async fn handle_announce(
};

let mut peer = peer_from_request(announce_request, &peer_ip);
let peers_wanted = match announce_request.numwant {
Some(numwant) => PeersWanted::only(numwant),
None => PeersWanted::All,
};

let announce_data = services::announce::invoke(tracker.clone(), announce_request.info_hash, &mut peer).await;
let announce_data = services::announce::invoke(tracker.clone(), announce_request.info_hash, &mut peer, &peers_wanted).await;

Ok(announce_data)
}
Expand Down Expand Up @@ -205,6 +209,7 @@ mod tests {
left: None,
event: None,
compact: None,
numwant: None,
}
}

Expand Down
Loading
Loading