Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed Nov 3, 2024
1 parent 4261709 commit 2668fde
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 119 deletions.
7 changes: 4 additions & 3 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,13 @@
"password": "iggy123"
},
"socket": {
"override_defaults": false,
"ipv6": false,
"recv_buffer_size": 102400,
"send_buffer_size": 102400,
"recv_buffer_size": "100 KB",
"send_buffer_size": "100 KB",
"keepalive": false,
"nodelay": false,
"linger": "100 ms"
"linger": "0 s"
}
},
"quic": {
Expand Down
22 changes: 12 additions & 10 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,25 +182,27 @@ password = "iggy123"

# Configuration for the TCP socket
[tcp.socket]
# Whether to overwrite the OS-default socket parameters
override_defaults = false

# Whether to use an ipv4 or ipv6 socket
ipv6 = false

# Optional, SO_RCVBUF: maximum size in bytes of the receive buffer, can be clamped by the OS
recv_buffer_size = 102400
# SO_RCVBUF: maximum size of the receive buffer, can be clamped by the OS
recv_buffer_size = "100 KB"

# Optional, SO_SNDBUF: maximum size in bytes of the send buffer, can be clamped by the OS
send_buffer_size = 102400
# SO_SNDBUF: maximum size of the send buffer, can be clamped by the OS
send_buffer_size = "100 KB"

# Optional, SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection
# SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection
keepalive = false

# Optional, TCP_NODELAY: enable/disable the Nagle algorithm which buffers data before sending
# segments
# TCP_NODELAY: enable/disable the Nagle algorithm which buffers data before sending segments
nodelay = false

# Optional, SO_LINGER: delay to wait for while data is being transmitted before closing the socket
# after a close or shutdown call has been received
linger = "100 ms"
# SO_LINGER: delay to wait for while data is being transmitted before closing the socket after a
# close or shutdown call has been received
linger = "0 s"

# QUIC protocol configuration.
[quic]
Expand Down
8 changes: 8 additions & 0 deletions integration/src/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ use server::configs::config_provider::{ConfigProvider, FileConfigProvider};

pub const SYSTEM_PATH_ENV_VAR: &str = "IGGY_SYSTEM_PATH";
pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
pub const OVERRIDE_DEFAULTS_ENV_VAR: &str = "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS";
pub const IPV6_ENV_VAR: &str = "IGGY_TCP_SOCKET_IPV6";
const USER_PASSWORD: &str = "secret";
const SLEEP_INTERVAL_MS: u64 = 20;
const LOCAL_DATA_PREFIX: &str = "local_data_";

const MAX_PORT_WAIT_DURATION_S: u64 = 60;

#[derive(PartialEq)]
pub enum IpAddrKind {
V4,
V6,
Expand Down Expand Up @@ -91,6 +94,11 @@ impl TestServer {
}
}

if ip_kind == IpAddrKind::V6 {
envs.insert(OVERRIDE_DEFAULTS_ENV_VAR.to_string(), "true".to_string());
envs.insert(IPV6_ENV_VAR.to_string(), "true".to_string());
}

// If IGGY_SYSTEM_PATH is not set, use a random path starting with "local_data_"
let local_data_path = if let Some(system_path) = envs.get(SYSTEM_PATH_ENV_VAR) {
system_path.to_string()
Expand Down
14 changes: 7 additions & 7 deletions sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,13 @@ impl ConnectionString {
}

let connection_string = connection_string.replace(CONNECTION_STRING_PREFIX, "");
let parts = connection_string.split("@").collect::<Vec<&str>>();
let parts = connection_string.split('@').collect::<Vec<&str>>();

if parts.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}

let credentials = parts[0].split(":").collect::<Vec<&str>>();
let credentials = parts[0].split(':').collect::<Vec<&str>>();
if credentials.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}
Expand All @@ -461,7 +461,7 @@ impl ConnectionString {
return Err(IggyError::InvalidConnectionString);
}

let server_and_options = parts[1].split("?").collect::<Vec<&str>>();
let server_and_options = parts[1].split('?').collect::<Vec<&str>>();
if server_and_options.len() > 2 {
return Err(IggyError::InvalidConnectionString);
}
Expand All @@ -471,11 +471,11 @@ impl ConnectionString {
return Err(IggyError::InvalidConnectionString);
}

if !server_address.contains(":") {
if !server_address.contains(':') {
return Err(IggyError::InvalidConnectionString);
}

let port = server_address.split(":").collect::<Vec<&str>>()[1];
let port = server_address.split(':').collect::<Vec<&str>>()[1];
if port.is_empty() {
return Err(IggyError::InvalidConnectionString);
}
Expand All @@ -502,7 +502,7 @@ impl ConnectionString {
}

fn parse_options(options: &str) -> Result<ConnectionStringOptions, IggyError> {
let options = options.split("&").collect::<Vec<&str>>();
let options = options.split('&').collect::<Vec<&str>>();
let mut tls_enabled = false;
let mut tls_domain = "localhost".to_string();
let mut reconnection_retries = "unlimited".to_owned();
Expand All @@ -511,7 +511,7 @@ impl ConnectionString {
let mut heartbeat_interval = "5s".to_owned();

for option in options {
let option_parts = option.split("=").collect::<Vec<&str>>();
let option_parts = option.split('=').collect::<Vec<&str>>();
if option_parts.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}
Expand Down
1 change: 1 addition & 0 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ impl IggyConsumer {
offset,
&last_stored_offsets,
)
.await
}
});

Expand Down
15 changes: 10 additions & 5 deletions server/src/configs/defaults.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::duration::IggyDuration;

use crate::configs::http::{
HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig,
};
Expand All @@ -15,6 +18,7 @@ use crate::configs::system::{
};
use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
use std::sync::Arc;
use std::time::Duration;

use super::tcp::TcpSocketConfig;

Expand Down Expand Up @@ -140,12 +144,13 @@ impl Default for TcpTlsConfig {
impl Default for TcpSocketConfig {
fn default() -> TcpSocketConfig {
TcpSocketConfig {
override_defaults: false,
ipv6: false,
recv_buffer_size: None,
send_buffer_size: None,
keepalive: None,
nodelay: None,
linger: None,
recv_buffer_size: IggyByteSize::from(100_000_u64),
send_buffer_size: IggyByteSize::from(100_000_u64),
keepalive: false,
nodelay: false,
linger: IggyDuration::new(Duration::new(0, 0)),
}
}
}
Expand Down
17 changes: 11 additions & 6 deletions server/src/configs/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use iggy::utils::duration::IggyDuration;
use iggy::utils::{byte_size::IggyByteSize, duration::IggyDuration};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use serde_with::DisplayFromStr;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TcpConfig {
Expand All @@ -16,12 +18,15 @@ pub struct TcpTlsConfig {
pub password: String,
}

#[serde_as]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TcpSocketConfig {
pub override_defaults: bool,
pub ipv6: bool,
pub recv_buffer_size: Option<u32>,
pub send_buffer_size: Option<u32>,
pub keepalive: Option<bool>,
pub nodelay: Option<bool>,
pub linger: Option<IggyDuration>,
pub recv_buffer_size: IggyByteSize,
pub send_buffer_size: IggyByteSize,
pub keepalive: bool,
pub nodelay: bool,
#[serde_as(as = "DisplayFromStr")]
pub linger: IggyDuration,
}
121 changes: 33 additions & 88 deletions server/src/tcp/tcp_socket.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,39 @@
use std::num::TryFromIntError;

use tokio::net::TcpSocket;

use crate::configs::tcp::TcpSocketConfig;

pub fn build(config: TcpSocketConfig) -> TcpSocket {
let socket = if config.ipv6 {
let socket = if config.override_defaults && config.ipv6 {
TcpSocket::new_v6().expect("Unable to create an ipv6 socket")
} else {
TcpSocket::new_v4().expect("Unable to create an ipv4 socket")
};

if let Some(size) = config.recv_buffer_size {
socket
.set_recv_buffer_size(size)
if config.override_defaults {
config
.recv_buffer_size
.as_bytes_u64()
.try_into()
.map_err(|e: TryFromIntError| std::io::Error::other(e.to_string()))
.and_then(|size: u32| socket.set_recv_buffer_size(size))
.expect("Unable to set SO_RCVBUF on socket");
}
if let Some(size) = config.send_buffer_size {
socket
.set_send_buffer_size(size)
config
.send_buffer_size
.as_bytes_u64()
.try_into()
.map_err(|e: TryFromIntError| std::io::Error::other(e.to_string()))
.and_then(|size| socket.set_send_buffer_size(size))
.expect("Unable to set SO_SNDBUF on socket");
}
if let Some(keepalive) = config.keepalive {
socket
.set_keepalive(keepalive)
.set_keepalive(config.keepalive)
.expect("Unable to set SO_KEEPALIVE on socket");
}
if let Some(nodelay) = config.nodelay {
socket
.set_nodelay(nodelay)
.set_nodelay(config.nodelay)
.expect("Unable to set TCP_NODELAY on socket");
}
if let Some(linger) = config.linger {
socket
.set_linger(Some(linger.get_duration()))
.set_linger(Some(config.linger.get_duration()))
.expect("Unable to set SO_LINGER on socket");
}

Expand All @@ -42,85 +44,28 @@ pub fn build(config: TcpSocketConfig) -> TcpSocket {
mod tests {
use std::time::Duration;

use iggy::utils::duration::IggyDuration;
use iggy::utils::{byte_size::IggyByteSize, duration::IggyDuration};

use super::*;

#[test]
fn given_recv_config_socket_should_be_configured() {
let recv_buffer_size = 425984;
let config = TcpSocketConfig {
ipv6: true,
recv_buffer_size: Some(recv_buffer_size),
send_buffer_size: None,
keepalive: None,
nodelay: None,
linger: None,
};
let socket = build(config);
let recv = socket.recv_buffer_size().unwrap();
assert_eq!(recv, recv_buffer_size);
}

#[test]
fn given_send_config_socket_should_be_configured() {
let send_buffer_size = 425984;
let config = TcpSocketConfig {
ipv6: true,
recv_buffer_size: None,
send_buffer_size: Some(send_buffer_size),
keepalive: None,
nodelay: None,
linger: None,
};
let socket = build(config);
let send = socket.send_buffer_size().unwrap();
assert_eq!(send, send_buffer_size);
}

#[test]
fn given_keepalive_config_socket_should_be_configured() {
let config = TcpSocketConfig {
ipv6: true,
recv_buffer_size: None,
send_buffer_size: None,
keepalive: Some(true),
nodelay: None,
linger: None,
};
let socket = build(config);
let keepalive = socket.keepalive().unwrap();
assert!(keepalive);
}

#[test]
fn given_nodelay_config_socket_should_be_configured() {
let config = TcpSocketConfig {
ipv6: true,
recv_buffer_size: None,
send_buffer_size: None,
keepalive: None,
nodelay: Some(true),
linger: None,
};
let socket = build(config);
let nodelay = socket.nodelay().unwrap();
assert!(nodelay);
}

#[test]
fn given_linger_config_socket_should_be_configured() {
fn given_override_defaults_socket_should_be_configured() {
let buffer_size = 425984;
let linger_dur = Duration::new(1, 0);
let config = TcpSocketConfig {
override_defaults: true,
ipv6: true,
recv_buffer_size: None,
send_buffer_size: None,
keepalive: None,
nodelay: None,
linger: Some(IggyDuration::new(linger_dur)),
recv_buffer_size: IggyByteSize::from(buffer_size),
send_buffer_size: IggyByteSize::from(buffer_size),
keepalive: true,
nodelay: true,
linger: IggyDuration::new(linger_dur),
};
let socket = build(config);
let linger = socket.linger().unwrap();
assert_eq!(linger, Some(linger_dur));
assert_eq!(socket.recv_buffer_size().unwrap(), buffer_size as u32);
assert_eq!(socket.send_buffer_size().unwrap(), buffer_size as u32);
assert!(socket.keepalive().unwrap());
assert!(socket.nodelay().unwrap());
assert_eq!(socket.linger().unwrap(), Some(linger_dur));
}
}

0 comments on commit 2668fde

Please sign in to comment.