Skip to content

Commit

Permalink
perf: auto-tune concurrency budget (#14753)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Feb 29, 2024
1 parent caff0f9 commit fe42166
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 9 deletions.
8 changes: 5 additions & 3 deletions crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use super::cloud::{build_object_store, CloudLocation, CloudReader};
use super::mmap::ColumnStore;
use crate::cloud::CloudOptions;
use crate::parquet::read_impl::compute_row_group_range;
use crate::pl_async::{get_runtime, with_concurrency_budget, MAX_BUDGET_PER_REQUEST};
use crate::pl_async::{
get_runtime, tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
};
use crate::predicates::PhysicalIoExpr;
use crate::prelude::predicates::read_this_row_group;

Expand Down Expand Up @@ -50,7 +52,7 @@ impl ParquetObjectStore {
}

async fn get_range(&self, start: usize, length: usize) -> PolarsResult<Bytes> {
with_concurrency_budget(1, || async {
tune_with_concurrency_budget(1, || async {
self.store
.get_range(&self.path, start..start + length)
.await
Expand All @@ -61,7 +63,7 @@ impl ParquetObjectStore {

async fn get_ranges(&self, ranges: &[Range<usize>]) -> PolarsResult<Vec<Bytes>> {
// Object-store has a maximum of 10 concurrent.
with_concurrency_budget(
tune_with_concurrency_budget(
(ranges.len() as u32).clamp(0, MAX_BUDGET_PER_REQUEST as u32),
|| async {
self.store
Expand Down
188 changes: 182 additions & 6 deletions crates/polars-io/src/pl_async.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::error::Error;
use std::future::Future;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
use std::sync::RwLock;
use std::thread::ThreadId;

use once_cell::sync::Lazy;
use polars_core::config::verbose;
use polars_core::POOL;
use polars_utils::aliases::PlHashSet;
use tokio::runtime::{Builder, Runtime};
Expand All @@ -12,24 +15,197 @@ use tokio::sync::Semaphore;
static CONCURRENCY_BUDGET: std::sync::OnceLock<(Semaphore, u32)> = std::sync::OnceLock::new();
pub(super) const MAX_BUDGET_PER_REQUEST: usize = 10;

pub trait GetSize {
fn size(&self) -> u64;
}

impl GetSize for bytes::Bytes {
fn size(&self) -> u64 {
self.len() as u64
}
}

impl<T: GetSize> GetSize for Vec<T> {
fn size(&self) -> u64 {
self.iter().map(|v| v.size()).sum()
}
}

impl<T: GetSize, E: Error> GetSize for Result<T, E> {
fn size(&self) -> u64 {
match self {
Ok(v) => v.size(),
Err(_) => 0,
}
}
}

#[derive(Debug, Copy, Clone)]
enum Optimization {
Step,
Accept,
Finished,
}

struct SemaphoreTuner {
previous_download_speed: u64,
last_tune: std::time::Instant,
downloaded: AtomicU64,
download_time: AtomicU64,
opt_state: Optimization,
increments: u32,
}

impl SemaphoreTuner {
fn new() -> Self {
Self {
previous_download_speed: 0,
last_tune: std::time::Instant::now(),
downloaded: AtomicU64::new(0),
download_time: AtomicU64::new(0),
opt_state: Optimization::Step,
increments: 0,
}
}
fn should_tune(&self) -> bool {
match self.opt_state {
Optimization::Finished => false,
_ => self.last_tune.elapsed().as_millis() > 350,
}
}

fn add_stats(&self, downloaded_bytes: u64, download_time: u64) {
self.downloaded
.fetch_add(downloaded_bytes, Ordering::Relaxed);
self.download_time
.fetch_add(download_time, Ordering::Relaxed);
}

fn increment(&mut self, semaphore: &Semaphore) {
semaphore.add_permits(1);
self.increments += 1;
}

fn tune(&mut self, semaphore: &'static Semaphore) -> bool {
let download_speed = self.downloaded.fetch_add(0, Ordering::Relaxed)
/ self.download_time.fetch_add(0, Ordering::Relaxed);

let increased = download_speed > self.previous_download_speed;
self.previous_download_speed = download_speed;
match self.opt_state {
Optimization::Step => {
self.increment(semaphore);
self.opt_state = Optimization::Accept
},
Optimization::Accept => {
// Accept the step
if increased {
// Set new step
self.increment(semaphore);
// Keep accept state to check next iteration
}
// Decline the step
else {
self.opt_state = Optimization::Finished;
FINISHED_TUNING.store(true, Ordering::Relaxed);
if verbose() {
eprintln!(
"concurrency tuner finished after adding {} steps",
self.increments
)
}
// Finished.
return true;
}
},
Optimization::Finished => {},
}
self.last_tune = std::time::Instant::now();
// Not finished.
false
}
}
static INCR: AtomicU8 = AtomicU8::new(0);
static FINISHED_TUNING: AtomicBool = AtomicBool::new(false);
static PERMIT_STORE: std::sync::OnceLock<tokio::sync::RwLock<SemaphoreTuner>> =
std::sync::OnceLock::new();

fn get_semaphore() -> &'static (Semaphore, u32) {
CONCURRENCY_BUDGET.get_or_init(|| {
let permits = std::env::var("POLARS_CONCURRENCY_BUDGET")
.map(|s| s.parse::<usize>().expect("integer"))
.unwrap_or_else(|_| std::cmp::max(POOL.current_num_threads(), MAX_BUDGET_PER_REQUEST));
(Semaphore::new(permits), permits as u32)
})
}

pub async fn tune_with_concurrency_budget<F, Fut>(requested_budget: u32, callable: F) -> Fut::Output
where
F: FnOnce() -> Fut,
Fut: Future,
Fut::Output: GetSize,
{
let (semaphore, initial_budget) = get_semaphore();

// This would never finish otherwise.
assert!(requested_budget <= *initial_budget);

// Keep permit around.
// On drop it is returned to the semaphore.
let _permit_acq = semaphore.acquire_many(requested_budget).await.unwrap();

let now = std::time::Instant::now();
let res = callable().await;

if FINISHED_TUNING.load(Ordering::Relaxed) || res.size() == 0 {
return res;
}

let duration = now.elapsed().as_millis() as u64;
let permit_store = PERMIT_STORE.get_or_init(|| tokio::sync::RwLock::new(SemaphoreTuner::new()));

let tuner = permit_store.read().await;
// Keep track of download speed
tuner.add_stats(res.size(), duration);

// We only tune every n ms
if !tuner.should_tune() {
return res;
}
// Drop the read tuner before trying to acquire a writer
drop(tuner);

// Reduce locking by letting only 1 in 5 tasks lock the tuner
if (INCR.fetch_add(1, Ordering::Relaxed) % 5) != 0 {
return res;
}
// Never lock as we will deadlock. This can run under rayon
let Ok(mut tuner) = permit_store.try_write() else {
return res;
};
let finished = tuner.tune(semaphore);
if finished {
// Undo the last step
let undo = semaphore.acquire().await.unwrap();
std::mem::forget(undo)
}
res
}

pub async fn with_concurrency_budget<F, Fut>(requested_budget: u32, callable: F) -> Fut::Output
where
F: FnOnce() -> Fut,
Fut: Future,
{
let (semaphore, initial_budget) = CONCURRENCY_BUDGET.get_or_init(|| {
let permits = std::env::var("POLARS_CONCURRENCY_BUDGET")
.map(|s| s.parse::<usize>().expect("integer"))
.unwrap_or_else(|_| std::cmp::max(POOL.current_num_threads(), MAX_BUDGET_PER_REQUEST));
(Semaphore::new(permits), permits as u32)
});
let (semaphore, initial_budget) = get_semaphore();

// This would never finish otherwise.
assert!(requested_budget <= *initial_budget);

// Keep permit around.
// On drop it is returned to the semaphore.
let _permit_acq = semaphore.acquire_many(requested_budget).await.unwrap();

callable().await
}

Expand Down

0 comments on commit fe42166

Please sign in to comment.