Skip to content

Commit

Permalink
instr(buffer): Measure busy time in nanoseconds (#4488)
Browse files Browse the repository at this point in the history
We're underreporting buffer busy time, presumably because the individual
submissions are rounded to milliseconds.

Use nanosecond precision instead, and use a counter metric instead of a
distribution because we only care about the sum.
  • Loading branch information
jjbayer authored Feb 6, 2025
1 parent d6f592e commit 24e34e1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
26 changes: 21 additions & 5 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::services::processor::{EnvelopeProcessor, ProcessEnvelope, ProcessingG
use crate::services::projects::cache::{CheckedEnvelope, ProjectCacheHandle, ProjectChange};
use crate::services::test_store::TestStore;
use crate::statsd::RelayCounters;
use crate::statsd::RelayTimers;

use crate::utils::ManagedEnvelope;
use crate::MemoryChecker;
use crate::MemoryStat;
Expand Down Expand Up @@ -572,13 +572,29 @@ impl Service for EnvelopeBufferService {
relay_log::info!("EnvelopeBufferService {}: starting", self.partition_id);
loop {
let mut sleep = DEFAULT_SLEEP;

macro_rules! measure_busy {
($input:expr, $block:block) => {
let start = Instant::now();
{
$block
}

relay_statsd::metric!(
counter(RelayCounters::BufferBusy) += start.elapsed().as_nanos() as u64,
input = $input,
partition_id = &partition_tag
);
};
}

tokio::select! {
// NOTE: we do not select a bias here.
// On the one hand, we might want to prioritize dequeuing over enqueuing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
_ = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => {
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "pop", partition_id = &partition_tag, {
measure_busy!("pop", {
match Self::try_pop(&partition_tag, &config, &mut buffer, &services).await {
Ok(new_sleep) => {
sleep = new_sleep;
Expand All @@ -592,7 +608,7 @@ impl Service for EnvelopeBufferService {
}});
}
change = project_changes.recv() => {
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "project_change", partition_id = &partition_tag, {
measure_busy!("project_change", {
match change {
Ok(ProjectChange::Ready(project_key)) => {
buffer.mark_ready(&project_key, true);
Expand All @@ -608,13 +624,13 @@ impl Service for EnvelopeBufferService {
}
Some(message) = rx.recv() => {
let message_name = message.name();
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = message_name, partition_id = &partition_tag, {
measure_busy!(message_name, {
Self::handle_message(&mut buffer, message).await;
sleep = Duration::ZERO;
});
}
shutdown = shutdown.notified() => {
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "shutdown", partition_id = &partition_tag, {
measure_busy!("shutdown", {
// In case the shutdown was handled, we break out of the loop signaling that
// there is no need to process anymore envelopes.
if Self::handle_shutdown(&mut buffer, shutdown).await {
Expand Down
13 changes: 7 additions & 6 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,6 @@ pub enum RelayTimers {
StoreServiceDuration,
/// Timing in milliseconds for the time it takes for initialize the buffer.
BufferInitialization,
/// Timing in milliseconds for the time the buffer service spends handling input.
///
/// This metric is tagged with:
/// - `input`: The type of input that the service is handling.
BufferBusy,
/// Timing in milliseconds for the time it takes for the buffer to pack & spool a batch.
///
/// Contains the time it takes to pack multiple envelopes into a single memory blob.
Expand Down Expand Up @@ -580,7 +575,6 @@ impl TimerMetric for RelayTimers {
#[cfg(feature = "processing")]
RelayTimers::StoreServiceDuration => "store.message.duration",
RelayTimers::BufferInitialization => "buffer.initialization.duration",
RelayTimers::BufferBusy => "buffer.busy",
RelayTimers::BufferSpool => "buffer.spool.duration",
RelayTimers::BufferSqlWrite => "buffer.write.duration",
RelayTimers::BufferUnspool => "buffer.unspool.duration",
Expand Down Expand Up @@ -639,6 +633,12 @@ pub enum RelayCounters {
/// Number of times one or more projects of an envelope were pending when trying to pop
/// their envelope.
BufferProjectPending,
/// Timing in nanoseconds for the time the buffer service spends handling input.
///
/// This metric is tagged with:
/// - `input`: The type of input that the service is handling.
/// - `partition_id`: The ID of the buffer shard (0, 1, 2, ...)
BufferBusy,
///
/// Number of outcomes and reasons for rejected Envelopes.
///
Expand Down Expand Up @@ -846,6 +846,7 @@ impl CounterMetric for RelayCounters {
RelayCounters::BufferUnspooledEnvelopes => "buffer.unspooled_envelopes",
RelayCounters::BufferProjectChangedEvent => "buffer.project_changed_event",
RelayCounters::BufferProjectPending => "buffer.project_pending",
RelayCounters::BufferBusy => "buffer.busy",
RelayCounters::Outcomes => "events.outcomes",
RelayCounters::ProjectStateRequest => "project_state.request",
#[cfg(feature = "processing")]
Expand Down

0 comments on commit 24e34e1

Please sign in to comment.