Skip to content

Commit

Permalink
Add Global connection counter
Browse files Browse the repository at this point in the history
  • Loading branch information
git001 committed Jun 26, 2024
1 parent 4fd5eee commit ec8db36
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "layer4-proxy"
version = "0.5.1"
version = "0.6.1"
edition = "2021"
authors = ["Aleksandar Lazic <[email protected]>","Jacob Kiers <[email protected]>"]
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion container-files/etc/l4p/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ servers:
www.test1.com: proxy-via
default: echo
# note yet implemented
maxclients: 4
maxclients: 2
via:
*viaanchor

Expand Down
3 changes: 1 addition & 2 deletions src/config/config_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ fn load_config(path: &str) -> Result<ParsedConfigV1, ConfigError> {
env_logger::builder().default_format().init();
//pretty_env_logger::init_custom_env("FOURTH_LOG");
//pretty_env_logger::init_timed();

}

info!("Using config file: {}", &path);
Expand All @@ -154,7 +153,7 @@ fn load_config(path: &str) -> Result<ParsedConfigV1, ConfigError> {
let ups = upstream.as_str().try_into()?;
parsed_upstream.insert(name.to_string(), Upstream::Proxy(ups));
}

let via: ViaUpstream = base.via.clone();
debug!("via {:?}", via);

Expand Down
6 changes: 5 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::config::ConfigV1;
use crate::servers::Server;

use log::{debug, error, info};
use std::path::PathBuf;
use std::{path::PathBuf, sync::atomic::AtomicUsize};

static GLOBAL_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);

fn main() {
let config_path = match find_config() {
Expand All @@ -27,6 +29,8 @@ fn main() {
std::process::exit(1);
}
};

debug!("GLOBAL_THREAD_COUNT :{:?}:", GLOBAL_THREAD_COUNT);
debug!("{:?}", config);

let mut server = Server::new_from_v1_config(config.base);
Expand Down
9 changes: 4 additions & 5 deletions src/servers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use log::{debug, error, info};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::Semaphore;

use tokio::signal::unix::{signal, SignalKind};
Expand Down Expand Up @@ -33,8 +32,8 @@ pub(crate) struct Proxy {
pub default_action: String,
pub upstream: HashMap<String, Upstream>,
pub via: ViaUpstream,
//pub maxclients: Arc<Semaphore<>>,
pub maxclients: usize,
pub maxclients: Arc<Semaphore>,
//pub maxclients: usize,
}

impl Server {
Expand Down Expand Up @@ -76,8 +75,8 @@ impl Server {
default_action: default.clone(),
upstream: upstream.clone(),
via: proxy.via.clone(),
//maxclients: Arc::new(Semaphore::new(proxy.maxclients)),
maxclients: proxy.maxclients,
maxclients: Arc::new(Semaphore::new(proxy.maxclients)),
//maxclients: proxy.maxclients,
};
new_server.proxies.push(Arc::new(proxy));
}
Expand Down
28 changes: 24 additions & 4 deletions src/servers/protocol/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::servers::protocol::tls::get_sni;
use crate::servers::Proxy;
use crate::GLOBAL_THREAD_COUNT;
use log::{debug, error, info, warn};
use std::error::Error;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};

Expand All @@ -16,8 +18,6 @@ pub(crate) async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn Error>> {

loop {
let thread_proxy = config.clone();
//let permit = config.maxclients.clone().acquire_owned().await.unwrap();
//debug!("permit.num_permits {:?}",permit.num_permits());
match listener.accept().await {
Err(err) => {
error!("Failed to accept connection: {}", err);
Expand All @@ -40,10 +40,19 @@ pub(crate) async fn proxy(config: Arc<Proxy>) -> Result<(), Box<dyn Error>> {
}

async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn Error>> {
//let permit = proxy.maxclients.clone().acquire_owned().await.unwrap();
//info!("permit.num_permits {:?}", permit.num_permits());

if proxy.default_action.contains("health") {
debug!("Health check request")
} else {
info!("New connection from {:?}", inbound.peer_addr()?);
let old = GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::SeqCst);
info!(
"New connection from {:?} , num :{:?}: Current Connections :{:?}",
inbound.peer_addr()?,
old + 1,
GLOBAL_THREAD_COUNT
);
}

let upstream_name = match proxy.tls {
Expand Down Expand Up @@ -88,10 +97,21 @@ async fn accept(inbound: TcpStream, proxy: Arc<Proxy>) -> Result<(), Box<dyn Err

match upstream.process(inbound, proxy.clone()).await {
Ok(_) => {
info!("Connection closed for {:?}", upstream_name);
let old = GLOBAL_THREAD_COUNT.fetch_sub(1, Ordering::SeqCst);
info!(
"Connection closed for {:?}, num :{:?}: Current Connections :{:?}",
upstream_name, old, GLOBAL_THREAD_COUNT
);
//drop(permit);
Ok(())
}
Err(e) => {
let old = GLOBAL_THREAD_COUNT.fetch_sub(1, Ordering::SeqCst);
info!(
"Connection closed for {:?}, num :{:?}: Current Connections :{:?}",
upstream_name, old, GLOBAL_THREAD_COUNT
);
//drop(permit);
error!("my error {:?}", e);
Ok(())
}
Expand Down
9 changes: 4 additions & 5 deletions src/upstreams/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
mod proxy_to_upstream;

use crate::servers::Proxy;
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use hyper::{Request, Response};
use http_body_util::Full;
use hyper::body::Bytes;
use hyper_util::rt::TokioIo;
use log::debug;
use serde::Deserialize;
use std::convert::Infallible;
use std::error::Error;
use std::sync::Arc;
use tokio::io;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use std::convert::Infallible;


pub use crate::upstreams::proxy_to_upstream::ProxyToUpstream;

Expand Down

0 comments on commit ec8db36

Please sign in to comment.