Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose TCP parameters #1321

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions configs/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,19 @@
"tcp": {
"enabled": true,
"address": "0.0.0.0:8090",
"ipv6": false,
"tls": {
"enabled": false,
"certificate": "certs/iggy.pfx",
"password": "iggy123"
},
"socket": {
"override_defaults": false,
"recv_buffer_size": "100 KB",
"send_buffer_size": "100 KB",
"keepalive": false,
"nodelay": false,
"linger": "0 s"
}
},
"quic": {
Expand Down
24 changes: 24 additions & 0 deletions configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ enabled = true
# For example, "0.0.0.0:8090" listens on all network interfaces on port 8090.
address = "0.0.0.0:8090"

# Whether to use ipv4 or ipv6
ipv6 = false

# TLS configuration for the TCP server.
[tcp.tls]
# Enables or disables TLS for TCP connections.
Expand All @@ -180,6 +183,27 @@ certificate = "certs/iggy.pfx"
# Password for the TLS certificate, required for accessing the private key.
password = "iggy123"

# Configuration for the TCP socket
[tcp.socket]
# Whether to overwrite the OS-default socket parameters
override_defaults = false

# SO_RCVBUF: maximum size of the receive buffer, can be clamped by the OS
recv_buffer_size = "100 KB"

# SO_SNDBUF: maximum size of the send buffer, can be clamped by the OS
send_buffer_size = "100 KB"

# SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection
keepalive = false

# TCP_NODELAY: enable/disable the Nagle algorithm which buffers data before sending segments
nodelay = false

# SO_LINGER: delay to wait for while data is being transmitted before closing the socket after a
# close or shutdown call has been received
linger = "0 s"

# QUIC protocol configuration.
[quic]
# Controls whether the QUIC server is enabled.
Expand Down
6 changes: 6 additions & 0 deletions integration/src/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ use server::configs::config_provider::{ConfigProvider, FileConfigProvider};

pub const SYSTEM_PATH_ENV_VAR: &str = "IGGY_SYSTEM_PATH";
pub const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
pub const IPV6_ENV_VAR: &str = "IGGY_TCP_IPV6";
const USER_PASSWORD: &str = "secret";
const SLEEP_INTERVAL_MS: u64 = 20;
const LOCAL_DATA_PREFIX: &str = "local_data_";

const MAX_PORT_WAIT_DURATION_S: u64 = 60;

#[derive(PartialEq)]
pub enum IpAddrKind {
V4,
V6,
Expand Down Expand Up @@ -91,6 +93,10 @@ impl TestServer {
}
}

if ip_kind == IpAddrKind::V6 {
envs.insert(IPV6_ENV_VAR.to_string(), "true".to_string());
}

// If IGGY_SYSTEM_PATH is not set, use a random path starting with "local_data_"
let local_data_path = if let Some(system_path) = envs.get(SYSTEM_PATH_ENV_VAR) {
system_path.to_string()
Expand Down
2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.32"
version = "0.6.33"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
14 changes: 7 additions & 7 deletions sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,13 @@ impl ConnectionString {
}

let connection_string = connection_string.replace(CONNECTION_STRING_PREFIX, "");
let parts = connection_string.split("@").collect::<Vec<&str>>();
let parts = connection_string.split('@').collect::<Vec<&str>>();

if parts.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}

let credentials = parts[0].split(":").collect::<Vec<&str>>();
let credentials = parts[0].split(':').collect::<Vec<&str>>();
if credentials.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}
Expand All @@ -471,7 +471,7 @@ impl ConnectionString {
return Err(IggyError::InvalidConnectionString);
}

let server_and_options = parts[1].split("?").collect::<Vec<&str>>();
let server_and_options = parts[1].split('?').collect::<Vec<&str>>();
if server_and_options.len() > 2 {
return Err(IggyError::InvalidConnectionString);
}
Expand All @@ -481,11 +481,11 @@ impl ConnectionString {
return Err(IggyError::InvalidConnectionString);
}

if !server_address.contains(":") {
if !server_address.contains(':') {
return Err(IggyError::InvalidConnectionString);
}

let port = server_address.split(":").collect::<Vec<&str>>()[1];
let port = server_address.split(':').collect::<Vec<&str>>()[1];
if port.is_empty() {
return Err(IggyError::InvalidConnectionString);
}
Expand All @@ -512,7 +512,7 @@ impl ConnectionString {
}

fn parse_options(options: &str) -> Result<ConnectionStringOptions, IggyError> {
let options = options.split("&").collect::<Vec<&str>>();
let options = options.split('&').collect::<Vec<&str>>();
let mut tls_enabled = false;
let mut tls_domain = "localhost".to_string();
let mut reconnection_retries = "unlimited".to_owned();
Expand All @@ -521,7 +521,7 @@ impl ConnectionString {
let mut heartbeat_interval = "5s".to_owned();

for option in options {
let option_parts = option.split("=").collect::<Vec<&str>>();
let option_parts = option.split('=').collect::<Vec<&str>>();
if option_parts.len() != 2 {
return Err(IggyError::InvalidConnectionString);
}
Expand Down
1 change: 1 addition & 0 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ impl IggyConsumer {
offset,
&last_stored_offsets,
)
.await
}
});

Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.61"
version = "0.4.62"
edition = "2021"
build = "src/build.rs"

Expand Down
21 changes: 21 additions & 0 deletions server/src/configs/defaults.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::duration::IggyDuration;

use crate::configs::http::{
HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig,
};
Expand All @@ -15,6 +18,9 @@ use crate::configs::system::{
};
use crate::configs::tcp::{TcpConfig, TcpTlsConfig};
use std::sync::Arc;
use std::time::Duration;

use super::tcp::TcpSocketConfig;

static_toml::static_toml! {
// static_toml crate always starts from CARGO_MANIFEST_DIR (in this case iggy-server root directory)
Expand Down Expand Up @@ -119,7 +125,9 @@ impl Default for TcpConfig {
TcpConfig {
enabled: SERVER_CONFIG.tcp.enabled,
address: SERVER_CONFIG.tcp.address.parse().unwrap(),
ipv6: SERVER_CONFIG.tcp.ipv_6,
tls: TcpTlsConfig::default(),
socket: TcpSocketConfig::default(),
}
}
}
Expand All @@ -134,6 +142,19 @@ impl Default for TcpTlsConfig {
}
}

impl Default for TcpSocketConfig {
fn default() -> TcpSocketConfig {
TcpSocketConfig {
override_defaults: false,
recv_buffer_size: IggyByteSize::from(100_000_u64),
send_buffer_size: IggyByteSize::from(100_000_u64),
keepalive: false,
nodelay: false,
linger: IggyDuration::new(Duration::new(0, 0)),
}
}
}

impl Default for HttpConfig {
fn default() -> HttpConfig {
HttpConfig {
Expand Down
17 changes: 17 additions & 0 deletions server/src/configs/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use iggy::utils::{byte_size::IggyByteSize, duration::IggyDuration};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use serde_with::DisplayFromStr;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TcpConfig {
pub enabled: bool,
pub address: String,
pub ipv6: bool,
pub tls: TcpTlsConfig,
pub socket: TcpSocketConfig,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
Expand All @@ -13,3 +18,15 @@ pub struct TcpTlsConfig {
pub certificate: String,
pub password: String,
}

#[serde_as]
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TcpSocketConfig {
pub override_defaults: bool,
pub recv_buffer_size: IggyByteSize,
pub send_buffer_size: IggyByteSize,
pub keepalive: bool,
pub nodelay: bool,
#[serde_as(as = "DisplayFromStr")]
pub linger: IggyDuration,
}
1 change: 1 addition & 0 deletions server/src/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pub mod sender;
pub mod tcp_listener;
mod tcp_sender;
pub mod tcp_server;
mod tcp_socket;
pub mod tcp_tls_listener;
pub mod tcp_tls_sender;
17 changes: 12 additions & 5 deletions server/src/tcp/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,24 @@ use crate::tcp::connection_handler::{handle_connection, handle_error};
use crate::tcp::tcp_sender::TcpSender;
use std::net::SocketAddr;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpSocket;
use tokio::sync::oneshot;
use tracing::{error, info};

pub async fn start(address: &str, system: SharedSystem) -> SocketAddr {
pub async fn start(address: &str, socket: TcpSocket, system: SharedSystem) -> SocketAddr {
let address = address.to_string();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let listener = TcpListener::bind(&address)
.await
.expect("Unable to start TCP server.");
let addr = address.parse();
if addr.is_err() {
panic!("Unable to parse address {:?}", address);
}

socket
.bind(addr.unwrap())
.expect("Unable to bind socket to address");

let listener = socket.listen(1024).expect("Unable to start TCP server.");

let local_addr = listener
.local_addr()
Expand Down
7 changes: 4 additions & 3 deletions server/src/tcp/tcp_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::configs::tcp::TcpConfig;
use crate::streaming::systems::system::SharedSystem;
use crate::tcp::{tcp_listener, tcp_tls_listener};
use crate::tcp::{tcp_listener, tcp_socket, tcp_tls_listener};
use std::net::SocketAddr;
use tracing::info;

Expand All @@ -13,9 +13,10 @@ pub async fn start(config: TcpConfig, system: SharedSystem) -> SocketAddr {
"Iggy TCP"
};
info!("Initializing {server_name} server...");
let socket = tcp_socket::build(config.ipv6, config.socket);
let addr = match config.tls.enabled {
true => tcp_tls_listener::start(&config.address, config.tls, system).await,
false => tcp_listener::start(&config.address, system).await,
true => tcp_tls_listener::start(&config.address, config.tls, socket, system).await,
false => tcp_listener::start(&config.address, socket, system).await,
};
info!("{server_name} server has started on: {:?}", addr);
addr
Expand Down
70 changes: 70 additions & 0 deletions server/src/tcp/tcp_socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::num::TryFromIntError;

use tokio::net::TcpSocket;

use crate::configs::tcp::TcpSocketConfig;

pub fn build(ipv6: bool, config: TcpSocketConfig) -> TcpSocket {
let socket = if ipv6 {
TcpSocket::new_v6().expect("Unable to create an ipv6 socket")
} else {
TcpSocket::new_v4().expect("Unable to create an ipv4 socket")
};

if config.override_defaults {
config
.recv_buffer_size
.as_bytes_u64()
.try_into()
.map_err(|e: TryFromIntError| std::io::Error::other(e.to_string()))
.and_then(|size: u32| socket.set_recv_buffer_size(size))
.expect("Unable to set SO_RCVBUF on socket");
config
.send_buffer_size
.as_bytes_u64()
.try_into()
.map_err(|e: TryFromIntError| std::io::Error::other(e.to_string()))
.and_then(|size| socket.set_send_buffer_size(size))
.expect("Unable to set SO_SNDBUF on socket");
socket
.set_keepalive(config.keepalive)
.expect("Unable to set SO_KEEPALIVE on socket");
socket
.set_nodelay(config.nodelay)
.expect("Unable to set TCP_NODELAY on socket");
socket
.set_linger(Some(config.linger.get_duration()))
.expect("Unable to set SO_LINGER on socket");
}

socket
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use iggy::utils::{byte_size::IggyByteSize, duration::IggyDuration};

use super::*;

#[test]
fn given_override_defaults_socket_should_be_configured() {
let buffer_size = 425984;
let linger_dur = Duration::new(1, 0);
let config = TcpSocketConfig {
override_defaults: true,
recv_buffer_size: IggyByteSize::from(buffer_size),
send_buffer_size: IggyByteSize::from(buffer_size),
keepalive: true,
nodelay: true,
linger: IggyDuration::new(linger_dur),
};
let socket = build(false, config);
assert!(socket.recv_buffer_size().unwrap() >= buffer_size as u32);
assert!(socket.send_buffer_size().unwrap() >= buffer_size as u32);
assert!(socket.keepalive().unwrap());
assert!(socket.nodelay().unwrap());
assert_eq!(socket.linger().unwrap(), Some(linger_dur));
}
}
Loading
Loading