From fe4216695d3ea65c29727684088baeada75c9c46 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Thu, 29 Feb 2024 04:15:35 +0100
Subject: [PATCH] perf: auto-tune concurrency budget (#14753)

---
 crates/polars-io/src/parquet/async_impl.rs |   8 +-
 crates/polars-io/src/pl_async.rs           | 188 ++++++++++++++++++++-
 2 files changed, 187 insertions(+), 9 deletions(-)

diff --git a/crates/polars-io/src/parquet/async_impl.rs b/crates/polars-io/src/parquet/async_impl.rs
index a8287069e2b6..d726d6d34f05 100644
--- a/crates/polars-io/src/parquet/async_impl.rs
+++ b/crates/polars-io/src/parquet/async_impl.rs
@@ -18,7 +18,9 @@ use super::cloud::{build_object_store, CloudLocation, CloudReader};
 use super::mmap::ColumnStore;
 use crate::cloud::CloudOptions;
 use crate::parquet::read_impl::compute_row_group_range;
-use crate::pl_async::{get_runtime, with_concurrency_budget, MAX_BUDGET_PER_REQUEST};
+use crate::pl_async::{
+    get_runtime, tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
+};
 use crate::predicates::PhysicalIoExpr;
 use crate::prelude::predicates::read_this_row_group;
 
@@ -50,7 +52,7 @@ impl ParquetObjectStore {
     }
 
     async fn get_range(&self, start: usize, length: usize) -> PolarsResult<Bytes> {
-        with_concurrency_budget(1, || async {
+        tune_with_concurrency_budget(1, || async {
             self.store
                 .get_range(&self.path, start..start + length)
                 .await
@@ -61,7 +63,7 @@ impl ParquetObjectStore {
 
     async fn get_ranges(&self, ranges: &[Range<usize>]) -> PolarsResult<Vec<Bytes>> {
         // Object-store has a maximum of 10 concurrent.
-        with_concurrency_budget(
+        tune_with_concurrency_budget(
             (ranges.len() as u32).clamp(0, MAX_BUDGET_PER_REQUEST as u32),
             || async {
                 self.store
diff --git a/crates/polars-io/src/pl_async.rs b/crates/polars-io/src/pl_async.rs
index d5868d9cd4b9..f067c60c454d 100644
--- a/crates/polars-io/src/pl_async.rs
+++ b/crates/polars-io/src/pl_async.rs
@@ -1,9 +1,12 @@
+use std::error::Error;
 use std::future::Future;
 use std::ops::Deref;
+use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
 use std::sync::RwLock;
 use std::thread::ThreadId;
 
 use once_cell::sync::Lazy;
+use polars_core::config::verbose;
 use polars_core::POOL;
 use polars_utils::aliases::PlHashSet;
 use tokio::runtime::{Builder, Runtime};
@@ -12,17 +15,189 @@ use tokio::sync::Semaphore;
 static CONCURRENCY_BUDGET: std::sync::OnceLock<(Semaphore, u32)> = std::sync::OnceLock::new();
 pub(super) const MAX_BUDGET_PER_REQUEST: usize = 10;
 
+pub trait GetSize {
+    fn size(&self) -> u64;
+}
+
+impl GetSize for bytes::Bytes {
+    fn size(&self) -> u64 {
+        self.len() as u64
+    }
+}
+
+impl<T: GetSize> GetSize for Vec<T> {
+    fn size(&self) -> u64 {
+        self.iter().map(|v| v.size()).sum()
+    }
+}
+
+impl<T: GetSize, E: Error> GetSize for Result<T, E> {
+    fn size(&self) -> u64 {
+        match self {
+            Ok(v) => v.size(),
+            Err(_) => 0,
+        }
+    }
+}
+
+#[derive(Debug, Copy, Clone)]
+enum Optimization {
+    Step,
+    Accept,
+    Finished,
+}
+
+struct SemaphoreTuner {
+    previous_download_speed: u64,
+    last_tune: std::time::Instant,
+    downloaded: AtomicU64,
+    download_time: AtomicU64,
+    opt_state: Optimization,
+    increments: u32,
+}
+
+impl SemaphoreTuner {
+    fn new() -> Self {
+        Self {
+            previous_download_speed: 0,
+            last_tune: std::time::Instant::now(),
+            downloaded: AtomicU64::new(0),
+            download_time: AtomicU64::new(0),
+            opt_state: Optimization::Step,
+            increments: 0,
+        }
+    }
+    fn should_tune(&self) -> bool {
+        match self.opt_state {
+            Optimization::Finished => false,
+            _ => self.last_tune.elapsed().as_millis() > 350,
+        }
+    }
+
+    fn add_stats(&self, downloaded_bytes: u64, download_time: u64) {
+        self.downloaded
+            .fetch_add(downloaded_bytes, Ordering::Relaxed);
+        self.download_time
+            .fetch_add(download_time, Ordering::Relaxed);
+    }
+
+    fn increment(&mut self, semaphore: &Semaphore) {
+        semaphore.add_permits(1);
+        self.increments += 1;
+    }
+
+    fn tune(&mut self, semaphore: &'static Semaphore) -> bool {
+        let download_speed = self.downloaded.fetch_add(0, Ordering::Relaxed)
+            / self.download_time.fetch_add(0, Ordering::Relaxed);
+
+        let increased = download_speed > self.previous_download_speed;
+        self.previous_download_speed = download_speed;
+        match self.opt_state {
+            Optimization::Step => {
+                self.increment(semaphore);
+                self.opt_state = Optimization::Accept
+            },
+            Optimization::Accept => {
+                // Accept the step
+                if increased {
+                    // Set new step
+                    self.increment(semaphore);
+                    // Keep accept state to check next iteration
+                }
+                // Decline the step
+                else {
+                    self.opt_state = Optimization::Finished;
+                    FINISHED_TUNING.store(true, Ordering::Relaxed);
+                    if verbose() {
+                        eprintln!(
+                            "concurrency tuner finished after adding {} steps",
+                            self.increments
+                        )
+                    }
+                    // Finished.
+                    return true;
+                }
+            },
+            Optimization::Finished => {},
+        }
+        self.last_tune = std::time::Instant::now();
+        // Not finished.
+        false
+    }
+}
+static INCR: AtomicU8 = AtomicU8::new(0);
+static FINISHED_TUNING: AtomicBool = AtomicBool::new(false);
+static PERMIT_STORE: std::sync::OnceLock<tokio::sync::RwLock<SemaphoreTuner>> =
+    std::sync::OnceLock::new();
+
+fn get_semaphore() -> &'static (Semaphore, u32) {
+    CONCURRENCY_BUDGET.get_or_init(|| {
+        let permits = std::env::var("POLARS_CONCURRENCY_BUDGET")
+            .map(|s| s.parse::<usize>().expect("integer"))
+            .unwrap_or_else(|_| std::cmp::max(POOL.current_num_threads(), MAX_BUDGET_PER_REQUEST));
+        (Semaphore::new(permits), permits as u32)
+    })
+}
+
+pub async fn tune_with_concurrency_budget<F, Fut>(requested_budget: u32, callable: F) -> Fut::Output
+where
+    F: FnOnce() -> Fut,
+    Fut: Future,
+    Fut::Output: GetSize,
+{
+    let (semaphore, initial_budget) = get_semaphore();
+
+    // This would never finish otherwise.
+    assert!(requested_budget <= *initial_budget);
+
+    // Keep permit around.
+    // On drop it is returned to the semaphore.
+    let _permit_acq = semaphore.acquire_many(requested_budget).await.unwrap();
+
+    let now = std::time::Instant::now();
+    let res = callable().await;
+
+    if FINISHED_TUNING.load(Ordering::Relaxed) || res.size() == 0 {
+        return res;
+    }
+
+    let duration = now.elapsed().as_millis() as u64;
+    let permit_store = PERMIT_STORE.get_or_init(|| tokio::sync::RwLock::new(SemaphoreTuner::new()));
+
+    let tuner = permit_store.read().await;
+    // Keep track of download speed
+    tuner.add_stats(res.size(), duration);
+
+    // We only tune every n ms
+    if !tuner.should_tune() {
+        return res;
+    }
+    // Drop the read tuner before trying to acquire a writer
+    drop(tuner);
+
+    // Reduce locking by letting only 1 in 5 tasks lock the tuner
+    if (INCR.fetch_add(1, Ordering::Relaxed) % 5) != 0 {
+        return res;
+    }
+    // Never lock as we will deadlock. This can run under rayon
+    let Ok(mut tuner) = permit_store.try_write() else {
+        return res;
+    };
+    let finished = tuner.tune(semaphore);
+    if finished {
+        // Undo the last step
+        let undo = semaphore.acquire().await.unwrap();
+        std::mem::forget(undo)
+    }
+    res
+}
+
 pub async fn with_concurrency_budget<F, Fut>(requested_budget: u32, callable: F) -> Fut::Output
 where
     F: FnOnce() -> Fut,
     Fut: Future,
 {
-    let (semaphore, initial_budget) = CONCURRENCY_BUDGET.get_or_init(|| {
-        let permits = std::env::var("POLARS_CONCURRENCY_BUDGET")
-            .map(|s| s.parse::<usize>().expect("integer"))
-            .unwrap_or_else(|_| std::cmp::max(POOL.current_num_threads(), MAX_BUDGET_PER_REQUEST));
-        (Semaphore::new(permits), permits as u32)
-    });
+    let (semaphore, initial_budget) = get_semaphore();
 
     // This would never finish otherwise.
     assert!(requested_budget <= *initial_budget);
@@ -30,6 +205,7 @@ where
     // Keep permit around.
     // On drop it is returned to the semaphore.
     let _permit_acq = semaphore.acquire_many(requested_budget).await.unwrap();
+
     callable().await
 }
 
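
Note on the tuning policy (commentary, not part of the patch): SemaphoreTuner is an
additive-increase hill climb over the semaphore's permit count. Every download records
its byte count and duration; roughly every 350 ms, at most one in five tasks (and only
when the write lock is free) computes the aggregate download speed, adds a permit while
speed keeps improving, and permanently stops tuning on the first regression, undoing the
last step. A minimal sketch of that policy in isolation, where `measure_throughput` is a
hypothetical stand-in for the byte/time statistics the real tuner aggregates across tasks:

    // Illustrative sketch only, not part of this patch. Returns the permit
    // count at which measured throughput stopped improving.
    fn hill_climb(start_permits: u32, mut measure_throughput: impl FnMut(u32) -> u64) -> u32 {
        let mut permits = start_permits;
        let mut best = measure_throughput(permits);
        loop {
            // Step: propose one extra permit.
            let candidate = permits + 1;
            let speed = measure_throughput(candidate);
            if speed > best {
                // Accept: keep the permit and continue climbing.
                best = speed;
                permits = candidate;
            } else {
                // Decline: undo the step and finish tuning for good.
                return permits;
            }
        }
    }

The real implementation differs in that measurement is asynchronous and shared across
tasks (hence the atomics and the try_write guard), and the starting permit count comes
from POLARS_CONCURRENCY_BUDGET when set, otherwise from the maximum of the thread-pool
size and MAX_BUDGET_PER_REQUEST, as in get_semaphore above.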