Skip to content

Commit

Permalink
Log errors and connection establishment to the GELF server (#16)
Browse files Browse the repository at this point in the history
Previously, connecting to the wrong IP address or port would fail
silently. Now, it's at least logged (to some other logging interface,
should one be registered).

Also move the logging suppression to the inner sending loop, there
should be no harm in logging the connection establishment phase.
  • Loading branch information
hrxi authored Mar 13, 2022
1 parent cd78830 commit dee8eed
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ thiserror = "1"
tokio = { version = "1", features = ["io-util", "net", "time"] }
tokio-rustls = { version = "0.23", optional = true }
tokio-util = { version = "0.6", features = ["codec", "net"] }
tracing = "0.1"
tracing-core = "0.1"
tracing-futures = "0.2"
tracing-subscriber = "0.3"
Expand Down
80 changes: 42 additions & 38 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,26 +278,23 @@ impl Builder {
let (sender, receiver) = mpsc::channel::<Bytes>(buffer);
let mut ok_receiver = receiver.map(Ok);

let bg_task = Box::pin(
async move {
// Reconnection loop
loop {
// Do a DNS lookup if `addr` is a hostname
let addrs = lookup_host(&addr).await.into_iter().flatten();

// Loop through the IP addresses that the hostname resolved to
for addr in addrs {
if let Err(_err) = handle_tcp_connection(addr, &f, &mut ok_receiver).await {
// TODO: Add handler
}
let bg_task = Box::pin(async move {
// Reconnection loop
loop {
// Do a DNS lookup if `addr` is a hostname
let addrs = lookup_host(&addr).await.into_iter().flatten();

// Loop through the IP addresses that the hostname resolved to
for resolved in addrs {
if let Err(err) = handle_tcp_connection(resolved, &f, &mut ok_receiver).await {
tracing::error!(error = %err, %resolved, "can't connect to graylog TCP backend");
}

// Sleep before re-attempting
time::sleep(time::Duration::from_millis(timeout_ms as u64)).await;
}

// Sleep before re-attempting
time::sleep(time::Duration::from_millis(timeout_ms as u64)).await;
}
.with_subscriber(NoSubscriber),
);
});

let logger = Logger {
base_object,
Expand Down Expand Up @@ -420,26 +417,23 @@ impl Builder {
// Construct background task
let (sender, mut receiver) = mpsc::channel::<Bytes>(buffer);

let bg_task = Box::pin(
async move {
// Reconnection loop
loop {
// Do a DNS lookup if `addr` is a hostname
let addrs = lookup_host(&addr).await.into_iter().flatten();

// Loop through the IP addresses that the hostname resolved to
for addr in addrs {
if let Err(_err) = handle_udp_connection(addr, &mut receiver).await {
// TODO: Add handler
}
}
let bg_task = Box::pin(async move {
// Reconnection loop
loop {
// Do a DNS lookup if `addr` is a hostname
let addrs = lookup_host(&addr).await.into_iter().flatten();

// Sleep before re-attempting
time::sleep(time::Duration::from_millis(timeout_ms as u64)).await;
// Loop through the IP addresses that the hostname resolved to
for resolved in addrs {
if let Err(err) = handle_udp_connection(resolved, &mut receiver).await {
tracing::error!(error = %err, %resolved, "can't connect to graylog TCP backend");
}
}

// Sleep before re-attempting
time::sleep(time::Duration::from_millis(timeout_ms as u64)).await;
}
.with_subscriber(NoSubscriber),
);
});
let logger = Logger {
base_object,
file_names: self.file_names,
Expand Down Expand Up @@ -621,7 +615,7 @@ where

// Writer
let mut sink = FramedWrite::new(writer, BytesCodec::new());
sink.send_all(receiver).await?;
sink.send_all(receiver).with_subscriber(NoSubscriber).await?;

Ok(())
}
Expand All @@ -643,9 +637,19 @@ where
// Writer
let udp_stream = UdpFramed::new(udp_socket, BytesCodec::new());
let (mut sink, _) = udp_stream.split();
while let Some(bytes) = receiver.next().await {
sink.send((bytes, addr)).await?;
}
async {
let mut result = Ok(());
while let Some(bytes) = receiver.next().await {
match sink.send((bytes, addr)).await {
Ok(_) => {},
Err(e) => {
result = Err(e);
break;
}
}
}
result
}.with_subscriber(NoSubscriber).await?;

Ok(())
}

0 comments on commit dee8eed

Please sign in to comment.