Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(server): Run processor threads with lower priority #4222

Merged
merged 3 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- Remove `buffer` entries and scrub array contents from MongoDB queries. ([#4186](https://github.com/getsentry/relay/pull/4186))
- Use `DateTime<Utc>` instead of `Instant` for tracking the received time of the `Envelope`. ([#4184](https://github.com/getsentry/relay/pull/4184))
- Add a field to suggest consumers to ingest spans in EAP. ([#4206](https://github.com/getsentry/relay/pull/4206))
- Run internal worker threads with a lower priority. ([#4222](https://github.com/getsentry/relay/pull/4222))

## 24.10.0

Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ insta = { version = "1.31.0", features = ["json", "redactions", "ron"] }
ipnetwork = "0.20.0"
itertools = "0.13.0"
json-forensics = "0.1.1"
lru = "0.12.4"
libc = "0.2.161"
liblzma = "0.3.4"
lru = "0.12.4"
maxminddb = "0.24.0"
memchr = "2.7.4"
md5 = "0.7.0"
Expand Down
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ hashbrown = { workspace = true }
hyper-util = { workspace = true }
itertools = { workspace = true }
json-forensics = { workspace = true }
libc = { workspace = true }
liblzma = { workspace = true }
mime = { workspace = true }
minidump = { workspace = true, optional = true }
Expand Down
5 changes: 3 additions & 2 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::services::stats::RelayStats;
use crate::services::store::StoreService;
use crate::services::test_store::{TestStore, TestStoreService};
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat};
use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
use anyhow::{Context, Result};
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
Expand Down Expand Up @@ -105,6 +105,7 @@ fn create_processor_pool(config: &Config) -> Result<ThreadPool> {

let pool = crate::utils::ThreadPoolBuilder::new("processor")
.num_threads(thread_count)
.thread_kind(ThreadKind::Worker)
.runtime(tokio::runtime::Handle::current())
.build()?;

Expand All @@ -114,7 +115,7 @@ fn create_processor_pool(config: &Config) -> Result<ThreadPool> {
#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<ThreadPool> {
// Spawn a store worker for every 12 threads in the processor pool.
// This ratio was found emperically and may need adjustments in the future.
// This ratio was found empirically and may need adjustments in the future.
//
// Ideally in the future the store will be single threaded again, after we move
// all the heavy processing (de- and re-serialization) into the processor.
Expand Down
83 changes: 80 additions & 3 deletions relay-server/src/utils/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,24 @@ use tokio::runtime::Handle;
pub use rayon::{ThreadPool, ThreadPoolBuildError};
use tokio::sync::Semaphore;

/// A thread kind.
///
/// The thread kind has an effect on how threads are prioritized and scheduled.
#[derive(Default, Debug, Clone, Copy)]
pub enum ThreadKind {
/// The default kind, just a thread like any other without any special configuration.
#[default]
Default,
/// A worker thread is a CPU intensive task with a lower priority than the [`Self::Default`] kind.
Worker,
}

/// Used to create a new [`ThreadPool`] thread pool.
pub struct ThreadPoolBuilder {
name: &'static str,
runtime: Option<Handle>,
num_threads: usize,
kind: ThreadKind,
}

impl ThreadPoolBuilder {
Expand All @@ -19,18 +32,25 @@ impl ThreadPoolBuilder {
name,
runtime: None,
num_threads: 0,
kind: ThreadKind::Default,
}
}

/// Sets the number of threads to be used in the rayon threadpool.
/// Sets the number of threads to be used in the rayon thread-pool.
///
/// See also [`rayon::ThreadPoolBuilder::num_threads`].
pub fn num_threads(mut self, num_threads: usize) -> Self {
self.num_threads = num_threads;
self
}

/// Sets the tokio runtime which will be made available in the workers.
/// Configures the [`ThreadKind`] for all threads spawned in the pool.
pub fn thread_kind(mut self, kind: ThreadKind) -> Self {
self.kind = kind;
self
}

/// Sets the Tokio runtime which will be made available in the workers.
pub fn runtime(mut self, runtime: Handle) -> Self {
self.runtime = Some(runtime);
self
Expand All @@ -56,6 +76,7 @@ impl ThreadPoolBuilder {
}
let runtime = self.runtime.clone();
b.spawn(move || {
set_current_thread_priority(self.kind);
let _guard = runtime.as_ref().map(|runtime| runtime.enter());
thread.run()
})?;
Expand All @@ -65,7 +86,7 @@ impl ThreadPoolBuilder {
}
}

/// A [`WorkerGroup`] adds an async backpressure mechanism to a [`ThreadPool`].
/// A [`WorkerGroup`] adds an async back-pressure mechanism to a [`ThreadPool`].
pub struct WorkerGroup {
pool: ThreadPool,
semaphore: Arc<Semaphore>,
Expand Down Expand Up @@ -116,6 +137,34 @@ impl WorkerGroup {
}
}

#[cfg(unix)]
fn set_current_thread_priority(kind: ThreadKind) {
// Lower priorities cause more favorable scheduling.
// Higher priorities cause less favorable scheduling.
//
// For details see `man setpriority(2)`.
let prio = match kind {
// The default priority needs no change, and defaults to `0`.
ThreadKind::Default => return,
// Set a priority of `10` for worker threads,
// it's just important that this is a higher priority than default.
ThreadKind::Worker => 10,
};
if unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, prio) } != 0 {
// Clear the `errno` and log it.
let error = std::io::Error::last_os_error();
relay_log::warn!(
error = &error as &dyn std::error::Error,
"failed to set thread priority for a {kind:?} thread: {error:?}"
);
};
}

#[cfg(not(unix))]
fn set_current_thread_priority(_kind: ThreadKind) {
// Ignored for non-Unix platforms.
}

#[cfg(test)]
mod tests {
use std::sync::Barrier;
Expand Down Expand Up @@ -176,6 +225,34 @@ mod tests {
barrier.wait();
}

#[test]
#[cfg(unix)]
fn test_thread_pool_priority() {
fn get_current_priority() -> i32 {
unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) }
}

let default_prio = get_current_priority();

{
let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
let prio = pool.install(get_current_priority);
// Default pool priority must match current priority.
assert_eq!(prio, default_prio);
}

{
let pool = ThreadPoolBuilder::new("s")
.num_threads(1)
.thread_kind(ThreadKind::Worker)
.build()
.unwrap();
let prio = pool.install(get_current_priority);
// Worker must be higher than the default priority (higher number = lower priority).
assert!(prio > default_prio);
}
}

#[test]
fn test_worker_group_backpressure() {
let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
Expand Down
Loading