From e298db0dbac0bdc78ea95cbe7def52e63b0a3b0b Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Fri, 6 Sep 2024 11:18:47 +0200 Subject: [PATCH 1/2] wip --- relay-kafka/src/producer/mod.rs | 15 +++++++++++++++ relay-server/src/services/store.rs | 24 ++++++++++++++++++------ 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/relay-kafka/src/producer/mod.rs b/relay-kafka/src/producer/mod.rs index c4c137faa6..30d85c32cd 100644 --- a/relay-kafka/src/producer/mod.rs +++ b/relay-kafka/src/producer/mod.rs @@ -186,6 +186,21 @@ impl KafkaClient { })?; producer.send(key, headers, variant, payload) } + + /// Triggers a flush of all messages. + pub fn flush(&self, timeout: Duration) { + for (topic, producer) in &self.producers { + tokio::spawn(async move { + if let Err(e) = producer.producer.flush(timeout) { + relay_log::error!( + error = &e as &dyn std::error::Error, + tags.topic = ?topic, + "error while flushing kafka topic" + ); + } + }) + } + } } /// Helper structure responsible for building the actual [`KafkaClient`]. diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 1af723ed68..34205e4497 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -22,7 +22,7 @@ use relay_metrics::{ }; use relay_quotas::Scoping; use relay_statsd::metric; -use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; +use relay_system::{Addr, Controller, FromMessage, Interface, NoResponse, Service, Shutdown}; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use serde_json::Deserializer; @@ -164,6 +164,10 @@ impl StoreService { }) } + fn flush(&self, timeout: Duration) { + self.producer.client.flush(timeout); + } + fn handle_store_envelope(&self, message: StoreEnvelope) { let StoreEnvelope { envelope: mut managed, @@ -1047,14 +1051,22 @@ impl Service for StoreService { fn spawn_handler(self, mut rx: relay_system::Receiver) { let this = Arc::new(self); + let mut shutdown = Controller::shutdown_handle(); + tokio::spawn(async move { relay_log::info!("store forwarder started"); - while let Some(message) = rx.recv().await { - let service = Arc::clone(&this); - this.workers - .spawn(move || service.handle_message(message)) - .await; + loop { + tokio::select! { + Some(message) = rx.recv() => { + let service = Arc::clone(&this); + this.workers + .spawn(move || service.handle_message(message)) + .await; + }, + Shutdown{ timeout: Some(timeout) } = shutdown.notified() => this.flush(), + else => break, + } } relay_log::info!("store forwarder stopped"); From 4f9ad1ceeda931eb2788562c13dfe9a2a9674ba0 Mon Sep 17 00:00:00 2001 From: Joris Bayer Date: Mon, 9 Sep 2024 16:10:43 +0200 Subject: [PATCH 2/2] ref: async --- relay-kafka/src/producer/mod.rs | 22 ++++++++++++---------- relay-server/src/services/store.rs | 9 +++++++-- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/relay-kafka/src/producer/mod.rs b/relay-kafka/src/producer/mod.rs index 30d85c32cd..b08131f494 100644 --- a/relay-kafka/src/producer/mod.rs +++ b/relay-kafka/src/producer/mod.rs @@ -187,18 +187,20 @@ impl KafkaClient { producer.send(key, headers, variant, payload) } - /// Triggers a flush of all messages. + /// Flush all messages. pub fn flush(&self, timeout: Duration) { + let start = Instant::now(); for (topic, producer) in &self.producers { - tokio::spawn(async move { - if let Err(e) = producer.producer.flush(timeout) { - relay_log::error!( - error = &e as &dyn std::error::Error, - tags.topic = ?topic, - "error while flushing kafka topic" - ); - } - }) + if let Err(e) = producer + .producer + .flush(timeout.saturating_sub(start.elapsed())) + { + relay_log::error!( + error = &e as &dyn std::error::Error, + tags.topic = ?topic, + "error while flushing kafka topic" + ); + } } } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 34205e4497..2c03d76d10 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -6,7 +6,7 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::error::Error; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use bytes::Bytes; use relay_base_schema::data_category::DataCategory; @@ -1064,7 +1064,12 @@ impl Service for StoreService { .spawn(move || service.handle_message(message)) .await; }, - Shutdown{ timeout: Some(timeout) } = shutdown.notified() => this.flush(), + Shutdown{ timeout: Some(timeout) } = shutdown.notified() => { + let service = Arc::clone(&this); + this.workers.spawn(move || { + service.flush(timeout); + }).await; + }, else => break, } }