Skip to content

Commit

Permalink
Expose TCP parameters apache#31
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed Nov 3, 2024
1 parent 5fb550d commit 95cefc9
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 19 deletions.
9 changes: 9 additions & 0 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@
"enabled": false,
"certificate": "certs/iggy.pfx",
"password": "iggy123"
},
"socket": {
"override_defaults": false,
"ipv6": false,
"recv_buffer_size": "100 KB",
"send_buffer_size": "100 KB",
"keepalive": false,
"nodelay": false,
"linger": "0 s"
}
},
"quic": {
Expand Down
24 changes: 24 additions & 0 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,30 @@ certificate = "certs/iggy.pfx"
# Password for the TLS certificate, required for accessing the private key.
password = "iggy123"

# Configuration for the TCP socket
[tcp.socket]
# Whether to overwrite the OS-default socket parameters
override_defaults = false

# Whether to use an ipv4 or ipv6 socket
ipv6 = false

# SO_RCVBUF: maximum size of the receive buffer, can be clamped by the OS
recv_buffer_size = "100 KB"

# SO_SNDBUF: maximum size of the send buffer, can be clamped by the OS
send_buffer_size = "100 KB"

# SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection
keepalive = false

# TCP_NODELAY: enable/disable the Nagle algorithm which buffers data before sending segments
nodelay = false

# SO_LINGER: delay to wait for while data is being transmitted before closing the socket after a
# close or shutdown call has been received
linger = "0 s"

# QUIC protocol configuration.
[quic]
# Controls whether the QUIC server is enabled.
Expand Down
8 changes: 8 additions & 0 deletions integration/src/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ use server::configs::config_provider::{ConfigProvider, FileConfigProvider};

pub const SYSTEM_PATH_ENV_VAR: &str = "IGGY_SYSTEM_PATH";
pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
pub const OVERRIDE_DEFAULTS_ENV_VAR: &str = "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS";
pub const IPV6_ENV_VAR: &str = "IGGY_TCP_SOCKET_IPV6";
const USER_PASSWORD: &str = "secret";
const SLEEP_INTERVAL_MS: u64 = 20;
const LOCAL_DATA_PREFIX: &str = "local_data_";

const MAX_PORT_WAIT_DURATION_S: u64 = 60;

#[derive(PartialEq)]
pub enum IpAddrKind {
V4,
V6,
Expand Down Expand Up @@ -91,6 +94,11 @@ impl TestServer {
}
}

if ip_kind == IpAddrKind::V6 {
envs.insert(OVERRIDE_DEFAULTS_ENV_VAR.to_string(), "true".to_string());
envs.insert(IPV6_ENV_VAR.to_string(), "true".to_string());
}

// If IGGY_SYSTEM_PATH is not set, use a random path starting with "local_data_"
let local_data_path = if let Some(system_path) = envs.get(SYSTEM_PATH_ENV_VAR) {
system_path.to_string()
Expand Down
14 changes: 7 additions & 7 deletions sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,13 @@ impl ConnectionString {
}

let connection_string = connection_string.replace(CONNECTION_STRING_PREFIX, "");
let parts = connection_string.split("@").collect::<Vec<&str>>();
let parts = connection_string.split('@').collect::<Vec<&str>>();

if parts.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}

let credentials = parts[0].split(":").collect::<Vec<&str>>();
let credentials = parts[0].split(':').collect::<Vec<&str>>();
if credentials.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}
Expand All @@ -461,7 +461,7 @@ impl ConnectionString {
return Err(IggyError::InvalidConnectionString);
}

let server_and_options = parts[1].split("?").collect::<Vec<&str>>();
let server_and_options = parts[1].split('?').collect::<Vec<&str>>();
if server_and_options.len() > 2 {
return Err(IggyError::InvalidConnectionString);
}
Expand All @@ -471,11 +471,11 @@ impl ConnectionString {
return Err(IggyError::InvalidConnectionString);
}

if !server_address.contains(":") {
if !server_address.contains(':') {
return Err(IggyError::InvalidConnectionString);
}

let port = server_address.split(":").collect::<Vec<&str>>()[1];
let port = server_address.split(':').collect::<Vec<&str>>()[1];
if port.is_empty() {
return Err(IggyError::InvalidConnectionString);
}
Expand All @@ -502,7 +502,7 @@ impl ConnectionString {
}

fn parse_options(options: &str) -> Result<ConnectionStringOptions, IggyError> {
let options = options.split("&").collect::<Vec<&str>>();
let options = options.split('&').collect::<Vec<&str>>();
let mut tls_enabled = false;
let mut tls_domain = "localhost".to_string();
let mut reconnection_retries = "unlimited".to_owned();
Expand All @@ -511,7 +511,7 @@ impl ConnectionString {
let mut heartbeat_interval = "5s".to_owned();

for option in options {
let option_parts = option.split("=").collect::<Vec<&str>>();
let option_parts = option.split('=').collect::<Vec<&str>>();
if option_parts.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}
Expand Down
1 change: 1 addition & 0 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ impl IggyConsumer {
offset,
&last_stored_offsets,
)
.await
}
});

Expand Down
21 changes: 21 additions & 0 deletions server/src/configs/defaults.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::duration::IggyDuration;

use crate::configs::http::{
HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig,
};
Expand All @@ -15,6 +18,9 @@ use crate::configs::system::{
};
use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
use std::sync::Arc;
use std::time::Duration;

use super::tcp::TcpSocketConfig;

static_toml::static_toml! {
// static_toml crate always starts from CARGO_MANIFEST_DIR (in this case iggy-server root directory)
Expand Down Expand Up @@ -120,6 +126,7 @@ impl Default for TcpConfig {
enabled: SERVER_CONFIG.tcp.enabled,
address: SERVER_CONFIG.tcp.address.parse().unwrap(),
tls: TcpTlsConfig::default(),
socket: TcpSocketConfig::default(),
}
}
}
Expand All @@ -134,6 +141,20 @@ impl Default for TcpTlsConfig {
}
}

impl Default for TcpSocketConfig {
fn default() -> TcpSocketConfig {
TcpSocketConfig {
override_defaults: false,
ipv6: false,
recv_buffer_size: IggyByteSize::from(100_000_u64),
send_buffer_size: IggyByteSize::from(100_000_u64),
keepalive: false,
nodelay: false,
linger: IggyDuration::new(Duration::new(0, 0)),
}
}
}

impl Default for HttpConfig {
fn default() -> HttpConfig {
HttpConfig {
Expand Down
17 changes: 17 additions & 0 deletions server/src/configs/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use iggy::utils::{byte_size::IggyByteSize, duration::IggyDuration};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use serde_with::DisplayFromStr;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TcpConfig {
pub enabled: bool,
pub address: String,
pub tls: TcpTlsConfig,
pub socket: TcpSocketConfig,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
Expand All @@ -13,3 +17,16 @@ pub struct TcpTlsConfig {
pub certificate: String,
pub password: String,
}

#[serde_as]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TcpSocketConfig {
pub override_defaults: bool,
pub ipv6: bool,
pub recv_buffer_size: IggyByteSize,
pub send_buffer_size: IggyByteSize,
pub keepalive: bool,
pub nodelay: bool,
#[serde_as(as = "DisplayFromStr")]
pub linger: IggyDuration,
}
1 change: 1 addition & 0 deletions server/src/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pub mod sender;
pub mod tcp_listener;
mod tcp_sender;
pub mod tcp_server;
mod tcp_socket;
pub mod tcp_tls_listener;
pub mod tcp_tls_sender;
17 changes: 12 additions & 5 deletions server/src/tcp/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,24 @@ use crate::tcp::connection_handler::{handle_connection, handle_error};
use crate::tcp::tcp_sender::TcpSender;
use std::net::SocketAddr;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpSocket;
use tokio::sync::oneshot;
use tracing::{error, info};

pub async fn start(address: &str, system: SharedSystem) -> SocketAddr {
pub async fn start(address: &str, socket: TcpSocket, system: SharedSystem) -> SocketAddr {
let address = address.to_string();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let listener = TcpListener::bind(&address)
.await
.expect("Unable to start TCP server.");
let addr = address.parse();
if addr.is_err() {
panic!("Unable to parse address {:?}", address);
}

socket
.bind(addr.unwrap())
.expect("Unable to bind socket to address");

let listener = socket.listen(1024).expect("Unable to start TCP server.");

let local_addr = listener
.local_addr()
Expand Down
7 changes: 4 additions & 3 deletions server/src/tcp/tcp_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::configs::tcp::TcpConfig;
use crate::streaming::systems::system::SharedSystem;
use crate::tcp::{tcp_listener, tcp_tls_listener};
use crate::tcp::{tcp_listener, tcp_socket, tcp_tls_listener};
use std::net::SocketAddr;
use tracing::info;

Expand All @@ -13,9 +13,10 @@ pub async fn start(config: TcpConfig, system: SharedSystem) -> SocketAddr {
"Iggy TCP"
};
info!("Initializing {server_name} server...");
let socket = tcp_socket::build(config.socket);
let addr = match config.tls.enabled {
true => tcp_tls_listener::start(&config.address, config.tls, system).await,
false => tcp_listener::start(&config.address, system).await,
true => tcp_tls_listener::start(&config.address, config.tls, socket, system).await,
false => tcp_listener::start(&config.address, socket, system).await,
};
info!("{server_name} server has started on: {:?}", addr);
addr
Expand Down
71 changes: 71 additions & 0 deletions server/src/tcp/tcp_socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::num::TryFromIntError;

use tokio::net::TcpSocket;

use crate::configs::tcp::TcpSocketConfig;

pub fn build(config: TcpSocketConfig) -> TcpSocket {
let socket = if config.override_defaults && config.ipv6 {
TcpSocket::new_v6().expect("Unable to create an ipv6 socket")
} else {
TcpSocket::new_v4().expect("Unable to create an ipv4 socket")
};

if config.override_defaults {
config
.recv_buffer_size
.as_bytes_u64()
.try_into()
.map_err(|e: TryFromIntError| std::io::Error::other(e.to_string()))
.and_then(|size: u32| socket.set_recv_buffer_size(size))
.expect("Unable to set SO_RCVBUF on socket");
config
.send_buffer_size
.as_bytes_u64()
.try_into()
.map_err(|e: TryFromIntError| std::io::Error::other(e.to_string()))
.and_then(|size| socket.set_send_buffer_size(size))
.expect("Unable to set SO_SNDBUF on socket");
socket
.set_keepalive(config.keepalive)
.expect("Unable to set SO_KEEPALIVE on socket");
socket
.set_nodelay(config.nodelay)
.expect("Unable to set TCP_NODELAY on socket");
socket
.set_linger(Some(config.linger.get_duration()))
.expect("Unable to set SO_LINGER on socket");
}

socket
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use iggy::utils::{byte_size::IggyByteSize, duration::IggyDuration};

use super::*;

#[test]
fn given_override_defaults_socket_should_be_configured() {
let buffer_size = 425984;
let linger_dur = Duration::new(1, 0);
let config = TcpSocketConfig {
override_defaults: true,
ipv6: true,
recv_buffer_size: IggyByteSize::from(buffer_size),
send_buffer_size: IggyByteSize::from(buffer_size),
keepalive: true,
nodelay: true,
linger: IggyDuration::new(linger_dur),
};
let socket = build(config);
assert_eq!(socket.recv_buffer_size().unwrap(), buffer_size as u32);
assert_eq!(socket.send_buffer_size().unwrap(), buffer_size as u32);
assert!(socket.keepalive().unwrap());
assert!(socket.nodelay().unwrap());
assert_eq!(socket.linger().unwrap(), Some(linger_dur));
}
}
22 changes: 18 additions & 4 deletions server/src/tcp/tcp_tls_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ use crate::tcp::connection_handler::{handle_connection, handle_error};
use crate::tcp::tcp_tls_sender::TcpTlsSender;
use std::net::SocketAddr;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpSocket;
use tokio::sync::oneshot;
use tokio_native_tls::native_tls;
use tokio_native_tls::native_tls::Identity;
use tracing::{error, info};

pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSystem) -> SocketAddr {
pub(crate) async fn start(
address: &str,
config: TcpTlsConfig,
socket: TcpSocket,
system: SharedSystem,
) -> SocketAddr {
let address = address.to_string();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
Expand All @@ -31,8 +36,17 @@ pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSys
.unwrap(),
);

let listener = TcpListener::bind(&address)
.await
let addr = address.parse();
if addr.is_err() {
panic!("Unable to parse address {:?}", address);
}

socket
.bind(addr.unwrap())
.expect("Unable to bind socket to address");

let listener = socket
.listen(1024)
.expect("Unable to start TCP TLS server.");

let local_addr = listener
Expand Down

0 comments on commit 95cefc9

Please sign in to comment.