Skip to content

Commit

Permalink
fix(qconnection): fix port 0 can't be bound repeatedly
Browse files Browse the repository at this point in the history
  • Loading branch information
eareimu authored and huster-zhangpeng committed Nov 27, 2024
1 parent 49ed01d commit f11e1d0
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 30 deletions.
74 changes: 62 additions & 12 deletions qconnection/src/usc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ impl UscRegistry {
Task: Future<Output = ()> + Send + 'static,
F: FnOnce(ArcUsc) -> Task,
{
// lock the entry, avoid the racing condition
// for port 0, its always create a new usc
if addr.port() == 0 {
return UscRegistry::create_new_usc(addr, recv_task);
}

// for other ports, lock the entry for avoiding the racing condition
let entry = USC_REGISTRY.entry(addr);
if let dashmap::Entry::Occupied(entry) = entry {
return Ok(entry.get().0.clone());
Expand All @@ -67,20 +72,13 @@ impl UscRegistry {
Task: Future<Output = ()> + Send + 'static,
F: FnOnce(ArcUsc) -> Task,
{
// lock the entry, avoid the racing condition
let entry = USC_REGISTRY.entry(addr);
if let dashmap::Entry::Occupied(_exist_usc) = entry {
let error = io::Error::new(io::ErrorKind::AddrInUse, "address already in use");
return Err(error);
}

let usc = Arc::new(qudp::UdpSocketController::new(addr)?);
let addr = usc.local_addr()?;

let usc = ArcUsc { usc, addr };

let recv_task = tokio::spawn(recv_task(usc.clone()));
entry.insert((usc.clone(), recv_task));
USC_REGISTRY.insert(addr, (usc.clone(), recv_task));

Ok(usc)
}
Expand Down Expand Up @@ -140,13 +138,12 @@ impl ArcUsc {
}
}

// TOOD: its not a good idea to use strong_count to determine whether the usc is dropped
impl Drop for ArcUsc {
fn drop(&mut self) {
// 3 = self, registry, recv_task
// for server, it will hold one instance of usc
if Arc::strong_count(&self.usc) == 3 {
// is possible that: recv_task is complete because usc error
// or, while this drop is called, another drop is called,
// so, pattern matching is necessary here, expect will panic
if let Some((_addr, (_usc, recvtask))) = USC_REGISTRY.remove(&self.addr) {
recvtask.abort();
}
Expand Down Expand Up @@ -179,3 +176,56 @@ impl Future for SendAllViaPathWay<'_> {
Poll::Ready(Ok(()))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn bind() {
let recv_task = |usc: ArcUsc| async move {
let _usc = usc;
core::future::pending::<()>().await;
};

let unspecified: SocketAddr = "127.0.0.1:0".parse().unwrap();
let specified: SocketAddr = "127.0.0.1:18123".parse().unwrap();

{
// bind unspecified
// hold the usc or it will be dropped immediately.
let usc = UscRegistry::get_or_create_usc(unspecified, recv_task);
assert!(usc.is_ok());
assert_eq!(USC_REGISTRY.len(), 1);

let usc = UscRegistry::get_or_create_usc(unspecified, recv_task);
assert!(usc.is_ok());
assert_eq!(USC_REGISTRY.len(), 2);

let usc = UscRegistry::create_new_usc(unspecified, recv_task);
assert!(usc.is_ok());
assert_eq!(USC_REGISTRY.len(), 3);

let usc = UscRegistry::create_new_usc(unspecified, recv_task);
assert!(usc.is_ok());
assert_eq!(USC_REGISTRY.len(), 4);

// bind specified, and reuse the address
let usc = UscRegistry::create_new_usc(specified, recv_task);
assert!(usc.is_ok());
assert_eq!(USC_REGISTRY.len(), 5);

// faild beacuse the address is already bound
let usc = UscRegistry::create_new_usc(specified, recv_task);
assert!(usc.is_err());
assert_eq!(USC_REGISTRY.len(), 5);

// its ok to get the exist usc
let usc = UscRegistry::get_or_create_usc(specified, recv_task);
assert!(usc.is_ok());
assert_eq!(USC_REGISTRY.len(), 5);
}
// empty because all usage of the usc is dropped
assert!(USC_REGISTRY.is_empty());
}
}
22 changes: 5 additions & 17 deletions qrecovery/src/recv/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,23 +135,11 @@ impl<TX> Drop for Reader<TX> {
let mut recver = self.0.recver();
let inner = recver.deref_mut();
if let Ok(receiving_state) = inner {
match receiving_state {
Recver::Recv(r) => {
assert!(
r.is_stopped(),
r#"RecvStream in Recv State must be
stopped with error code before dropped!"#
)
}
Recver::SizeKnown(r) => {
assert!(
r.is_stopped(),
r#"RecvStream in Recv State must be
stopped with error code before dropped!"#
)
}
_ => (),
}
debug_assert!(
!(matches!(receiving_state, Recver::Recv(state) if !state.is_stopped())
|| matches!(receiving_state, Recver::SizeKnown(state) if !state.is_stopped())),
"RecvStream must tell peer to stop sending or be done before dropped!"
);
}
}
}
2 changes: 1 addition & 1 deletion qrecovery/src/send/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl<TX> Drop for Writer<TX> {
sending_state,
Sender::DataRcvd | Sender::ResetSent(_) | Sender::ResetRcvd(_)
),
"SendingStream must be shutdowned before dropped!"
"SendingStream must be shutdowned or cancelled before dropped!"
);
};
}
Expand Down

0 comments on commit f11e1d0

Please sign in to comment.