diff --git a/configs/server.json b/configs/server.json index a7bade19f..c778f0ce2 100644 --- a/configs/server.json +++ b/configs/server.json @@ -84,6 +84,14 @@ "enabled": false, "certificate": "certs/iggy.pfx", "password": "iggy123" + }, + "socket": { + "ipv6": false, + "recv_buffer_size": 102400, + "send_buffer_size": 102400, + "keepalive": false, + "nodelay": false, + "linger": "100 ms" } }, "quic": { diff --git a/configs/server.toml b/configs/server.toml index 5fe2120da..46aea5bec 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -180,6 +180,28 @@ certificate = "certs/iggy.pfx" # Password for the TLS certificate, required for accessing the private key. password = "iggy123" +# Configuration for the TCP socket +[tcp.socket] +# Whether to use an ipv4 or ipv6 socket +ipv6 = false + +# Optional, SO_RCVBUF: maximum size in bytes of the receive buffer, can be clamped by the OS +recv_buffer_size = 102400 + +# Optional, SO_SNDBUF: maximum size in bytes of the send buffer, can be clamped by the OS +send_buffer_size = 102400 + +# Optional, SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection +keepalive = false + +# Optional, TCP_NODELAY: enable/disable the Nagle algorithm which buffers data before sending +# segments +nodelay = false + +# Optional, SO_LINGER: delay to wait for while data is being transmitted before closing the socket +# after a close or shutdown call has been received +linger = "100 ms" + # QUIC protocol configuration. [quic] # Controls whether the QUIC server is enabled. diff --git a/server/src/configs/defaults.rs b/server/src/configs/defaults.rs index 43728435b..6683fbd56 100644 --- a/server/src/configs/defaults.rs +++ b/server/src/configs/defaults.rs @@ -16,6 +16,8 @@ use crate::configs::system::{ use crate::configs::tcp::{TcpConfig, TcpTlsConfig}; use std::sync::Arc; +use super::tcp::TcpSocketConfig; + static_toml::static_toml! { // static_toml crate always starts from CARGO_MANIFEST_DIR (in this case iggy-server root directory) static SERVER_CONFIG = include_toml!("../configs/server.toml"); @@ -120,6 +122,7 @@ impl Default for TcpConfig { enabled: SERVER_CONFIG.tcp.enabled, address: SERVER_CONFIG.tcp.address.parse().unwrap(), tls: TcpTlsConfig::default(), + socket: TcpSocketConfig::default(), } } } @@ -134,6 +137,19 @@ impl Default for TcpTlsConfig { } } +impl Default for TcpSocketConfig { + fn default() -> TcpSocketConfig { + TcpSocketConfig { + ipv6: false, + recv_buffer_size: None, + send_buffer_size: None, + keepalive: None, + nodelay: None, + linger: None, + } + } +} + impl Default for HttpConfig { fn default() -> HttpConfig { HttpConfig { diff --git a/server/src/configs/tcp.rs b/server/src/configs/tcp.rs index 8b6d3a572..d88f4854a 100644 --- a/server/src/configs/tcp.rs +++ b/server/src/configs/tcp.rs @@ -1,3 +1,4 @@ +use iggy::utils::duration::IggyDuration; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize, Clone)] @@ -5,6 +6,7 @@ pub struct TcpConfig { pub enabled: bool, pub address: String, pub tls: TcpTlsConfig, + pub socket: TcpSocketConfig, } #[derive(Debug, Deserialize, Serialize, Clone)] @@ -13,3 +15,13 @@ pub struct TcpTlsConfig { pub certificate: String, pub password: String, } + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TcpSocketConfig { + pub ipv6: bool, + pub recv_buffer_size: Option, + pub send_buffer_size: Option, + pub keepalive: Option, + pub nodelay: Option, + pub linger: Option, +} diff --git a/server/src/tcp/mod.rs b/server/src/tcp/mod.rs index c2fa37049..4cbcdd201 100644 --- a/server/src/tcp/mod.rs +++ b/server/src/tcp/mod.rs @@ -3,5 +3,6 @@ pub mod sender; pub mod tcp_listener; mod tcp_sender; pub mod tcp_server; +mod tcp_socket; pub mod tcp_tls_listener; pub mod tcp_tls_sender; diff --git a/server/src/tcp/tcp_listener.rs b/server/src/tcp/tcp_listener.rs index 9b1c5ec81..5c8431579 100644 --- a/server/src/tcp/tcp_listener.rs +++ b/server/src/tcp/tcp_listener.rs @@ -4,17 +4,24 @@ use crate::tcp::connection_handler::{handle_connection, handle_error}; use crate::tcp::tcp_sender::TcpSender; use std::net::SocketAddr; use tokio::io::AsyncWriteExt; -use tokio::net::TcpListener; +use tokio::net::TcpSocket; use tokio::sync::oneshot; use tracing::{error, info}; -pub async fn start(address: &str, system: SharedSystem) -> SocketAddr { +pub async fn start(address: &str, socket: TcpSocket, system: SharedSystem) -> SocketAddr { let address = address.to_string(); let (tx, rx) = oneshot::channel(); tokio::spawn(async move { - let listener = TcpListener::bind(&address) - .await - .expect("Unable to start TCP server."); + let addr = address.parse(); + if addr.is_err() { + panic!("Unable to parse address {:?}", address); + } + + socket + .bind(addr.unwrap()) + .expect("Unable to bind socket to address"); + + let listener = socket.listen(1024).expect("Unable to start TCP server."); let local_addr = listener .local_addr() diff --git a/server/src/tcp/tcp_server.rs b/server/src/tcp/tcp_server.rs index 0f963f9b8..6fafc9128 100644 --- a/server/src/tcp/tcp_server.rs +++ b/server/src/tcp/tcp_server.rs @@ -1,6 +1,6 @@ use crate::configs::tcp::TcpConfig; use crate::streaming::systems::system::SharedSystem; -use crate::tcp::{tcp_listener, tcp_tls_listener}; +use crate::tcp::{tcp_listener, tcp_socket, tcp_tls_listener}; use std::net::SocketAddr; use tracing::info; @@ -13,9 +13,10 @@ pub async fn start(config: TcpConfig, system: SharedSystem) -> SocketAddr { "Iggy TCP" }; info!("Initializing {server_name} server..."); + let socket = tcp_socket::build(config.socket); let addr = match config.tls.enabled { - true => tcp_tls_listener::start(&config.address, config.tls, system).await, - false => tcp_listener::start(&config.address, system).await, + true => tcp_tls_listener::start(&config.address, config.tls, socket, system).await, + false => tcp_listener::start(&config.address, socket, system).await, }; info!("{server_name} server has started on: {:?}", addr); addr diff --git a/server/src/tcp/tcp_socket.rs b/server/src/tcp/tcp_socket.rs new file mode 100644 index 000000000..85221941f --- /dev/null +++ b/server/src/tcp/tcp_socket.rs @@ -0,0 +1,126 @@ +use tokio::net::TcpSocket; + +use crate::configs::tcp::TcpSocketConfig; + +pub fn build(config: TcpSocketConfig) -> TcpSocket { + let socket = if config.ipv6 { + TcpSocket::new_v6().expect("Unable to create an ipv6 socket") + } else { + TcpSocket::new_v4().expect("Unable to create an ipv4 socket") + }; + + if let Some(size) = config.recv_buffer_size { + socket + .set_recv_buffer_size(size) + .expect("Unable to set SO_RCVBUF on socket"); + } + if let Some(size) = config.send_buffer_size { + socket + .set_send_buffer_size(size) + .expect("Unable to set SO_SNDBUF on socket"); + } + if let Some(keepalive) = config.keepalive { + socket + .set_keepalive(keepalive) + .expect("Unable to set SO_KEEPALIVE on socket"); + } + if let Some(nodelay) = config.nodelay { + socket + .set_nodelay(nodelay) + .expect("Unable to set TCP_NODELAY on socket"); + } + if let Some(linger) = config.linger { + socket + .set_linger(Some(linger.get_duration())) + .expect("Unable to set SO_LINGER on socket"); + } + + socket +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use iggy::utils::duration::IggyDuration; + + use super::*; + + #[test] + fn given_recv_config_socket_should_be_configured() { + let recv_buffer_size = 425984; + let config = TcpSocketConfig { + ipv6: true, + recv_buffer_size: Some(recv_buffer_size), + send_buffer_size: None, + keepalive: None, + nodelay: None, + linger: None, + }; + let socket = build(config); + let recv = socket.recv_buffer_size().unwrap(); + assert_eq!(recv, recv_buffer_size); + } + + #[test] + fn given_send_config_socket_should_be_configured() { + let send_buffer_size = 425984; + let config = TcpSocketConfig { + ipv6: true, + recv_buffer_size: None, + send_buffer_size: Some(send_buffer_size), + keepalive: None, + nodelay: None, + linger: None, + }; + let socket = build(config); + let send = socket.send_buffer_size().unwrap(); + assert_eq!(send, send_buffer_size); + } + + #[test] + fn given_keepalive_config_socket_should_be_configured() { + let config = TcpSocketConfig { + ipv6: true, + recv_buffer_size: None, + send_buffer_size: None, + keepalive: Some(true), + nodelay: None, + linger: None, + }; + let socket = build(config); + let keepalive = socket.keepalive().unwrap(); + assert!(keepalive); + } + + #[test] + fn given_nodelay_config_socket_should_be_configured() { + let config = TcpSocketConfig { + ipv6: true, + recv_buffer_size: None, + send_buffer_size: None, + keepalive: None, + nodelay: Some(true), + linger: None, + }; + let socket = build(config); + let nodelay = socket.nodelay().unwrap(); + assert!(nodelay); + } + + #[test] + fn given_linger_config_socket_should_be_configured() { + let linger_dur = Duration::new(1, 0); + let config = TcpSocketConfig { + ipv6: true, + recv_buffer_size: None, + send_buffer_size: None, + keepalive: None, + nodelay: None, + linger: Some(IggyDuration::new(linger_dur)), + }; + let socket = build(config); + let linger = socket.linger().unwrap(); + assert_eq!(linger, Some(linger_dur)); + } +} diff --git a/server/src/tcp/tcp_tls_listener.rs b/server/src/tcp/tcp_tls_listener.rs index 138e6f143..48eaf4bea 100644 --- a/server/src/tcp/tcp_tls_listener.rs +++ b/server/src/tcp/tcp_tls_listener.rs @@ -5,13 +5,18 @@ use crate::tcp::connection_handler::{handle_connection, handle_error}; use crate::tcp::tcp_tls_sender::TcpTlsSender; use std::net::SocketAddr; use tokio::io::AsyncWriteExt; -use tokio::net::TcpListener; +use tokio::net::TcpSocket; use tokio::sync::oneshot; use tokio_native_tls::native_tls; use tokio_native_tls::native_tls::Identity; use tracing::{error, info}; -pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSystem) -> SocketAddr { +pub(crate) async fn start( + address: &str, + config: TcpTlsConfig, + socket: TcpSocket, + system: SharedSystem, +) -> SocketAddr { let address = address.to_string(); let (tx, rx) = oneshot::channel(); tokio::spawn(async move { @@ -31,8 +36,17 @@ pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSys .unwrap(), ); - let listener = TcpListener::bind(&address) - .await + let addr = address.parse(); + if addr.is_err() { + panic!("Unable to parse address {:?}", address); + } + + socket + .bind(addr.unwrap()) + .expect("Unable to bind socket to address"); + + let listener = socket + .listen(1024) .expect("Unable to start TCP TLS server."); let local_addr = listener