diff --git a/configs/server.json b/configs/server.json index c778f0ce2..72a64a39b 100644 --- a/configs/server.json +++ b/configs/server.json @@ -86,12 +86,13 @@ "password": "iggy123" }, "socket": { + "override_defaults": false, "ipv6": false, - "recv_buffer_size": 102400, - "send_buffer_size": 102400, + "recv_buffer_size": "100 KB", + "send_buffer_size": "100 KB", "keepalive": false, "nodelay": false, - "linger": "100 ms" + "linger": "0 s" } }, "quic": { diff --git a/configs/server.toml b/configs/server.toml index 46aea5bec..f533f66cb 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -182,25 +182,27 @@ password = "iggy123" # Configuration for the TCP socket [tcp.socket] +# Whether to overwrite the OS-default socket parameters +override_defaults = false + # Whether to use an ipv4 or ipv6 socket ipv6 = false -# Optional, SO_RCVBUF: maximum size in bytes of the receive buffer, can be clamped by the OS -recv_buffer_size = 102400 +# SO_RCVBUF: maximum size of the receive buffer, can be clamped by the OS +recv_buffer_size = "100 KB" -# Optional, SO_SNDBUF: maximum size in bytes of the send buffer, can be clamped by the OS -send_buffer_size = 102400 +# SO_SNDBUF: maximum size of the send buffer, can be clamped by the OS +send_buffer_size = "100 KB" -# Optional, SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection +# SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection keepalive = false -# Optional, TCP_NODELAY: enable/disable the Nagle algorithm which buffers data before sending -# segments +# TCP_NODELAY: enable/disable the Nagle algorithm which buffers data before sending segments nodelay = false -# Optional, SO_LINGER: delay to wait for while data is being transmitted before closing the socket -# after a close or shutdown call has been received -linger = "100 ms" +# SO_LINGER: delay to wait for while data is being transmitted before closing the socket after a +# close or shutdown call has been received +linger = "0 s" # QUIC protocol configuration. [quic] diff --git a/integration/src/test_server.rs b/integration/src/test_server.rs index adbf1c888..266b5fce5 100644 --- a/integration/src/test_server.rs +++ b/integration/src/test_server.rs @@ -25,12 +25,15 @@ use server::configs::config_provider::{ConfigProvider, FileConfigProvider}; pub const SYSTEM_PATH_ENV_VAR: &str = "IGGY_SYSTEM_PATH"; pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE"; +pub const OVERRIDE_DEFAULTS_ENV_VAR: &str = "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS"; +pub const IPV6_ENV_VAR: &str = "IGGY_TCP_SOCKET_IPV6"; const USER_PASSWORD: &str = "secret"; const SLEEP_INTERVAL_MS: u64 = 20; const LOCAL_DATA_PREFIX: &str = "local_data_"; const MAX_PORT_WAIT_DURATION_S: u64 = 60; +#[derive(PartialEq)] pub enum IpAddrKind { V4, V6, @@ -91,6 +94,11 @@ impl TestServer { } } + if ip_kind == IpAddrKind::V6 { + envs.insert(OVERRIDE_DEFAULTS_ENV_VAR.to_string(), "true".to_string()); + envs.insert(IPV6_ENV_VAR.to_string(), "true".to_string()); + } + // If IGGY_SYSTEM_PATH is not set, use a random path starting with "local_data_" let local_data_path = if let Some(system_path) = envs.get(SYSTEM_PATH_ENV_VAR) { system_path.to_string() diff --git a/sdk/src/client.rs b/sdk/src/client.rs index 9a368faa3..72ec0f7d4 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -444,13 +444,13 @@ impl ConnectionString { } let connection_string = connection_string.replace(CONNECTION_STRING_PREFIX, ""); - let parts = connection_string.split("@").collect::>(); + let parts = connection_string.split('@').collect::>(); if parts.len() != 2 { return Err(IggyError::InvalidConnectionString); } - let credentials = parts[0].split(":").collect::>(); + let credentials = parts[0].split(':').collect::>(); if credentials.len() != 2 { return Err(IggyError::InvalidConnectionString); } @@ -461,7 +461,7 @@ impl ConnectionString { return Err(IggyError::InvalidConnectionString); } - let server_and_options = parts[1].split("?").collect::>(); + let server_and_options = parts[1].split('?').collect::>(); if server_and_options.len() > 2 { return Err(IggyError::InvalidConnectionString); } @@ -471,11 +471,11 @@ impl ConnectionString { return Err(IggyError::InvalidConnectionString); } - if !server_address.contains(":") { + if !server_address.contains(':') { return Err(IggyError::InvalidConnectionString); } - let port = server_address.split(":").collect::>()[1]; + let port = server_address.split(':').collect::>()[1]; if port.is_empty() { return Err(IggyError::InvalidConnectionString); } @@ -502,7 +502,7 @@ impl ConnectionString { } fn parse_options(options: &str) -> Result { - let options = options.split("&").collect::>(); + let options = options.split('&').collect::>(); let mut tls_enabled = false; let mut tls_domain = "localhost".to_string(); let mut reconnection_retries = "unlimited".to_owned(); @@ -511,7 +511,7 @@ impl ConnectionString { let mut heartbeat_interval = "5s".to_owned(); for option in options { - let option_parts = option.split("=").collect::>(); + let option_parts = option.split('=').collect::>(); if option_parts.len() != 2 { return Err(IggyError::InvalidConnectionString); } diff --git a/sdk/src/clients/consumer.rs b/sdk/src/clients/consumer.rs index 02ffcf30a..cf99cf407 100644 --- a/sdk/src/clients/consumer.rs +++ b/sdk/src/clients/consumer.rs @@ -259,6 +259,7 @@ impl IggyConsumer { offset, &last_stored_offsets, ) + .await } }); diff --git a/server/src/configs/defaults.rs b/server/src/configs/defaults.rs index 6683fbd56..f300306f4 100644 --- a/server/src/configs/defaults.rs +++ b/server/src/configs/defaults.rs @@ -1,3 +1,6 @@ +use iggy::utils::byte_size::IggyByteSize; +use iggy::utils::duration::IggyDuration; + use crate::configs::http::{ HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig, }; @@ -15,6 +18,7 @@ use crate::configs::system::{ }; use crate::configs::tcp::{TcpConfig, TcpTlsConfig}; use std::sync::Arc; +use std::time::Duration; use super::tcp::TcpSocketConfig; @@ -140,12 +144,13 @@ impl Default for TcpTlsConfig { impl Default for TcpSocketConfig { fn default() -> TcpSocketConfig { TcpSocketConfig { + override_defaults: false, ipv6: false, - recv_buffer_size: None, - send_buffer_size: None, - keepalive: None, - nodelay: None, - linger: None, + recv_buffer_size: IggyByteSize::from(100_000_u64), + send_buffer_size: IggyByteSize::from(100_000_u64), + keepalive: false, + nodelay: false, + linger: IggyDuration::new(Duration::new(0, 0)), } } } diff --git a/server/src/configs/tcp.rs b/server/src/configs/tcp.rs index d88f4854a..246eb687f 100644 --- a/server/src/configs/tcp.rs +++ b/server/src/configs/tcp.rs @@ -1,5 +1,7 @@ -use iggy::utils::duration::IggyDuration; +use iggy::utils::{byte_size::IggyByteSize, duration::IggyDuration}; use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use serde_with::DisplayFromStr; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct TcpConfig { @@ -16,12 +18,15 @@ pub struct TcpTlsConfig { pub password: String, } +#[serde_as] #[derive(Debug, Deserialize, Serialize, Clone)] pub struct TcpSocketConfig { + pub override_defaults: bool, pub ipv6: bool, - pub recv_buffer_size: Option, - pub send_buffer_size: Option, - pub keepalive: Option, - pub nodelay: Option, - pub linger: Option, + pub recv_buffer_size: IggyByteSize, + pub send_buffer_size: IggyByteSize, + pub keepalive: bool, + pub nodelay: bool, + #[serde_as(as = "DisplayFromStr")] + pub linger: IggyDuration, } diff --git a/server/src/tcp/tcp_socket.rs b/server/src/tcp/tcp_socket.rs index 85221941f..fe0b6980f 100644 --- a/server/src/tcp/tcp_socket.rs +++ b/server/src/tcp/tcp_socket.rs @@ -1,37 +1,39 @@ +use std::num::TryFromIntError; + use tokio::net::TcpSocket; use crate::configs::tcp::TcpSocketConfig; pub fn build(config: TcpSocketConfig) -> TcpSocket { - let socket = if config.ipv6 { + let socket = if config.override_defaults && config.ipv6 { TcpSocket::new_v6().expect("Unable to create an ipv6 socket") } else { TcpSocket::new_v4().expect("Unable to create an ipv4 socket") }; - if let Some(size) = config.recv_buffer_size { - socket - .set_recv_buffer_size(size) + if config.override_defaults { + config + .recv_buffer_size + .as_bytes_u64() + .try_into() + .map_err(|e: TryFromIntError| std::io::Error::other(e.to_string())) + .and_then(|size: u32| socket.set_recv_buffer_size(size)) .expect("Unable to set SO_RCVBUF on socket"); - } - if let Some(size) = config.send_buffer_size { - socket - .set_send_buffer_size(size) + config + .send_buffer_size + .as_bytes_u64() + .try_into() + .map_err(|e: TryFromIntError| std::io::Error::other(e.to_string())) + .and_then(|size| socket.set_send_buffer_size(size)) .expect("Unable to set SO_SNDBUF on socket"); - } - if let Some(keepalive) = config.keepalive { socket - .set_keepalive(keepalive) + .set_keepalive(config.keepalive) .expect("Unable to set SO_KEEPALIVE on socket"); - } - if let Some(nodelay) = config.nodelay { socket - .set_nodelay(nodelay) + .set_nodelay(config.nodelay) .expect("Unable to set TCP_NODELAY on socket"); - } - if let Some(linger) = config.linger { socket - .set_linger(Some(linger.get_duration())) + .set_linger(Some(config.linger.get_duration())) .expect("Unable to set SO_LINGER on socket"); } @@ -42,85 +44,28 @@ pub fn build(config: TcpSocketConfig) -> TcpSocket { mod tests { use std::time::Duration; - use iggy::utils::duration::IggyDuration; + use iggy::utils::{byte_size::IggyByteSize, duration::IggyDuration}; use super::*; #[test] - fn given_recv_config_socket_should_be_configured() { - let recv_buffer_size = 425984; - let config = TcpSocketConfig { - ipv6: true, - recv_buffer_size: Some(recv_buffer_size), - send_buffer_size: None, - keepalive: None, - nodelay: None, - linger: None, - }; - let socket = build(config); - let recv = socket.recv_buffer_size().unwrap(); - assert_eq!(recv, recv_buffer_size); - } - - #[test] - fn given_send_config_socket_should_be_configured() { - let send_buffer_size = 425984; - let config = TcpSocketConfig { - ipv6: true, - recv_buffer_size: None, - send_buffer_size: Some(send_buffer_size), - keepalive: None, - nodelay: None, - linger: None, - }; - let socket = build(config); - let send = socket.send_buffer_size().unwrap(); - assert_eq!(send, send_buffer_size); - } - - #[test] - fn given_keepalive_config_socket_should_be_configured() { - let config = TcpSocketConfig { - ipv6: true, - recv_buffer_size: None, - send_buffer_size: None, - keepalive: Some(true), - nodelay: None, - linger: None, - }; - let socket = build(config); - let keepalive = socket.keepalive().unwrap(); - assert!(keepalive); - } - - #[test] - fn given_nodelay_config_socket_should_be_configured() { - let config = TcpSocketConfig { - ipv6: true, - recv_buffer_size: None, - send_buffer_size: None, - keepalive: None, - nodelay: Some(true), - linger: None, - }; - let socket = build(config); - let nodelay = socket.nodelay().unwrap(); - assert!(nodelay); - } - - #[test] - fn given_linger_config_socket_should_be_configured() { + fn given_override_defaults_socket_should_be_configured() { + let buffer_size = 425984; let linger_dur = Duration::new(1, 0); let config = TcpSocketConfig { + override_defaults: true, ipv6: true, - recv_buffer_size: None, - send_buffer_size: None, - keepalive: None, - nodelay: None, - linger: Some(IggyDuration::new(linger_dur)), + recv_buffer_size: IggyByteSize::from(buffer_size), + send_buffer_size: IggyByteSize::from(buffer_size), + keepalive: true, + nodelay: true, + linger: IggyDuration::new(linger_dur), }; let socket = build(config); - let linger = socket.linger().unwrap(); - assert_eq!(linger, Some(linger_dur)); + assert_eq!(socket.recv_buffer_size().unwrap(), buffer_size as u32); + assert_eq!(socket.send_buffer_size().unwrap(), buffer_size as u32); + assert!(socket.keepalive().unwrap()); + assert!(socket.nodelay().unwrap()); + assert_eq!(socket.linger().unwrap(), Some(linger_dur)); } }