From 08c1be83269ef96af1c0de61700c8d7a93175355 Mon Sep 17 00:00:00 2001 From: eareimu Date: Tue, 21 Jan 2025 17:02:35 +0800 Subject: [PATCH] feat(gm-quic): reuse connection and documents --- Cargo.toml | 7 +- README.md | 26 +++-- README_CN.md | 24 ++-- gm-quic/Cargo.toml | 1 - gm-quic/examples/client.rs | 9 +- gm-quic/examples/server.rs | 1 + gm-quic/src/client.rs | 224 ++++++++++++++++++------------------- gm-quic/src/interfaces.rs | 5 +- gm-quic/src/server.rs | 44 ++++---- qinterface/src/router.rs | 12 +- 10 files changed, 179 insertions(+), 174 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 65b1e6eb..cdbbdc1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,20 +3,21 @@ resolver = "2" members = [ "qbase", "qrecovery", - "qconnection", "qcongestion", "qudp", + "qinterface", "qunreliable", + "qconnection", "gm-quic", "h3-shim", - "qinterface", ] default-members = [ "qbase", "qrecovery", - "qconnection", "qcongestion", "qunreliable", + "qinterface", + "qconnection", "gm-quic", ] diff --git a/README.md b/README.md index 79f68fb3..e8b0f582 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ With these layers in place, it becomes clear that the `Accept Functor` and the ` - **qbase**: Core structure of the QUIC protocol, including variable integer encoding (VarInt), connection ID management, stream ID, various frame and packet type definitions, and asynchronous keys. - **qrecovery**: The reliable transport part of QUIC, encompassing the state machine evolution of the sender/receiver, and the internal logic interaction between the application layer and the transport layer. - **qcongestion**: Congestion control in QUIC, which abstracts a unified congestion control interface and implements BBRv1. In the future, it will also implement more transport control algorithms such as Cubic and others. +- **qinterface**: QUIC's packet routing and definition of the underlying IO interface (`QuicInterface`) enable gm-quic to run in various environments. Contains an optional qudp-based `QuicInterface` implementation - **qconnection**: Encapsulation of QUIC connections, linking the necessary components and tasks within a QUIC connection to ensure smooth operation. - **gm-quic**: The top-level encapsulation of the QUIC protocol, including interfaces for both the QUIC client and server. - **qudp**: High-performance UDP encapsulation for QUIC. Ordinary UDP incurs a system call for each packet sent or received, resulting in poor performance. @@ -78,15 +79,27 @@ The QUIC client not only offers configuration options specified by the QUIC prot ```rust let quic_client = QuicClient::builder() + // Allows reusing a connection to the server when there is already one, + // instead of initiating a new connection every time. .reuse_connection() + // Keep the connection alive when it is idle + .keep_alive(Durnation::from_secs(30)) // The QUIC version negotiation mechanism prioritizes using the earlier versions, // currently only supporting V1. .prefer_versions([1u32]) // .with_parameter(&client_parameters) // If not set, the default parameters will be used + // .witl_streams_controller(controller) // Specify the streams controller for the client // .with_token_sink(token_sink) // Manage Tokens issued by various servers .with_root_certificates(root_certificates) // .with_webpki_verifier(verifier) // More advanced ways to verify server certificates .without_cert() // Generally, clients do not need to set certificates + // Specify how client bind interfaces + // The default interface is the high-performance udp implementation provided by qudp. + // .with_iface_binder(binder) + // Let the client only use the interface on specified address. + // By default, a new interface will be used every time initiates a connection. + // like 0.0.0.0:0 or [::]:0 + // .bind(&local_addrs[..])? .build(); let quic_client_conn = quic_client @@ -98,11 +111,9 @@ The QUIC server provides SNI(Server Name Indication) support in TLS, allowing th ```rust let quic_server = QuicServer::builder() + // Keep the accepted connection alive when it is idle + .keep_alive(Durnation::from_secs(30)) .with_supported_versions([1u32]) - // for load balancing - // .with_load_balance(move |initial_packet: &InitialPacket| -> Option { - // ... - // }) .without_cert_verifier() // Generally, client identity is not verified .enable_sni() .add_host("www.genmeta.net", www_cert, www_key) @@ -118,6 +129,8 @@ while let Ok(quic_server_conn) = quic_server.accept().await? { } ``` +For complete examples, please refer to the `examples` folders under the `h3-shim`, `gm-quic` and `qconnection` folders. + There is an asynchronous interface for creating unidirectional or bidirectional QUIC streams from a QUIC Connection, or for listening to incoming streams from the other side of a QUIC Connection. This interface is almost identical to the one in [`hyperium/h3`](https://github.com/hyperium/h3/blob/master/docs/PROPOSAL.md#5-quic-transport). We also implement the interface defined by [`hyperium/h3`](https://github.com/hyperium/h3/blob/master/docs/PROPOSAL.md#5-quic-transport) in `h3-shim` crate to facilitate with other crates integrated. We have a frok of `reqwest` that use `gm-quic` as the transport layer, you can find it [here](https://github.com/genmeta/reqwest/tree/gm-quic). @@ -126,12 +139,11 @@ As for reading and writing data from a QUIC stream, the tokio's `AsyncRead` and ## Progress -`gm-quic` has entered the final testing phase. Next, we will further improve the documentation and add the qlog module. Stay tuned. +The early version has been released and is still being continuously optimized and improved. Welcome to use it :D ## Documentation -While `gm-quic` is not yet complete, its documentation will not be uploaded to `crate.io`. -Please refer to the documentation within the code for now! +Online documentation released with the release is at docs.rs. You can also view the latest documentation in the code. ## Contribution diff --git a/README_CN.md b/README_CN.md index 055d6576..dcba8034 100644 --- a/README_CN.md +++ b/README_CN.md @@ -24,6 +24,7 @@ QUIC协议可谓一个相当复杂的、IO密集型的协议,因此正是适 - **qbase**: QUIC协议的基础结构,包括可变整型编码VarInt、连接ID管理、流ID、各种帧以及包类型定义、异步密钥等 - **qrecovery**: QUIC的可靠传输部分,包括发送端/接收端的状态机演变、应用层与传输层的内部逻辑交互等 - **qcongestion**: QUIC的拥塞控制,抽象了统一的拥塞控制接口,并实现了BBRv1,未来还会实现Cubic、ETC等更多的传输控制算法 +- **qinterface**: QUIC的数据包路由和对底层IO接口(`QuicInterface`)的定义,令gm-quic可以运行在各种环境。内含一个可选的基于qudp的`QuicInterface`实现 - **qconnection**: QUIC连接封装,将QUIC连接内部所需的各组件、任务串联起来,最终能够完美运行 - **gm-quic**: QUIC协议的顶层封装,包括QUIC客户端和服务端2部分的接口 - **qudp**: QUIC的高性能UDP封装,普通的UDP每收发一个包就是一次系统调用,性能低下。qudp则使用GSO、GRO等手段极致优化UDP的性能,如发送的压测效果如下: @@ -68,14 +69,23 @@ QUIC客户端不仅提供了QUIC协议所规定的Parameters选项配置,也 ```rust let quic_client = QuicClient::builder() + // 允许复用到服务器的连接,而不是每次都发起新连接 .reuse_connection() - .enable_happy_eyeballs() + // 自动在连接空闲时发送数据包保持连接活跃 + .keep_alive(Durnation::from_secs(30)) .prefer_versions([1u32]) // QUIC的版本协商机制,会优先使用靠前的版本,目前仅支持V1 // .with_parameter(&client_parameters) // 不设置即为使用默认参数 // .with_token_sink(token_sink) // 管理各服务器颁发的Token .with_root_certificates(root_certificates) // .with_webpki_verifier(verifier) // 更高级地验证服务端证书的办法 .without_cert() // 一般客户端不必设置证书 + // 指定客户端怎么绑定接口 + // 默认的接口为qudp提供的高性能实现 + // .with_iface_binder(binder) + // 令client只使用给定的地址 + // 默认client每次建立连接时会创建一个新的接口,绑定系统随机分配的地址端口 + // 即绑定0.0.0.0:0 或 [::]:0 + // .bind(&local_addrs[..])? .build(); let quic_client_conn = quic_client @@ -87,11 +97,9 @@ QUIC服务端支持SNI(Server Name Indication),可以设置多台Server的 ```rust let quic_server = QuicServer::builder() + // 同client + .keep_alive(Durnation::from_secs(30)) .with_supported_versions([1u32]) - // 通过重试包进行负载均衡 - // .with_load_balance(move |initial_packet: &InitialPacket| -> Option { - // ... - // }) .without_cert_verifier() // 一般不验证客户端身份 .enable_sni() .add_host("www.genmeta.net", www_cert, www_key) @@ -107,17 +115,19 @@ while let Ok(quic_server_conn) = quic_server.accept().await? { } ``` +完整用例请翻阅`h3-shim`,`gm-quic`以及`qconnection`文件夹下的`examples`文件夹。 + 关于如何从QUIC Connection中创建单向QUIC流,或者双向QUIC流,抑或是从QUIC Connection监听来自对方的流,都有一套异步的接口,这套接口几乎与[`hyperium/h3`](https://github.com/hyperium/h3/blob/master/docs/PROPOSAL.md#5-quic-transport)的接口相同。 至于如何从QUIC流中读写数据,则为QUIC流实现了标准的`AsyncRead`、`AsyncWrite`接口,可以很方便地使用。 ## 进展 -`gm-quic`已经进入到最后的测试阶段。接下来,将进一步完善文档,以及补充qlog模块,敬请期待。 +早期版本已经发布,目前仍在不断优化改进中,欢迎使用 :D ## 文档 -在`gm-quic`尚未完成之际,其文档也不会上传托管到`crate.io`。请暂且先查看代码中的文档! +随版本发布的在线文档位于docs.rs,也可查看代码中的最新文档。 ## 贡献 diff --git a/gm-quic/Cargo.toml b/gm-quic/Cargo.toml index 27a3e0ef..b968a6e4 100644 --- a/gm-quic/Cargo.toml +++ b/gm-quic/Cargo.toml @@ -22,7 +22,6 @@ qcongestion = { workspace = true } qconnection = { workspace = true } qrecovery = { workspace = true } qinterface = { workspace = true } -qudp = { workspace = true } qunreliable = { workspace = true } rustls = { workspace = true } thiserror = { workspace = true } diff --git a/gm-quic/examples/client.rs b/gm-quic/examples/client.rs index 3d0af37d..ff082971 100644 --- a/gm-quic/examples/client.rs +++ b/gm-quic/examples/client.rs @@ -67,16 +67,9 @@ async fn run(args: Arguments) -> Result<(), Box> { .build(); let quic_conn = client.connect(args.domain, args.addr).unwrap(); - let mut counter = 0; loop { let mut input = String::new(); - - if counter == 0 { - input = "/README.md".to_string(); - counter += 1; - } else { - let _n = std::io::stdin().read_line(&mut input).unwrap(); - } + _ = std::io::stdin().read_line(&mut input).unwrap(); let content = input.trim(); if content.is_empty() { diff --git a/gm-quic/examples/server.rs b/gm-quic/examples/server.rs index 9c1482a5..c346a71a 100644 --- a/gm-quic/examples/server.rs +++ b/gm-quic/examples/server.rs @@ -43,6 +43,7 @@ async fn run(options: Opt) -> Result<(), Box> { let server = QuicServer::builder() .with_supported_versions([0x00000001u32]) .without_cert_verifier() + // .keep_alive() .with_single_cert(options.cert, options.key) .listen(options.bind)?; diff --git a/gm-quic/src/client.rs b/gm-quic/src/client.rs index 996f10f0..90ca3a97 100644 --- a/gm-quic/src/client.rs +++ b/gm-quic/src/client.rs @@ -1,7 +1,7 @@ use std::{ io, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}, - sync::Arc, + sync::{Arc, LazyLock}, time::Duration, }; @@ -36,13 +36,17 @@ pub struct QuicClient { quic_iface_binder: Box io::Result> + Send + Sync>, // TODO: 要改成一个加载上次连接的parameters的函数,根据server name _remembered: Option, - _reuse_connection: bool, - reuse_udp_sockets: bool, + reuse_connection: bool, + reuse_interfaces: bool, streams_controller: Box Box + Send + Sync>, tls_config: Arc, token_sink: Option>, } +// (server_name, server_addr) +static REUSEABLE_CONNECTIONS: LazyLock>> = + LazyLock::new(DashMap::new); + impl QuicClient { /// Start to build a QuicClient. /// @@ -67,7 +71,7 @@ impl QuicClient { pub fn builder() -> QuicClientBuilder> { QuicClientBuilder { bind_interfaces: Arc::new(DashMap::new()), - reuse_udp_sockets: false, + reuse_interfaces: false, reuse_connection: true, enable_happy_eyepballs: false, prefer_versions: vec![1], @@ -86,7 +90,7 @@ impl QuicClient { ) -> QuicClientBuilder> { QuicClientBuilder { bind_interfaces: Arc::new(DashMap::new()), - reuse_udp_sockets: false, + reuse_interfaces: false, reuse_connection: true, enable_happy_eyepballs: false, prefer_versions: vec![1], @@ -107,7 +111,7 @@ impl QuicClient { pub fn builder_with_tls(tls_config: TlsClientConfig) -> QuicClientBuilder { QuicClientBuilder { bind_interfaces: Arc::new(DashMap::new()), - reuse_udp_sockets: false, + reuse_interfaces: false, reuse_connection: true, enable_happy_eyepballs: false, prefer_versions: vec![1], @@ -120,47 +124,11 @@ impl QuicClient { } } - /// Returns the connection to the specified server. - /// - /// `server_name` is the name of the server, it will be included in the `ClientHello` message. - /// - /// `server_addr` is the address of the server, packets will be sent to this address. - /// - /// Note that the returned connection may not yet be connected to the server, but you can use it to do anything you - /// want, such as sending data, receiving data... operations will be pending until the connection is connected or - /// failed to connect. - /// - /// ### (WIP)Reuse connection - /// - /// If `reuse connection` is enabled, the client will try to reuse the connection that has already connected to the - /// server, this means that the client will not initiates a new connection, but return the existing connection. - /// Otherwise, the client will initiates a new connection to the server. - /// - /// If `reuse connection` is not enabled or there is no connection that can be reused, the client will bind a UDP Socket - /// and initiates a new connection to the server. - /// - /// If the client does not bind any address, Each time the client initiates a new connection, the client will use - /// the address and port that dynamic assigned by the system. - /// - /// If the client has already bound a set of addresses, The client will successively try to bind to an address that - /// matches the server's address family, until an address is successfully bound. If none of the given addresses are - /// successfully bound, the last error will be returned (similar to `UdpSocket::bind`). Its also possiable that all - /// of the bound addresses dont match the server's address family, an error will be returned in this case. - /// - /// How the client binds the address depends on whether `reuse udp sockets` is enabled. - /// - /// If `reuse udp sockets` is enabled, the client may share the same address with other connections. If `reuse udp - /// sockets` is disabled (default), The client will not bind to addresses that is already used by another connection. - /// - /// Note that although `reuse udp sockets` is not enabled, the socket bound by the client may still be reused, because - /// this option can only determine the behavior of this client when initiates a new connection. - pub fn connect( + fn new_connection( &self, - server_name: impl Into, + server_name: String, server_addr: SocketAddr, ) -> io::Result> { - let server_name = server_name.into(); - let quic_iface = match &self.bind_interfaces { None => { let quic_iface = if server_addr.is_ipv4() { @@ -174,7 +142,7 @@ impl QuicClient { Some(bind_interfaces) => { let no_available_address = || { let mut reason = String::from("No address matches the server's address family"); - if !self.reuse_udp_sockets { + if !self.reuse_interfaces { reason .push_str(", or all the bound addresses are used by other connections"); } @@ -185,7 +153,7 @@ impl QuicClient { .map(|entry| *entry.key()) .filter(|local_addr| local_addr.is_ipv4() == server_addr.is_ipv4()) .find_map(|local_addr| { - if self.reuse_udp_sockets { + if self.reuse_interfaces { Interfaces::try_acquire_unique(local_addr) } else { Interfaces::try_acquire_shared(local_addr) @@ -210,7 +178,7 @@ impl QuicClient { let (event_broker, mut events) = mpsc::unbounded_channel(); let connection = Arc::new( - Connection::with_token_sink(server_name, token_sink) + Connection::with_token_sink(server_name.clone(), token_sink) .with_parameters(self.parameters, None) .with_tls_config(self.tls_config.clone()) .with_streams_ctrl(&self.streams_controller) @@ -226,14 +194,26 @@ impl QuicClient { while let Some(event) = events.recv().await { match event { Event::Handshaked => {} - Event::Failed(error) => connection.enter_closing(error.into()), Event::ProbedNewPath(_, _) => {} Event::PathInactivated(_pathway, socket) => { _ = Interfaces::try_free_interface(socket.src()) } - Event::Closed(ccf) => connection.enter_draining(ccf), + Event::Failed(error) => { + REUSEABLE_CONNECTIONS + .remove_if(&(server_name.clone(), server_addr), |_, exist| { + Arc::ptr_eq(&connection, exist) + }); + connection.enter_closing(error.into()) + } + Event::Closed(ccf) => { + REUSEABLE_CONNECTIONS + .remove_if(&(server_name.clone(), server_addr), |_, exist| { + Arc::ptr_eq(&connection, exist) + }); + connection.enter_draining(ccf) + } Event::StatelessReset => {} - Event::Terminated => { /* Todo: connections set */ } + Event::Terminated => {} } } } @@ -242,6 +222,53 @@ impl QuicClient { connection.add_path(socket, pathway)?; Ok(connection) } + + /// Returns the connection to the specified server. + /// + /// `server_name` is the name of the server, it will be included in the `ClientHello` message. + /// + /// `server_addr` is the address of the server, packets will be sent to this address. + /// + /// Note that the returned connection may not yet be connected to the server, but you can use it to do anything you + /// want, such as sending data, receiving data... operations will be pending until the connection is connected or + /// failed to connect. + /// + /// ### Select an interface + /// + /// First, the client will select an interface to communicate with the server. + /// + /// If the client has already bound a set of addresses, The client will select the interface whose IP family of the + /// first address matches the server addr from the bound and not closed interfaces. + /// + /// If `reuse_interfaces` is not enabled; the client will not select an interface that is in use. + /// + /// ### Connecte to server + /// + /// If connection reuse is enabled, the client will give priority to returning the existing connection to the + /// `server_name` and `server_addr`. + /// + /// If the client does not bind any interface, the client will bind the interface on the address/port randomly assigned + /// by the system (i.e. xxx) through `quic_iface_binder` *every time* it establishes a connection. When no interface is + /// bound, the reuse interface option will have no effect. + /// + /// If `reuse connection` is not enabled or there is no connection that can be reused, the client will initiates + /// a new connection to the server. + pub fn connect( + &self, + server_name: impl Into, + server_addr: SocketAddr, + ) -> io::Result> { + let server_name = server_name.into(); + + if self.reuse_connection { + REUSEABLE_CONNECTIONS + .entry((server_name.clone(), server_addr)) + .or_try_insert_with(|| self.new_connection(server_name, server_addr)) + .map(|entry| entry.clone()) + } else { + self.new_connection(server_name, server_addr) + } + } } impl Drop for QuicClient { @@ -258,7 +285,7 @@ impl Drop for QuicClient { /// A builder for [`QuicClient`]. pub struct QuicClientBuilder { bind_interfaces: Arc>>, - reuse_udp_sockets: bool, + reuse_interfaces: bool, reuse_connection: bool, enable_happy_eyepballs: bool, prefer_versions: Vec, @@ -271,7 +298,7 @@ pub struct QuicClientBuilder { } impl QuicClientBuilder { - /// Specify how to bind interfaces. + /// Specify how client bind interfaces. /// /// The given closure will be used by [`Self::bind`], /// and/or [`QuicClient::connect`] if no interface bound when client built. @@ -289,21 +316,26 @@ impl QuicClientBuilder { } } - /// Bind the interfaces on given address. + /// Create quic interfaces bound on given address. + /// + /// If the bind failed, the error will be returned immediately. /// /// The default quic interface is [`Usc`] that support GSO and GRO. - /// You can also Specify how to bind the interface by calling [`Self::with_iface_binder`]. + /// You can let the client bind custom interfaces by calling the [`Self::with_iface_binder`] method. /// - /// Or you can use your own created interface by calling [`Self::with_interfaces`]. + /// If you dont bind any address, each time the client initiates a new connection, + /// the client will use bind a new interface on address and port that dynamic assigned by the system. + /// + /// To know more about how the client selects the interface when initiates a new connection, + /// read [`QuicClient::connect`]. /// /// If you call this multiple times, only the last set of interface will be used, /// previous bound interface will be freed immediately. /// - /// If you dont bind any address, each time the client initiates a new connection, - /// the client will use bind new interface on address and port that dynamic assigned by the system. + /// If the interface is closed for some reason after being created (meaning [`QuicInterface::poll_recv`] + /// returns an error), only the log will be printed. /// - /// To know more about how the client selects the socket address when initiates a new connection, - /// read [`QuicClient::connect`]. + /// If all interfaces are closed, clients will no longer be able to initiate new connections. pub fn bind(self, addrs: impl ToSocketAddrs) -> io::Result { for entry in self.bind_interfaces.iter() { Interfaces::del(*entry.key(), entry.value()); @@ -334,9 +366,7 @@ impl QuicClientBuilder { while let Some((result, local_addr)) = iface_recv_tasks.next().await { let error = match result { // Ok(result) => result.into_err(), - Ok(result) => match result { - Err(error) => error, - }, + Ok(error) => error, Err(join_error) if join_error.is_cancelled() => return, Err(join_error) => join_error.into(), }; @@ -351,58 +381,24 @@ impl QuicClientBuilder { Ok(self) } - /// Specify the Interface that client use. - /// - /// The client will use the given interfaces to initiates connections. - /// - /// If you call this multiple times, only the last set of interfaces will be used. - /// - /// This method will return [`io::Error`] if any of the interfaces failed to get its local addresses - /// (by calling [`QuicInterface::local_addr`]). - /// - /// If the given iterator is empty, each time the client initiates a new connection, - /// the client will use bind new interface on address and port that dynamic assigned by the system. - pub fn with_interfaces( - mut self, - interfaces: impl IntoIterator>, - ) -> io::Result { - for entry in self.bind_interfaces.iter() { - Interfaces::del(*entry.key(), entry.value()); - } - self.bind_interfaces = interfaces.into_iter().try_fold( - Arc::new(DashMap::new()), - |bind_interfaces, interface| { - let local_addr = interface.local_addr()?; - bind_interfaces.insert(local_addr, interface); - io::Result::Ok(bind_interfaces) - }, - )?; - self.bind_interfaces.clear(); - Ok(self) - } - - /// (WIP)Enable efficiently reuse connections. + /// Enable efficiently reuse connections. /// - /// If you enable this option, the client will try to reuse the connection that has already connected to the server, - /// this means that the client will not initiates a new connection, but return the existing connection when you call - /// [`QuicClient::connect`]. + /// If you enable this option the client will give priority to returning the existing connection to the `server_name` + /// and `server_addr`, instead of creating a new connection every time. pub fn reuse_connection(mut self) -> Self { self.reuse_connection = true; self } - /// Enable reuse UDP sockets. - /// + /// Enable reuse interface. /// - /// By default, the client will not use the same address as other connections, which means that the client must bind - /// to a new address every time it initiates a connection. If you enable this option, the client cloud share the same - /// address with other connections. This option can only determine the behavior of this client when establishing a - /// new connection. + /// If you dont bind any address, this option will not take effect. /// - /// If you dont bind any address, this option will not take effect because the client will use the address and port - /// that dynamic assigned by the system each time it initiates a new connection. - pub fn reuse_udp_sockets(mut self) -> Self { - self.reuse_udp_sockets = true; + /// By default, the client will not use the same interface with other connections, which means that the client must + /// select a new interface every time it initiates a connection. If you enable this option, the client will share + /// the same address between connections. + pub fn reuse_interfaces(mut self) -> Self { + self.reuse_interfaces = true; self } @@ -481,7 +477,7 @@ impl QuicClientBuilder> { ) -> QuicClientBuilder> { QuicClientBuilder { bind_interfaces: self.bind_interfaces, - reuse_udp_sockets: self.reuse_udp_sockets, + reuse_interfaces: self.reuse_interfaces, reuse_connection: self.reuse_connection, enable_happy_eyepballs: self.enable_happy_eyepballs, prefer_versions: self.prefer_versions, @@ -503,7 +499,7 @@ impl QuicClientBuilder> { ) -> QuicClientBuilder> { QuicClientBuilder { bind_interfaces: self.bind_interfaces, - reuse_udp_sockets: self.reuse_udp_sockets, + reuse_interfaces: self.reuse_interfaces, reuse_connection: self.reuse_connection, enable_happy_eyepballs: self.enable_happy_eyepballs, prefer_versions: self.prefer_versions, @@ -529,7 +525,7 @@ impl QuicClientBuilder> { ) -> QuicClientBuilder { QuicClientBuilder { bind_interfaces: self.bind_interfaces, - reuse_udp_sockets: self.reuse_udp_sockets, + reuse_interfaces: self.reuse_interfaces, reuse_connection: self.reuse_connection, enable_happy_eyepballs: self.enable_happy_eyepballs, prefer_versions: self.prefer_versions, @@ -549,7 +545,7 @@ impl QuicClientBuilder> { pub fn without_cert(self) -> QuicClientBuilder { QuicClientBuilder { bind_interfaces: self.bind_interfaces, - reuse_udp_sockets: self.reuse_udp_sockets, + reuse_interfaces: self.reuse_interfaces, reuse_connection: self.reuse_connection, enable_happy_eyepballs: self.enable_happy_eyepballs, prefer_versions: self.prefer_versions, @@ -569,7 +565,7 @@ impl QuicClientBuilder> { ) -> QuicClientBuilder { QuicClientBuilder { bind_interfaces: self.bind_interfaces, - reuse_udp_sockets: self.reuse_udp_sockets, + reuse_interfaces: self.reuse_interfaces, reuse_connection: self.reuse_connection, enable_happy_eyepballs: self.enable_happy_eyepballs, prefer_versions: self.prefer_versions, @@ -611,8 +607,6 @@ impl QuicClientBuilder { } /// Build the QuicClient, ready to initiates connect to the servers. - /// - /// If pub fn build(self) -> QuicClient { let bind_interfaces = if self.bind_interfaces.is_empty() { None @@ -621,8 +615,8 @@ impl QuicClientBuilder { }; QuicClient { bind_interfaces, - reuse_udp_sockets: self.reuse_udp_sockets, - _reuse_connection: self.reuse_connection, + reuse_interfaces: self.reuse_interfaces, + reuse_connection: self.reuse_connection, _enable_happy_eyepballs: self.enable_happy_eyepballs, _prefer_versions: self.prefer_versions, quic_iface_binder: self.quic_iface_binder, diff --git a/gm-quic/src/interfaces.rs b/gm-quic/src/interfaces.rs index 622b06c8..aa7e2923 100644 --- a/gm-quic/src/interfaces.rs +++ b/gm-quic/src/interfaces.rs @@ -1,5 +1,4 @@ use std::{ - convert::Infallible, io, net::SocketAddr, sync::{Arc, LazyLock}, @@ -16,9 +15,7 @@ static INTERFACES: LazyLock> = LazyLock::new(De pub enum Interfaces {} impl Interfaces { - pub fn add( - quic_iface: Arc, - ) -> io::Result>> { + pub fn add(quic_iface: Arc) -> io::Result> { let local_addr = quic_iface.local_addr()?; let entry = INTERFACES.entry(local_addr); if let dashmap::Entry::Occupied(..) = &entry { diff --git a/gm-quic/src/server.rs b/gm-quic/src/server.rs index d6a4c184..e02b2f40 100644 --- a/gm-quic/src/server.rs +++ b/gm-quic/src/server.rs @@ -201,7 +201,7 @@ impl QuicServer { }); } - fn close(&self) { + fn shutdown(&self) { if self.listener.close().is_none() { // already closed return; @@ -215,7 +215,7 @@ impl QuicServer { impl Drop for QuicServer { fn drop(&mut self) { - self.close(); + self.shutdown(); } } @@ -577,16 +577,13 @@ impl QuicServerBuilder { async move { let (result, iface_idx, _) = futures::future::select_all(iface_recv_tasks).await; let error = match result { - // Ok(result) => result.into_err(), - Ok(result) => match result { - Err(error) => error, - }, + Ok(error) => error, Err(join_error) if join_error.is_cancelled() => return, Err(join_error) => join_error.into(), }; let local_addr = local_addrs[iface_idx]; log::error!("interface on {local_addr} that server listened was closed unexpectedly: {error}"); - server.close(); + server.shutdown(); } }); @@ -611,26 +608,26 @@ impl QuicServerSniBuilder { /// Once listen is called, the server will start to accept incoming connections, do the handshake automatically, and /// you can get the incoming connection by calling the [`QuicServer::accept`] method. /// - /// Note that there can be only one server running at the same time, so this method will return an error if there is - /// already a server running. + /// ### Bind interfaces /// - /// When the `QuicServer` is dropped, the server will stop listening for incoming connections, and you can start a - /// new server by calling the [`QuicServerBuilder::listen`] method again. + /// All addresses need to be successfully bound before the Server will start. + /// Errors occurring in the binding address will be returned immediately /// - /// ## If the passive listening is not enabled + /// After the server is started, if any interface that the server is listening to is closed (meaning + /// [`QuicInterface::poll_recv`] returns an error), the Server will be shut down immediately and free all bound + /// interfaces, the [`QuicServer::accept`] method will return [`Err`]. /// - /// This method will try to bind all of the given addresses. The server will *only* accept connections from the given - /// addresses that successfully bound. + /// ### Server is unique /// - /// If all given addresses are failed to bind, this method will return an error. + /// If the passive listening is enabled, the server will accept connections from all address that gm-quic has + /// already bound to, such as those used by other local clients to connect to the remote server. /// - /// ## If the passive listening is enabled + /// Note that there can be only one server running at the same time, so this method will return an error if there is + /// already a server running. /// - /// This method will also attempt to bind to the given address, but the server will accept connections from *all* - /// addresses that gm-quic has already bound to, such as those used by other local clients to connect to the remote - /// server. + /// When the [`QuicServer`] is dropped, the server will be shut down immediately and free all bound interfaces, + /// and you can start a new server by calling the [`QuicServerBuilder::listen`] method again. /// - /// Although all given addresses are failed to bind, the server can still accept connections from other addresses. pub fn listen(self, addresses: impl ToSocketAddrs) -> io::Result> { let mut server = SERVER.write().unwrap(); if server.strong_count() != 0 { @@ -680,16 +677,13 @@ impl QuicServerSniBuilder { async move { let (result, iface_idx, _) = futures::future::select_all(iface_recv_tasks).await; let error = match result { - // Ok(result) => result.into_err(), - Ok(result) => match result { - Err(error) => error, - }, + Ok(error) => error, Err(join_error) if join_error.is_cancelled() => return, Err(join_error) => join_error.into(), }; let local_addr = local_addrs[iface_idx]; log::error!("interface on {local_addr} that server listened was closed unexpectedly: {error}"); - server.close(); + server.shutdown(); } }); diff --git a/qinterface/src/router.rs b/qinterface/src/router.rs index e55b6541..aee237ae 100644 --- a/qinterface/src/router.rs +++ b/qinterface/src/router.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, fmt, io, net::SocketAddr, sync::Arc}; +use std::{fmt, io, net::SocketAddr, sync::Arc}; use dashmap::DashMap; use qbase::{ @@ -100,7 +100,7 @@ impl QuicProto { self: &Arc, local_addr: SocketAddr, interface: Arc, - ) -> JoinHandle> { + ) -> JoinHandle { let entry = self .interfaces .entry(local_addr) @@ -111,13 +111,17 @@ impl QuicProto { let mut rcvd_pkts = Vec::with_capacity(3); loop { // way: local -> peer - let (datagram, pathway, socket) = core::future::poll_fn(|cx| { + let (datagram, pathway, socket) = match core::future::poll_fn(|cx| { let interface = this.interfaces.get(&local_addr).ok_or_else(|| { io::Error::new(io::ErrorKind::BrokenPipe, "interface already be removed") })?; interface.inner.poll_recv(cx) }) - .await?; + .await + { + Ok(t) => t, + Err(e) => return e, + }; let datagram_size = datagram.len(); // todo: parse packets with any length of dcid rcvd_pkts.extend(PacketReader::new(datagram, 8).flatten());