From f68b5465ab411484d49afce9ba79e21cedd49c19 Mon Sep 17 00:00:00 2001 From: benfradet Date: Sat, 2 Nov 2024 21:53:03 +0100 Subject: [PATCH] Expose TCP parameters #31 --- Cargo.lock | 4 +- configs/server.json | 9 ++++ configs/server.toml | 24 ++++++++++ integration/src/test_server.rs | 6 +++ sdk/Cargo.toml | 2 +- sdk/src/client.rs | 14 +++--- sdk/src/clients/consumer.rs | 1 + server/Cargo.toml | 2 +- server/src/configs/defaults.rs | 21 +++++++++ server/src/configs/tcp.rs | 17 ++++++++ server/src/tcp/mod.rs | 1 + server/src/tcp/tcp_listener.rs | 17 +++++--- server/src/tcp/tcp_server.rs | 7 +-- server/src/tcp/tcp_socket.rs | 70 ++++++++++++++++++++++++++++++ server/src/tcp/tcp_tls_listener.rs | 22 ++++++++-- 15 files changed, 194 insertions(+), 23 deletions(-) create mode 100644 server/src/tcp/tcp_socket.rs diff --git a/Cargo.lock b/Cargo.lock index cf2683db0..6c0b1f967 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2095,7 +2095,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.32" +version = "0.6.33" dependencies = [ "aes-gcm", "ahash 0.8.11", @@ -4098,7 +4098,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.61" +version = "0.4.62" dependencies = [ "ahash 0.8.11", "anyhow", diff --git a/configs/server.json b/configs/server.json index a7bade19f..9677a29ce 100644 --- a/configs/server.json +++ b/configs/server.json @@ -80,10 +80,19 @@ "tcp": { "enabled": true, "address": "0.0.0.0:8090", + "ipv6": false, "tls": { "enabled": false, "certificate": "certs/iggy.pfx", "password": "iggy123" + }, + "socket": { + "override_defaults": false, + "recv_buffer_size": "100 KB", + "send_buffer_size": "100 KB", + "keepalive": false, + "nodelay": false, + "linger": "0 s" } }, "quic": { diff --git a/configs/server.toml b/configs/server.toml index 5fe2120da..1338cfaeb 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -167,6 +167,9 @@ enabled = true # For example, "0.0.0.0:8090" listens on all network interfaces on port 8090. address = "0.0.0.0:8090" +# Whether to use ipv4 or ipv6 +ipv6 = false + # TLS configuration for the TCP server. [tcp.tls] # Enables or disables TLS for TCP connections. @@ -180,6 +183,27 @@ certificate = "certs/iggy.pfx" # Password for the TLS certificate, required for accessing the private key. password = "iggy123" +# Configuration for the TCP socket +[tcp.socket] +# Whether to overwrite the OS-default socket parameters +override_defaults = false + +# SO_RCVBUF: maximum size of the receive buffer, can be clamped by the OS +recv_buffer_size = "100 KB" + +# SO_SNDBUF: maximum size of the send buffer, can be clamped by the OS +send_buffer_size = "100 KB" + +# SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection +keepalive = false + +# TCP_NODELAY: enable/disable the Nagle algorithm which buffers data before sending segments +nodelay = false + +# SO_LINGER: delay to wait for while data is being transmitted before closing the socket after a +# close or shutdown call has been received +linger = "0 s" + # QUIC protocol configuration. [quic] # Controls whether the QUIC server is enabled. diff --git a/integration/src/test_server.rs b/integration/src/test_server.rs index adbf1c888..0a82baa03 100644 --- a/integration/src/test_server.rs +++ b/integration/src/test_server.rs @@ -25,12 +25,14 @@ use server::configs::config_provider::{ConfigProvider, FileConfigProvider}; pub const SYSTEM_PATH_ENV_VAR: &str = "IGGY_SYSTEM_PATH"; pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE"; +pub const IPV6_ENV_VAR: &str = "IGGY_TCP_IPV6"; const USER_PASSWORD: &str = "secret"; const SLEEP_INTERVAL_MS: u64 = 20; const LOCAL_DATA_PREFIX: &str = "local_data_"; const MAX_PORT_WAIT_DURATION_S: u64 = 60; +#[derive(PartialEq)] pub enum IpAddrKind { V4, V6, @@ -91,6 +93,10 @@ impl TestServer { } } + if ip_kind == IpAddrKind::V6 { + envs.insert(IPV6_ENV_VAR.to_string(), "true".to_string()); + } + // If IGGY_SYSTEM_PATH is not set, use a random path starting with "local_data_" let local_data_path = if let Some(system_path) = envs.get(SYSTEM_PATH_ENV_VAR) { system_path.to_string() diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 8cf7a4063..0cb2ecf21 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.32" +version = "0.6.33" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" diff --git a/sdk/src/client.rs b/sdk/src/client.rs index 96afbaf0d..5c5bf5ba7 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -454,13 +454,13 @@ impl ConnectionString { } let connection_string = connection_string.replace(CONNECTION_STRING_PREFIX, ""); - let parts = connection_string.split("@").collect::>(); + let parts = connection_string.split('@').collect::>(); if parts.len() != 2 { return Err(IggyError::InvalidConnectionString); } - let credentials = parts[0].split(":").collect::>(); + let credentials = parts[0].split(':').collect::>(); if credentials.len() != 2 { return Err(IggyError::InvalidConnectionString); } @@ -471,7 +471,7 @@ impl ConnectionString { return Err(IggyError::InvalidConnectionString); } - let server_and_options = parts[1].split("?").collect::>(); + let server_and_options = parts[1].split('?').collect::>(); if server_and_options.len() > 2 { return Err(IggyError::InvalidConnectionString); } @@ -481,11 +481,11 @@ impl ConnectionString { return Err(IggyError::InvalidConnectionString); } - if !server_address.contains(":") { + if !server_address.contains(':') { return Err(IggyError::InvalidConnectionString); } - let port = server_address.split(":").collect::>()[1]; + let port = server_address.split(':').collect::>()[1]; if port.is_empty() { return Err(IggyError::InvalidConnectionString); } @@ -512,7 +512,7 @@ impl ConnectionString { } fn parse_options(options: &str) -> Result { - let options = options.split("&").collect::>(); + let options = options.split('&').collect::>(); let mut tls_enabled = false; let mut tls_domain = "localhost".to_string(); let mut reconnection_retries = "unlimited".to_owned(); @@ -521,7 +521,7 @@ impl ConnectionString { let mut heartbeat_interval = "5s".to_owned(); for option in options { - let option_parts = option.split("=").collect::>(); + let option_parts = option.split('=').collect::>(); if option_parts.len() != 2 { return Err(IggyError::InvalidConnectionString); } diff --git a/sdk/src/clients/consumer.rs b/sdk/src/clients/consumer.rs index 02ffcf30a..cf99cf407 100644 --- a/sdk/src/clients/consumer.rs +++ b/sdk/src/clients/consumer.rs @@ -259,6 +259,7 @@ impl IggyConsumer { offset, &last_stored_offsets, ) + .await } }); diff --git a/server/Cargo.toml b/server/Cargo.toml index 1e530c0fd..c449bb10e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.61" +version = "0.4.62" edition = "2021" build = "src/build.rs" diff --git a/server/src/configs/defaults.rs b/server/src/configs/defaults.rs index 43728435b..46e72538e 100644 --- a/server/src/configs/defaults.rs +++ b/server/src/configs/defaults.rs @@ -1,3 +1,6 @@ +use iggy::utils::byte_size::IggyByteSize; +use iggy::utils::duration::IggyDuration; + use crate::configs::http::{ HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig, }; @@ -15,6 +18,9 @@ use crate::configs::system::{ }; use crate::configs::tcp::{TcpConfig, TcpTlsConfig}; use std::sync::Arc; +use std::time::Duration; + +use super::tcp::TcpSocketConfig; static_toml::static_toml! { // static_toml crate always starts from CARGO_MANIFEST_DIR (in this case iggy-server root directory) @@ -119,7 +125,9 @@ impl Default for TcpConfig { TcpConfig { enabled: SERVER_CONFIG.tcp.enabled, address: SERVER_CONFIG.tcp.address.parse().unwrap(), + ipv6: SERVER_CONFIG.tcp.ipv_6, tls: TcpTlsConfig::default(), + socket: TcpSocketConfig::default(), } } } @@ -134,6 +142,19 @@ impl Default for TcpTlsConfig { } } +impl Default for TcpSocketConfig { + fn default() -> TcpSocketConfig { + TcpSocketConfig { + override_defaults: false, + recv_buffer_size: IggyByteSize::from(100_000_u64), + send_buffer_size: IggyByteSize::from(100_000_u64), + keepalive: false, + nodelay: false, + linger: IggyDuration::new(Duration::new(0, 0)), + } + } +} + impl Default for HttpConfig { fn default() -> HttpConfig { HttpConfig { diff --git a/server/src/configs/tcp.rs b/server/src/configs/tcp.rs index 8b6d3a572..1b3883e99 100644 --- a/server/src/configs/tcp.rs +++ b/server/src/configs/tcp.rs @@ -1,10 +1,15 @@ +use iggy::utils::{byte_size::IggyByteSize, duration::IggyDuration}; use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use serde_with::DisplayFromStr; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct TcpConfig { pub enabled: bool, pub address: String, + pub ipv6: bool, pub tls: TcpTlsConfig, + pub socket: TcpSocketConfig, } #[derive(Debug, Deserialize, Serialize, Clone)] @@ -13,3 +18,15 @@ pub struct TcpTlsConfig { pub certificate: String, pub password: String, } + +#[serde_as] +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct TcpSocketConfig { + pub override_defaults: bool, + pub recv_buffer_size: IggyByteSize, + pub send_buffer_size: IggyByteSize, + pub keepalive: bool, + pub nodelay: bool, + #[serde_as(as = "DisplayFromStr")] + pub linger: IggyDuration, +} diff --git a/server/src/tcp/mod.rs b/server/src/tcp/mod.rs index c2fa37049..4cbcdd201 100644 --- a/server/src/tcp/mod.rs +++ b/server/src/tcp/mod.rs @@ -3,5 +3,6 @@ pub mod sender; pub mod tcp_listener; mod tcp_sender; pub mod tcp_server; +mod tcp_socket; pub mod tcp_tls_listener; pub mod tcp_tls_sender; diff --git a/server/src/tcp/tcp_listener.rs b/server/src/tcp/tcp_listener.rs index 9b1c5ec81..5c8431579 100644 --- a/server/src/tcp/tcp_listener.rs +++ b/server/src/tcp/tcp_listener.rs @@ -4,17 +4,24 @@ use crate::tcp::connection_handler::{handle_connection, handle_error}; use crate::tcp::tcp_sender::TcpSender; use std::net::SocketAddr; use tokio::io::AsyncWriteExt; -use tokio::net::TcpListener; +use tokio::net::TcpSocket; use tokio::sync::oneshot; use tracing::{error, info}; -pub async fn start(address: &str, system: SharedSystem) -> SocketAddr { +pub async fn start(address: &str, socket: TcpSocket, system: SharedSystem) -> SocketAddr { let address = address.to_string(); let (tx, rx) = oneshot::channel(); tokio::spawn(async move { - let listener = TcpListener::bind(&address) - .await - .expect("Unable to start TCP server."); + let addr = address.parse(); + if addr.is_err() { + panic!("Unable to parse address {:?}", address); + } + + socket + .bind(addr.unwrap()) + .expect("Unable to bind socket to address"); + + let listener = socket.listen(1024).expect("Unable to start TCP server."); let local_addr = listener .local_addr() diff --git a/server/src/tcp/tcp_server.rs b/server/src/tcp/tcp_server.rs index 0f963f9b8..8d0470769 100644 --- a/server/src/tcp/tcp_server.rs +++ b/server/src/tcp/tcp_server.rs @@ -1,6 +1,6 @@ use crate::configs::tcp::TcpConfig; use crate::streaming::systems::system::SharedSystem; -use crate::tcp::{tcp_listener, tcp_tls_listener}; +use crate::tcp::{tcp_listener, tcp_socket, tcp_tls_listener}; use std::net::SocketAddr; use tracing::info; @@ -13,9 +13,10 @@ pub async fn start(config: TcpConfig, system: SharedSystem) -> SocketAddr { "Iggy TCP" }; info!("Initializing {server_name} server..."); + let socket = tcp_socket::build(config.ipv6, config.socket); let addr = match config.tls.enabled { - true => tcp_tls_listener::start(&config.address, config.tls, system).await, - false => tcp_listener::start(&config.address, system).await, + true => tcp_tls_listener::start(&config.address, config.tls, socket, system).await, + false => tcp_listener::start(&config.address, socket, system).await, }; info!("{server_name} server has started on: {:?}", addr); addr diff --git a/server/src/tcp/tcp_socket.rs b/server/src/tcp/tcp_socket.rs new file mode 100644 index 000000000..d1cc39298 --- /dev/null +++ b/server/src/tcp/tcp_socket.rs @@ -0,0 +1,70 @@ +use std::num::TryFromIntError; + +use tokio::net::TcpSocket; + +use crate::configs::tcp::TcpSocketConfig; + +pub fn build(ipv6: bool, config: TcpSocketConfig) -> TcpSocket { + let socket = if ipv6 { + TcpSocket::new_v6().expect("Unable to create an ipv6 socket") + } else { + TcpSocket::new_v4().expect("Unable to create an ipv4 socket") + }; + + if config.override_defaults { + config + .recv_buffer_size + .as_bytes_u64() + .try_into() + .map_err(|e: TryFromIntError| std::io::Error::other(e.to_string())) + .and_then(|size: u32| socket.set_recv_buffer_size(size)) + .expect("Unable to set SO_RCVBUF on socket"); + config + .send_buffer_size + .as_bytes_u64() + .try_into() + .map_err(|e: TryFromIntError| std::io::Error::other(e.to_string())) + .and_then(|size| socket.set_send_buffer_size(size)) + .expect("Unable to set SO_SNDBUF on socket"); + socket + .set_keepalive(config.keepalive) + .expect("Unable to set SO_KEEPALIVE on socket"); + socket + .set_nodelay(config.nodelay) + .expect("Unable to set TCP_NODELAY on socket"); + socket + .set_linger(Some(config.linger.get_duration())) + .expect("Unable to set SO_LINGER on socket"); + } + + socket +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use iggy::utils::{byte_size::IggyByteSize, duration::IggyDuration}; + + use super::*; + + #[test] + fn given_override_defaults_socket_should_be_configured() { + let buffer_size = 425984; + let linger_dur = Duration::new(1, 0); + let config = TcpSocketConfig { + override_defaults: true, + recv_buffer_size: IggyByteSize::from(buffer_size), + send_buffer_size: IggyByteSize::from(buffer_size), + keepalive: true, + nodelay: true, + linger: IggyDuration::new(linger_dur), + }; + let socket = build(false, config); + assert!(socket.recv_buffer_size().unwrap() >= buffer_size as u32); + assert!(socket.send_buffer_size().unwrap() >= buffer_size as u32); + assert!(socket.keepalive().unwrap()); + assert!(socket.nodelay().unwrap()); + assert_eq!(socket.linger().unwrap(), Some(linger_dur)); + } +} diff --git a/server/src/tcp/tcp_tls_listener.rs b/server/src/tcp/tcp_tls_listener.rs index 138e6f143..48eaf4bea 100644 --- a/server/src/tcp/tcp_tls_listener.rs +++ b/server/src/tcp/tcp_tls_listener.rs @@ -5,13 +5,18 @@ use crate::tcp::connection_handler::{handle_connection, handle_error}; use crate::tcp::tcp_tls_sender::TcpTlsSender; use std::net::SocketAddr; use tokio::io::AsyncWriteExt; -use tokio::net::TcpListener; +use tokio::net::TcpSocket; use tokio::sync::oneshot; use tokio_native_tls::native_tls; use tokio_native_tls::native_tls::Identity; use tracing::{error, info}; -pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSystem) -> SocketAddr { +pub(crate) async fn start( + address: &str, + config: TcpTlsConfig, + socket: TcpSocket, + system: SharedSystem, +) -> SocketAddr { let address = address.to_string(); let (tx, rx) = oneshot::channel(); tokio::spawn(async move { @@ -31,8 +36,17 @@ pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSys .unwrap(), ); - let listener = TcpListener::bind(&address) - .await + let addr = address.parse(); + if addr.is_err() { + panic!("Unable to parse address {:?}", address); + } + + socket + .bind(addr.unwrap()) + .expect("Unable to bind socket to address"); + + let listener = socket + .listen(1024) .expect("Unable to start TCP TLS server."); let local_addr = listener