-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor API to allow user to handle reconnection loop (#20)
* Refactor * Update README.md * Improve documentation * Bump tracing-core * Re-order imports * Rework connection fn signature
- Loading branch information
Showing
11 changed files
with
450 additions
and
339 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
use std::time::Duration; | ||
|
||
use tokio::time::sleep; | ||
use tracing_gelf::Logger; | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
// Graylog address | ||
let address = "127.0.0.1:12201"; | ||
|
||
// Initialize subscriber, returning a connection handle | ||
let mut conn_handle = Logger::builder().init_tcp(address).unwrap(); | ||
|
||
// Reconnection loop | ||
let reconnect = async move { | ||
loop { | ||
// Attempt to connect | ||
let errors = conn_handle.connect().await; | ||
|
||
// Process errors | ||
for (socket, error) in errors.0 { | ||
// Perhaps log errors to an active layer | ||
tracing::error!(%socket, %error); | ||
} | ||
|
||
// Don't attempt reconnect immediately | ||
sleep(Duration::from_secs(5)).await; | ||
} | ||
}; | ||
|
||
// Spawn background task | ||
// Any futures executor can be used | ||
tokio::spawn(reconnect); | ||
|
||
// Send a log to Graylog | ||
tracing::info!("one day"); | ||
|
||
// Don't exit | ||
loop {} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
mod tcp; | ||
mod udp; | ||
|
||
use std::{io, net::SocketAddr}; | ||
|
||
use bytes::Bytes; | ||
use futures_channel::mpsc; | ||
use tokio::net::{lookup_host, ToSocketAddrs}; | ||
use tracing_core::subscriber::NoSubscriber; | ||
use tracing_futures::WithSubscriber; | ||
|
||
pub use tcp::*; | ||
pub use udp::*; | ||
|
||
/// A sequence of [errors](std::io::Error) which occurred during a connection attempt. | ||
/// | ||
/// These are paired with a [`SocketAddr`] because the connection attempt might fail multiple times | ||
/// during DNS resolution. | ||
#[derive(Debug)] | ||
pub struct ConnectionErrors(pub Vec<(SocketAddr, io::Error)>); | ||
|
||
/// Provides an interface for connecting (and reconnecting) to Graylog. Without an established | ||
/// connection logs will not be sent. Messages logged without an established connection will sit in | ||
/// the buffer until they can be drained. | ||
#[derive(Debug)] | ||
#[must_use] | ||
pub struct ConnectionHandle<A, Conn> { | ||
pub(crate) addr: A, | ||
pub(crate) receiver: mpsc::Receiver<Bytes>, | ||
pub(crate) conn: Conn, | ||
} | ||
|
||
impl<A, Conn> ConnectionHandle<A, Conn> { | ||
/// Returns the connection address. | ||
pub fn address(&self) -> &A { | ||
&self.addr | ||
} | ||
} | ||
|
||
impl<A> ConnectionHandle<A, TcpConnection> | ||
where | ||
A: ToSocketAddrs, | ||
{ | ||
/// Connects to Graylog via TCP using the address provided. | ||
/// | ||
/// This will perform DNS resolution and attempt to connect to each [`SocketAddr`] provided. | ||
pub async fn connect(&mut self) -> ConnectionErrors { | ||
// Do a DNS lookup if `addr` is a hostname | ||
let addrs = lookup_host(&self.addr).await.into_iter().flatten(); | ||
|
||
// Loop through the IP addresses that the hostname resolved to | ||
let mut errors = Vec::new(); | ||
for addr in addrs { | ||
let fut = self | ||
.conn | ||
.handle(addr, &mut self.receiver) | ||
.with_subscriber(NoSubscriber::default()); | ||
if let Err(err) = fut.await { | ||
errors.push((addr, err)); | ||
} | ||
} | ||
ConnectionErrors(errors) | ||
} | ||
} | ||
|
||
#[cfg(feature = "rustls-tls")] | ||
impl<A> ConnectionHandle<A, TlsConnection> | ||
where | ||
A: ToSocketAddrs, | ||
{ | ||
/// Connects to Graylog via TLS using the address provided. | ||
/// | ||
/// This will perform DNS resolution and attempt to connect to each [`SocketAddr`] provided. | ||
pub async fn connect(&mut self) -> ConnectionErrors { | ||
// Do a DNS lookup if `addr` is a hostname | ||
let addrs = lookup_host(&self.addr).await.into_iter().flatten(); | ||
|
||
// Loop through the IP addresses that the hostname resolved to | ||
let mut errors = Vec::new(); | ||
for addr in addrs { | ||
let fut = self | ||
.conn | ||
.handle(addr, &mut self.receiver) | ||
.with_subscriber(NoSubscriber::default()); | ||
if let Err(err) = fut.await { | ||
errors.push((addr, err)); | ||
} | ||
} | ||
ConnectionErrors(errors) | ||
} | ||
} | ||
|
||
impl<A> ConnectionHandle<A, UdpConnection> | ||
where | ||
A: ToSocketAddrs, | ||
{ | ||
/// Connects to Graylog via UDP using the address provided. | ||
/// | ||
/// This will perform DNS resolution and attempt to connect to each [`SocketAddr`] provided. | ||
pub async fn connect(&mut self) -> ConnectionErrors { | ||
// Do a DNS lookup if `addr` is a hostname | ||
let addrs = lookup_host(&self.addr).await.into_iter().flatten(); | ||
|
||
// Loop through the IP addresses that the hostname resolved to | ||
let mut errors = Vec::new(); | ||
for addr in addrs { | ||
let fut = self | ||
.conn | ||
.handle(addr, &mut self.receiver) | ||
.with_subscriber(NoSubscriber::default()); | ||
if let Err(err) = fut.await { | ||
errors.push((addr, err)); | ||
} | ||
} | ||
ConnectionErrors(errors) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
use std::{future::Future, net::SocketAddr}; | ||
|
||
use bytes::Bytes; | ||
use futures_util::{Stream, StreamExt}; | ||
use tokio::{io, net::TcpStream}; | ||
use tokio_util::codec::{BytesCodec, FramedWrite}; | ||
|
||
/// Handle TCP connection, generic over TCP/TLS via `F`. | ||
async fn handle_tcp<F, R, S, I>( | ||
addr: SocketAddr, | ||
f: F, | ||
receiver: &mut S, | ||
) -> Result<(), std::io::Error> | ||
where | ||
S: Stream<Item = Bytes>, | ||
S: Unpin, | ||
I: io::AsyncRead + io::AsyncWrite + Send + Unpin, | ||
F: FnOnce(TcpStream) -> R, | ||
R: Future<Output = Result<I, std::io::Error>> + Send, | ||
{ | ||
let tcp = TcpStream::connect(addr).await?; | ||
let wrapped = (f)(tcp).await?; | ||
let (_, writer) = io::split(wrapped); | ||
|
||
// Writer | ||
let sink = FramedWrite::new(writer, BytesCodec::new()); | ||
receiver.map(Ok).forward(sink).await?; | ||
|
||
Ok(()) | ||
} | ||
|
||
/// A TCP connection to Graylog. | ||
#[derive(Debug)] | ||
pub struct TcpConnection; | ||
|
||
impl TcpConnection { | ||
pub(super) async fn handle<S>( | ||
&self, | ||
addr: SocketAddr, | ||
receiver: &mut S, | ||
) -> Result<(), std::io::Error> | ||
where | ||
S: Stream<Item = Bytes> + Unpin, | ||
{ | ||
let wrapper = |tcp_stream| async { Ok(tcp_stream) }; | ||
handle_tcp(addr, wrapper, receiver).await | ||
} | ||
} | ||
|
||
/// A TLS connection to Graylog. | ||
#[cfg(feature = "rustls-tls")] | ||
pub struct TlsConnection { | ||
pub(crate) server_name: tokio_rustls::rustls::ServerName, | ||
pub(crate) client_config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>, | ||
} | ||
|
||
#[cfg(feature = "rustls-tls")] | ||
impl TlsConnection { | ||
pub(super) async fn handle<S>( | ||
&self, | ||
addr: SocketAddr, | ||
receiver: &mut S, | ||
) -> Result<(), std::io::Error> | ||
where | ||
S: Stream<Item = Bytes> + Unpin, | ||
{ | ||
let wrapper = move |tcp_stream| { | ||
let server_name = self.server_name.clone(); | ||
let config = tokio_rustls::TlsConnector::from(self.client_config.clone()); | ||
|
||
config.connect(server_name, tcp_stream) | ||
}; | ||
handle_tcp(addr, wrapper, receiver).await | ||
} | ||
} | ||
|
||
#[cfg(feature = "rustls-tls")] | ||
impl std::fmt::Debug for TlsConnection { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("TlsConnection") | ||
.field("server_name", &self.server_name) | ||
.finish_non_exhaustive() | ||
} | ||
} |
Oops, something went wrong.