Skip to content

Commit

Permalink
Test: Concurrent clients for TokenRouter (#1010)
Browse files Browse the repository at this point in the history
This is an integration test to ensure that concurrent clients to the
same proxy and endpoint didn't mix packets. Could not replicate the
reported issue below, but it felt like a good test to have for concurrency
testing.

Work on #988
  • Loading branch information
markmandel authored Aug 24, 2024
1 parent f07777c commit fdde430
Showing 1 changed file with 99 additions and 30 deletions.
129 changes: 99 additions & 30 deletions tests/token_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,109 @@

use std::net::{Ipv6Addr, SocketAddr};

use tokio::time::{timeout, Duration};

use quilkin::{
config::Filter,
filters::{Capture, StaticFilter, TokenRouter},
net::endpoint::{metadata::MetadataView, Endpoint},
test::{AddressType, TestHelper},
};
use tokio::time::{timeout, Duration};

/// This test covers both token_router and capture filters,
/// since they work in concert together.
#[tokio::test]
async fn token_router() {
let mut t = TestHelper::default();

let local_addr = echo_server(&mut t).await;

// valid packet
let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;

let msg = b"helloabc";
tracing::trace!(%local_addr, "sending echo packet");
socket.send_to(msg, &local_addr).await.unwrap();

tracing::trace!("awaiting echo packet");
assert_eq!(
"hello",
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
);

// send an invalid packet
let msg = b"helloxyz";
socket.send_to(msg, &local_addr).await.unwrap();

let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
assert!(result.is_err(), "should not have received a packet");
}

// This test covers the scenario in https://github.com/googleforgames/quilkin/issues/988
// to make sure there are no issues with overlapping streams between clients.
#[tokio::test]
async fn multiple_clients() {
let limit = 10_000;
let mut t = TestHelper::default();
let local_addr = echo_server(&mut t).await;

let (mut a_rx, a_socket) = t.open_socket_and_recv_multiple_packets().await;
let (mut b_rx, b_socket) = t.open_socket_and_recv_multiple_packets().await;

tokio::spawn(async move {
// some room to breath
tokio::time::sleep(Duration::from_millis(50)).await;
for _ in 0..limit {
a_socket.send_to(b"Aabc", &local_addr).await.unwrap();
tokio::time::sleep(Duration::from_nanos(5)).await;
}
});
tokio::spawn(async move {
// some room to breath
tokio::time::sleep(Duration::from_millis(50)).await;
for _ in 0..limit {
b_socket.send_to(b"Babc", &local_addr).await.unwrap();
tokio::time::sleep(Duration::from_nanos(5)).await;
}
});

let mut success = 0;
let mut failed = 0;
for _ in 0..limit {
match timeout(Duration::from_millis(60), a_rx.recv()).await {
Ok(packet) => {
assert_eq!("A", packet.unwrap());
success += 1;
}
Err(_) => {
failed += 1;
}
}
match timeout(Duration::from_millis(60), b_rx.recv()).await {
Ok(packet) => {
assert_eq!("B", packet.unwrap());
success += 1;
}
Err(_) => {
failed += 1;
}
}
}

// allow for some dropped packets, since UDP.
let threshold = 0.95 * (2 * limit) as f64;
assert!(
success as f64 > threshold,
"Success: {}, Failed: {}",
success,
failed
);
}

// start an echo server and return what port it's on.
async fn echo_server(t: &mut TestHelper) -> SocketAddr {
let mut echo = t.run_echo_server(AddressType::Ipv6).await;
quilkin::test::map_to_localhost(&mut echo).await;

Expand All @@ -47,10 +136,13 @@ quilkin.dev:
let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
server_config.clusters.modify(|clusters| {
clusters.insert_default(
[Endpoint::with_metadata(
echo.clone(),
serde_yaml::from_str::<MetadataView<_>>(endpoint_metadata).unwrap(),
)]
[
Endpoint::with_metadata(
echo.clone(),
serde_yaml::from_str::<MetadataView<_>>(endpoint_metadata).unwrap(),
),
"127.0.0.2:5000".parse().unwrap(), // goes nowhere, so shouldn't do anything.
]
.into(),
)
});
Expand All @@ -73,28 +165,5 @@ quilkin.dev:
);

let server_port = t.run_server(server_config, None, None).await;

// valid packet
let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;

let local_addr = SocketAddr::from((Ipv6Addr::LOCALHOST, server_port));
let msg = b"helloabc";
tracing::trace!(%local_addr, "sending echo packet");
socket.send_to(msg, &local_addr).await.unwrap();

tracing::trace!("awaiting echo packet");
assert_eq!(
"hello",
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
);

// send an invalid packet
let msg = b"helloxyz";
socket.send_to(msg, &local_addr).await.unwrap();

let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
assert!(result.is_err(), "should not have received a packet");
SocketAddr::from((Ipv6Addr::LOCALHOST, server_port))
}

0 comments on commit fdde430

Please sign in to comment.