diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0ad8385cf5..b85a87a073 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@
 - Remove `buffer` entries and scrub array contents from MongoDB queries. ([#4186](https://github.com/getsentry/relay/pull/4186))
 - Use `DateTime` instead of `Instant` for tracking the received time of the `Envelope`. ([#4184](https://github.com/getsentry/relay/pull/4184))
 - Add a field to suggest consumers to ingest spans in EAP. ([#4206](https://github.com/getsentry/relay/pull/4206))
+- Run internal worker threads with a lower priority. ([#4222](https://github.com/getsentry/relay/pull/4222))

 ## 24.10.0

diff --git a/Cargo.lock b/Cargo.lock
index 9468546750..37d2bc6e82 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2073,9 +2073,9 @@ dependencies = [

 [[package]]
 name = "libc"
-version = "0.2.158"
+version = "0.2.161"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
+checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1"

 [[package]]
 name = "libfuzzer-sys"
@@ -3776,6 +3776,7 @@ dependencies = [
  "insta",
  "itertools 0.13.0",
  "json-forensics",
+ "libc",
  "liblzma",
  "mime",
  "minidump",
diff --git a/Cargo.toml b/Cargo.toml
index f938aed97c..0626ba44f6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -109,8 +109,9 @@ insta = { version = "1.31.0", features = ["json", "redactions", "ron"] }
 ipnetwork = "0.20.0"
 itertools = "0.13.0"
 json-forensics = "0.1.1"
-lru = "0.12.4"
+libc = "0.2.161"
 liblzma = "0.3.4"
+lru = "0.12.4"
 maxminddb = "0.24.0"
 memchr = "2.7.4"
 md5 = "0.7.0"
diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml
index a7e7be4081..6bfaaeea77 100644
--- a/relay-server/Cargo.toml
+++ b/relay-server/Cargo.toml
@@ -49,6 +49,7 @@ hashbrown = { workspace = true }
 hyper-util = { workspace = true }
 itertools = { workspace = true }
 json-forensics = { workspace = true }
+libc = { workspace = true }
 liblzma = { workspace = true }
 mime = { workspace = true }
 minidump = { workspace = true, optional = true }
diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index ac953b13ff..a736fd17f3 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -19,7 +19,7 @@ use crate::services::stats::RelayStats;
 use crate::services::store::StoreService;
 use crate::services::test_store::{TestStore, TestStoreService};
 use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
-use crate::utils::{MemoryChecker, MemoryStat};
+use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
 use anyhow::{Context, Result};
 use axum::extract::FromRequestParts;
 use axum::http::request::Parts;
@@ -105,6 +105,7 @@ fn create_processor_pool(config: &Config) -> Result {

     let pool = crate::utils::ThreadPoolBuilder::new("processor")
         .num_threads(thread_count)
+        .thread_kind(ThreadKind::Worker)
         .runtime(tokio::runtime::Handle::current())
         .build()?;

@@ -114,7 +115,7 @@ fn create_processor_pool(config: &Config) -> Result {
 #[cfg(feature = "processing")]
 fn create_store_pool(config: &Config) -> Result {
     // Spawn a store worker for every 12 threads in the processor pool.
-    // This ratio was found emperically and may need adjustments in the future.
+    // This ratio was found empirically and may need adjustments in the future.
     //
     // Ideally in the future the store will be single threaded again, after we move
     // all the heavy processing (de- and re-serialization) into the processor.
diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs
index 205a1f0dbf..7ef2dc675d 100644
--- a/relay-server/src/utils/thread_pool.rs
+++ b/relay-server/src/utils/thread_pool.rs
@@ -5,11 +5,24 @@ use tokio::runtime::Handle;
 pub use rayon::{ThreadPool, ThreadPoolBuildError};
 use tokio::sync::Semaphore;

+/// A thread kind.
+///
+/// The thread kind has an effect on how threads are prioritized and scheduled.
+#[derive(Default, Debug, Clone, Copy)]
+pub enum ThreadKind {
+    /// The default kind, just a thread like any other without any special configuration.
+    #[default]
+    Default,
+    /// A worker thread is a CPU intensive task with a lower priority than the [`Self::Default`] kind.
+    Worker,
+}
+
 /// Used to create a new [`ThreadPool`] thread pool.
 pub struct ThreadPoolBuilder {
     name: &'static str,
     runtime: Option<Handle>,
     num_threads: usize,
+    kind: ThreadKind,
 }

 impl ThreadPoolBuilder {
@@ -19,10 +32,11 @@
             name,
             runtime: None,
             num_threads: 0,
+            kind: ThreadKind::Default,
         }
     }

-    /// Sets the number of threads to be used in the rayon threadpool.
+    /// Sets the number of threads to be used in the rayon thread-pool.
     ///
     /// See also [`rayon::ThreadPoolBuilder::num_threads`].
     pub fn num_threads(mut self, num_threads: usize) -> Self {
@@ -30,7 +44,13 @@
         self
     }

-    /// Sets the tokio runtime which will be made available in the workers.
+    /// Configures the [`ThreadKind`] for all threads spawned in the pool.
+    pub fn thread_kind(mut self, kind: ThreadKind) -> Self {
+        self.kind = kind;
+        self
+    }
+
+    /// Sets the Tokio runtime which will be made available in the workers.
     pub fn runtime(mut self, runtime: Handle) -> Self {
         self.runtime = Some(runtime);
         self
@@ -56,6 +76,7 @@
                 }
                 let runtime = self.runtime.clone();
                 b.spawn(move || {
+                    set_current_thread_priority(self.kind);
                     let _guard = runtime.as_ref().map(|runtime| runtime.enter());
                     thread.run()
                 })?;
@@ -65,7 +86,7 @@
     }
 }

-/// A [`WorkerGroup`] adds an async backpressure mechanism to a [`ThreadPool`].
+/// A [`WorkerGroup`] adds an async back-pressure mechanism to a [`ThreadPool`].
 pub struct WorkerGroup {
     pool: ThreadPool,
     semaphore: Arc<Semaphore>,
@@ -116,6 +137,34 @@
     }
 }

+#[cfg(unix)]
+fn set_current_thread_priority(kind: ThreadKind) {
+    // Lower priorities cause more favorable scheduling.
+    // Higher priorities cause less favorable scheduling.
+    //
+    // For details see `man setpriority(2)`.
+    let prio = match kind {
+        // The default priority needs no change, and defaults to `0`.
+        ThreadKind::Default => return,
+        // Set a priority of `10` for worker threads; the exact value is not important,
+        // only that it is a higher value (less favorable scheduling) than the default.
+        ThreadKind::Worker => 10,
+    };
+    if unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, prio) } != 0 {
+        // Read the error from `errno` and log it.
+        let error = std::io::Error::last_os_error();
+        relay_log::warn!(
+            error = &error as &dyn std::error::Error,
+            "failed to set thread priority for a {kind:?} thread: {error:?}"
+        );
+    };
+}
+
+#[cfg(not(unix))]
+fn set_current_thread_priority(_kind: ThreadKind) {
+    // Ignored for non-Unix platforms.
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Barrier;
@@ -176,6 +225,34 @@
         barrier.wait();
     }

+    #[test]
+    #[cfg(unix)]
+    fn test_thread_pool_priority() {
+        fn get_current_priority() -> i32 {
+            unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) }
+        }
+
+        let default_prio = get_current_priority();
+
+        {
+            let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
+            let prio = pool.install(get_current_priority);
+            // Default pool priority must match current priority.
+            assert_eq!(prio, default_prio);
+        }
+
+        {
+            let pool = ThreadPoolBuilder::new("s")
+                .num_threads(1)
+                .thread_kind(ThreadKind::Worker)
+                .build()
+                .unwrap();
+            let prio = pool.install(get_current_priority);
+            // The worker's priority value must be higher than the default (a higher number means a lower priority).
+            assert!(prio > default_prio);
+        }
+    }
+
     #[test]
     fn test_worker_group_backpressure() {
         let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
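
For illustration, a minimal standalone sketch of the technique this diff applies: raising a spawned thread's nice value with `setpriority(2)` so it is scheduled less favorably than the rest of the process. It assumes the `libc` crate is available (as added by this diff); the nice value of `10`, the thread body, and the printed output are illustrative choices, not part of the patch.

```rust
fn main() {
    std::thread::spawn(|| {
        // With `PRIO_PROCESS` and `who == 0`, Linux applies the nice value to the
        // calling thread only, which is what the pool's spawn handler relies on.
        if unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, 10) } != 0 {
            eprintln!("setpriority failed: {}", std::io::Error::last_os_error());
        }

        // Read the value back: a higher number means less favorable scheduling.
        let nice = unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) };
        println!("worker thread nice value: {nice}");
    })
    .join()
    .unwrap();
}
```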