Thread throttling #1167

Open: floffel wants to merge 1 commit into main

@floffel floffel commented May 14, 2024

I need to implement a thread throttling approach.
Threads should be able to stop and start as needed (or, as my implementation suggests, be put to sleep and woken up as needed by client libraries).
This could help, e.g., with further development on this issue: #319

Implementation details where I am not sure I went the right way:

  • I implemented a new Latch: a toggling latch that holds a global state in each thread.
  • To use the newly implemented methods to block/unblock (= sleep/wake) threads, I need access to the global Registry, so I made the Registry struct public.
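
To make the idea of a toggling latch concrete, here is a minimal sketch using only the standard library. It is not the code from this PR: the name ToggleLatch and all of its methods are hypothetical, and it uses a Mutex plus Condvar rather than rayon's internal latch machinery.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical toggling latch: unlike a one-shot latch, it can be
/// switched between "blocked" and "open" any number of times.
pub struct ToggleLatch {
    blocked: Mutex<bool>,
    changed: Condvar,
}

impl ToggleLatch {
    pub fn new() -> Self {
        ToggleLatch {
            blocked: Mutex::new(false),
            changed: Condvar::new(),
        }
    }

    /// Ask the owning worker to go to sleep at its next check.
    pub fn block(&self) {
        *self.blocked.lock().unwrap() = true;
    }

    /// Reopen the latch and wake the worker if it is sleeping.
    pub fn unblock(&self) {
        *self.blocked.lock().unwrap() = false;
        self.changed.notify_all();
    }

    pub fn is_blocked(&self) -> bool {
        *self.blocked.lock().unwrap()
    }

    /// Called by the worker between jobs: sleeps while the latch is blocked.
    pub fn wait_while_blocked(&self) {
        let guard = self.blocked.lock().unwrap();
        // wait_while re-checks the flag, so spurious wakeups are harmless.
        let _guard = self
            .changed
            .wait_while(guard, |blocked| *blocked)
            .unwrap();
    }
}
```

A worker would call wait_while_blocked() between jobs, while a client library calls block() and unblock() from outside.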

I would love to hear whether this is a feature rayon should support and whether I got the implementation right.

@floffel floffel changed the title from "Thread trotteling" to "Thread throttling" May 15, 2024
@cuviper
Member

cuviper commented May 20, 2024

I think it's going to be very hard to write robust code if current_num_threads can change at any time. We do have some hedging for this possibility in the documentation, but only for the default, whereas explicit ThreadPoolBuilder::num_threads is supposed to lock that down.

Can you describe a bit more about your use case? Maybe we can find different approaches that will accomplish that.

@floffel
Author

floffel commented May 23, 2024

Thank you for your thoughts! And sorry for the late reply (I had to check with my mentor what I am allowed to write).

My work is for an unpublished paper, and I would like to explore whether dynamic resource management of concurrently running threads makes sense for heterogeneous CPUs in terms of performance and/or energy efficiency gains.
I therefore want to scale the number of threads an application uses at runtime. When I found the above-mentioned issue, I thought this feature might be something more Rayon users wish for.
I will provide another, more real-life use case, which is easily testable, below.

I first want to answer your concerns regarding current_num_threads; I will then go into the overall use case.
In my opinion, current_num_threads is the current size of the thread pool. This size will only change if it is explicitly asked to (by enlarging or shrinking the thread pool) and should then act accordingly.
Rayon also uses thread::available_parallelism() (e.g. to determine the default size of its thread pool, which can, as you wrote, be changed). But thread::available_parallelism() is not going to change. This method can therefore always be used to determine the number of threads provided by the operating system.

As I see it, the only information that is missing is how many threads rayon started with.
This is (in my opinion) not a real problem, as it can be obtained by calling current_num_threads right after the initialization of the thread pool, or at any time before shrinking/expanding it. Another solution would be to implement something like started_num_threads.
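
As an illustration of "record the count once, right after initialization", here is a small standard-library sketch. The function started_num_threads does not exist in rayon; it is a hypothetical helper, and it uses thread::available_parallelism() as a stand-in for the pool size because a running rayon pool is not assumed here.

```rust
use std::num::NonZeroUsize;
use std::sync::OnceLock;
use std::thread;

// Hypothetical storage, not part of rayon: holds the first value observed.
static STARTED_NUM_THREADS: OnceLock<usize> = OnceLock::new();

/// Returns the thread count recorded on the first call. Later calls
/// return the same value even if the reported parallelism has changed.
pub fn started_num_threads() -> usize {
    *STARTED_NUM_THREADS.get_or_init(|| {
        thread::available_parallelism()
            .map(NonZeroUsize::get)
            // Assumption: fall back to one thread if the query fails.
            .unwrap_or(1)
    })
}
```

With a real pool, the closure could call rayon's current_num_threads instead, as long as it runs before any shrinking or expanding takes place.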


Main idea: have a thread pool that is adjustable while running.

Solutions:
As far as I know, three solutions would be possible:

  • spawn new threads to grow the thread pool and join threads to shrink it, or
  • put already spawned threads to sleep to shrink it and wake them up to grow it, or
  • destroy and reinitialize the whole thread pool.

As rayon by default spawns one thread for each thread provided by the OS on startup, the second approach seemed promising. (Also, it was far easier to implement.)
Additionally, I think that putting threads to sleep costs less performance than creating/joining them (I will look this up and comment on it). So I went with the second approach.
The downside of my approach compared to the first one is that I do not spawn additional threads.
The downside of implementing this at all is performance. The first approach costs more when adjusting the thread pool, while the second costs more when adding jobs to the queues. I couldn't really study the actual impact of the first approach, though, as it would require major changes to the overall code and structure. And the last approach is not possible, as there is no way to destroy the global thread pool (?).
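
The second approach (sleep to shrink, wake to grow) can be sketched with a shared gate that every worker of a fixed-size pool checks before taking a job. This is only an illustration under assumptions: ThrottleGate and its methods are hypothetical names, and the real change would have to live inside rayon's Registry and sleep logic.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical gate shared by all workers of a fixed-size pool.
/// Workers whose index is >= `active` sleep until the limit is raised.
pub struct ThrottleGate {
    active: Mutex<usize>,
    changed: Condvar,
    size: usize,
}

impl ThrottleGate {
    pub fn new(size: usize) -> Self {
        ThrottleGate {
            active: Mutex::new(size),
            changed: Condvar::new(),
            size,
        }
    }

    /// Shrink or grow the set of running workers without spawning or
    /// joining any thread. The limit is capped at the pool size.
    pub fn set_active(&self, n: usize) {
        *self.active.lock().unwrap() = n.min(self.size);
        self.changed.notify_all();
    }

    pub fn active(&self) -> usize {
        *self.active.lock().unwrap()
    }

    /// Called by worker `index` before it takes a job: blocks while the
    /// worker is outside the active set.
    pub fn wait_until_active(&self, index: usize) {
        let guard = self.active.lock().unwrap();
        let _guard = self
            .changed
            .wait_while(guard, |active| index >= *active)
            .unwrap();
    }
}
```

The cap in set_active reflects the downside mentioned above: this approach can never run more threads than were spawned at startup.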

What can be accomplished with this solution:
Rayon has no way to adjust the thread pool once it is initialized. It is therefore not possible to react, e.g., to limit changes via cgroups.

An example use case would be a change of cgroups limits.
Test code that fails with original rayon and cannot be made to work, but could be adjusted to succeed with the proposed change (Linux only; uses cgroups and therefore needs root privileges):
Test code:

use std::sync::atomic::{AtomicI32, Ordering};
use std::{process, thread};
use std::time::Duration;
use cgroups_rs::error::ErrorKind;
use cgroups_rs::cgroup_builder::*;
use rayon::broadcast;

fn wait_for_esc() {
    let countdown = 3;
    for i in 0..countdown {
        thread::sleep(Duration::from_secs(1));
        println!("{}...", countdown-i);
    }
}

fn test_available_threads_through_spawn() -> i32 {
    let n = AtomicI32::new(0);

    broadcast(
        |_| {
            n.fetch_add(1, Ordering::SeqCst); // todo: SeqCst?
        }
    );

    n.load(Ordering::SeqCst)
}

// test cpu limit through applying restrictions of how much time
// the process is allowed to use
// see: https://docs.kernel.org/scheduler/sched-bwc.html#examples and https://github.com/rust-lang/rust/pull/92697
fn test_cpu_limit() {
    println!("***** TESTING CPU LIMIT VIA CPU quota & period *****");
    let ch = cgroups_rs::hierarchies::auto();

    // get available threads
    let available_threads = thread::available_parallelism().unwrap().get();

    // create and configure the cgroup
    let cg = match CgroupBuilder::new("rayonlimit")
        //.cpu()
        .build(ch) {
            Ok(cg) => cg,
            Err(e) if *e.kind() == ErrorKind::Other => {
                panic!("{}\nPlease ensure that you have root privileges", e)
            },
            Err(e) => panic!("{}", e)
        };
    let cpus: &cgroups_rs::cpu::CpuController = cg.controller_of().unwrap();

    // add self to cgroup
    let own_pid =  u64::from(process::id());
    println!("moving myself into the cgroup, my pid: {}", own_pid);
    cg.add_task_by_tgid(cgroups_rs::CgroupPid { pid: own_pid }).unwrap();

    let mut current_cpu_limit: i64 = 2; // limit to 2 cpus and go up from there to not block the main thread
    while current_cpu_limit <= available_threads.try_into().unwrap() { // todo: this will fail for machines with loads of threads...
        println!("...testing with {} worth of cpu times... ", current_cpu_limit);

        cpus.set_cfs_quota_and_period(
            Some(200_000 * current_cpu_limit), // quota in microseconds; todo: this will fail for machines with loads of threads...
            Some(200_000) // period in microseconds
        ).unwrap();

        let thread_count_after_cgroup = thread::available_parallelism().unwrap().get();
        if thread_count_after_cgroup == current_cpu_limit.try_into().unwrap() { // todo: this will fail for machines with loads of threads...
            println!("available_parallelism() SUCCEEDED!");
        } else {
            println!("available_parallelism() FAILED! Rayon should have run {} threads but ran {}", current_cpu_limit, thread_count_after_cgroup);
        }

        let rayon_real_threads = test_available_threads_through_spawn();
        if rayon_real_threads == current_cpu_limit.try_into().unwrap() {
            println!("...rayon SUCCEEDED! really spawned {rayon_real_threads} threads");
        } else {
            println!("...RAYON FAILED! Rayon should have run {} threads but ran {}", current_cpu_limit, rayon_real_threads);
        }

        current_cpu_limit += 1;
    }

    // cleanup
    cg.remove_task_by_tgid(cgroups_rs::CgroupPid { pid: own_pid }).unwrap();
    cg.delete().unwrap();

}

// test cpu limit through applying restrictions of how many processes
// are allowed to be created
fn test_pid_limit() {
    println!("***** TESTING CPU LIMIT VIA PID LIMIT *****");
    let ch = cgroups_rs::hierarchies::auto();

    // get available threads
    let available_threads = thread::available_parallelism().unwrap().get();

    // create and configure the cgroup
    let cg = match CgroupBuilder::new("rayonlimit")
        .pid().maximum_number_of_processes(cgroups_rs::MaxValue::Value(12)).done()
        .build(ch) {
            Ok(cg) => cg,
            Err(e) if *e.kind() == ErrorKind::Other => {
                panic!("{}\nPlease ensure that you have root privileges", e)
            },
            Err(e) => panic!("{}", e)
        };
    let pid_controller: &cgroups_rs::pid::PidController = cg.controller_of().unwrap();

    // add self to cgroup
    let own_pid =  u64::from(process::id());
    println!("moving myself into the cgroup, my pid: {}", own_pid);
    cg.add_task_by_tgid(cgroups_rs::CgroupPid { pid: own_pid }).unwrap();

    let mut current_cpu_limit: i64 = 2; // limit to 2 cpus and go up from there to not block the main thread
    while current_cpu_limit <= available_threads.try_into().unwrap() { // todo: this will fail for machines with loads of threads...
        println!("...testing with {} worth of cpu times... ", current_cpu_limit);

        pid_controller.set_pid_max(cgroups_rs::MaxValue::Value(current_cpu_limit)).unwrap();

        let thread_count_after_cgroup = thread::available_parallelism().unwrap().get();
        if thread_count_after_cgroup == current_cpu_limit.try_into().unwrap() { // todo: this will fail for machines with loads of threads...
            println!("available_parallelism() SUCCEEDED!");
        } else {
            println!("available_parallelism() FAILED! Rayon should have run {} threads but ran {}", current_cpu_limit, thread_count_after_cgroup);
        }

        let rayon_real_threads = test_available_threads_through_spawn();
        if rayon_real_threads == current_cpu_limit.try_into().unwrap() {
            println!("...rayon SUCCEEDED! really spawned {rayon_real_threads} threads");
        } else {
            println!("...RAYON FAILED! Rayon should have run {} threads but ran {}", current_cpu_limit, rayon_real_threads);
        }

        current_cpu_limit += 1;
    }

    // cleanup
    cg.remove_task_by_tgid(cgroups_rs::CgroupPid { pid: own_pid }).unwrap();
    cg.delete().unwrap();
}

fn main() {
    let count = thread::available_parallelism().unwrap().get();
    println!("*** parallel avail. threads at start: {}", count);

    println!("Constructing cgroup now - exit NOW if you don't want it to be constructed!");
    wait_for_esc();

    test_cpu_limit();
    test_pid_limit();
}
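
The CPU-limit test above sets a quota of 200000 * n with a period of 200000 and then expects n threads. The arithmetic behind that expectation can be sketched as follows. This is an assumption about how a quota maps to a CPU count (floor of quota / period, at least one CPU, negative quota meaning "no limit"); the helper cpus_from_cfs is hypothetical and is not an API of cgroups_rs, rayon, or std.

```rust
/// Number of whole CPUs a CFS bandwidth limit allows, assuming the limit
/// is floor(quota / period) with a minimum of one CPU.
/// A negative quota is treated as "no limit" and yields None, as does a
/// zero period, which would make the ratio undefined.
pub fn cpus_from_cfs(quota_us: i64, period_us: u64) -> Option<usize> {
    if quota_us < 0 || period_us == 0 {
        return None;
    }
    let cpus = (quota_us as u64) / period_us;
    // A quota smaller than one period still lets one thread make progress.
    Some(cpus.max(1) as usize)
}
```

For example, a quota of 400000 with a period of 200000 gives two CPUs under this assumption, which is the first value the test loop checks.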

Additional thoughts:
If I had implemented it the first way (described above), the following code might panic at runtime, and additional checks would therefore be necessary. I therefore think the first way is much more error-prone.
Test code (replace main in the code above with this function):

fn main() {
    let ch = cgroups_rs::hierarchies::auto();

    // create and configure the cgroup
    let cg = match CgroupBuilder::new("rayonlimit")
        .pid().maximum_number_of_processes(cgroups_rs::MaxValue::Value(12)).done()
        .build(ch) {
            Ok(cg) => cg,
            Err(e) if *e.kind() == ErrorKind::Other => {
                panic!("{}\nPlease ensure that you have root privileges", e)
            },
            Err(e) => panic!("{}", e)
        };
    let pid_controller: &cgroups_rs::pid::PidController = cg.controller_of().unwrap();

    // add self to cgroup
    let own_pid =  u64::from(process::id());
    println!("moving myself into the cgroup, my pid: {}", own_pid);
    cg.add_task_by_tgid(cgroups_rs::CgroupPid { pid: own_pid }).unwrap();

    rayon::ThreadPoolBuilder::new().build_global().unwrap();

    let count = thread::available_parallelism().unwrap().get();
    println!("*** parallel avail. threads at start: {}", count);

    let count = test_available_threads_through_spawn();
    println!("*** parallel running threads at start: {}", count);

    cg.remove_task_by_tgid(cgroups_rs::CgroupPid { pid: own_pid }).unwrap();
    cg.delete().unwrap();
}

Sorry for the wall of text. I hope it is somewhat understandable.

@adamreichold
Collaborator

I think it's going to be very hard to write robust code if current_num_threads can change at any time.

I wonder if this is really necessary? If we just put the threads to sleep without making this observable by users of the thread pool, isn't this the same as if those threads were currently busy running other tasks? Meaning that current_num_threads would stay a hint for useful amounts of parallelism when manually splitting tasks, but I don't think we really rely on making that amount of parallel progress anywhere (because other tasks could always be limiting us).

So if those throttled threads would just not pull work from the global queue and go to sleep from the perspective of the OS, wouldn't that be enough for the heterogeneous CPU use case? We would still need to ensure that broadcast etc. reaches all of them, but if they do not wake up and participate in the actual work, that should suffice for the OS scheduler, shouldn't it?

@floffel
Author

floffel commented May 25, 2024

I do think Rayon would benefit from adjusting current_num_threads, but maybe I am overlooking something. I see a benefit in not adjusting it as well, so I will try to outline my thoughts on both cases.
But, to put this first: Yes, for my use case would it be enough.

As I understand rayon, it is not the same as if those threads were currently busy running other tasks, because they would never finish running these tasks.
Rayon uses the work-stealing approach, so every thread has its own queue. When new tasks are added, e.g. via broadcast, the queues are filled. One can use methods like par_chunks to distribute them more finely, which I think is what you mean current_num_threads should be a hint for.

If those threads are busy and another one isn't, it will steal tasks from the busy ones. I would assume that this introduces some overhead.
So a thread that will never be able to work on its queue is simply another queue for the other threads. But this would undermine the idea that a programmer knows how many objects should be passed to one thread's queue when broadcasting.
If, on the other hand, a programmer knew how many threads are active, an optimal number of tasks could be handed to the active threads while the others get no work at all.

Of course, if the threads were woken up at a later time, they would need to steal work from other queues. But my maybe naive thought is that we have more power available (which we can then use to perform steals) after waking threads up than we have after putting them to sleep.
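
The point that a sleeping thread's queue is "simply another queue for the other threads" can be illustrated with a deliberately simplified model. This is not rayon's implementation (which uses lock-free deques); WorkerQueue and find_work are hypothetical names, and a Mutex-protected VecDeque stands in for the real data structure.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Simplified per-thread job queue; jobs are plain numbers here.
pub struct WorkerQueue {
    jobs: Mutex<VecDeque<u32>>,
}

impl WorkerQueue {
    pub fn new() -> Self {
        WorkerQueue { jobs: Mutex::new(VecDeque::new()) }
    }

    pub fn push(&self, job: u32) {
        self.jobs.lock().unwrap().push_back(job);
    }

    /// The owner takes the most recently pushed job.
    pub fn pop(&self) -> Option<u32> {
        self.jobs.lock().unwrap().pop_back()
    }

    /// A thief takes the oldest job from the other end.
    pub fn steal(&self) -> Option<u32> {
        self.jobs.lock().unwrap().pop_front()
    }
}

/// Worker `own` first looks at its own queue, then tries to steal from
/// every other queue, including those of workers that are asleep.
pub fn find_work(own: usize, queues: &[WorkerQueue]) -> Option<u32> {
    queues[own].pop().or_else(|| {
        queues
            .iter()
            .enumerate()
            .filter(|(i, _)| *i != own)
            .find_map(|(_, q)| q.steal())
    })
}
```

In this model, jobs placed in the queue of a throttled worker are not lost, but every one of them costs an extra steal by an active worker.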

My thought was, that one can always call thread::available_parallelism() to get the "real" amount of threads and current_num_threads to get the actual number of threads which are ready to work. But please, if you have the time to correct me, I would be very pleased, as I might just be overlooking things...

@adamreichold
Collaborator

My thought was, that one can always call thread::available_parallelism() to get the "real" amount of threads and current_num_threads to get the actual number of threads which are ready to work.

For one thing, Rayon thread pools are not bound to the available parallelism but can be constructed with a given fixed number of threads.

@floffel
Author

floffel commented May 25, 2024

Yes, you are right. And current_num_threads will reflect this until {,un}block_threads or adjust_blocked_threads is called. Is this a problem?

@adamreichold
Collaborator

Yes, downstream code might have sized data structures using current_num_threads and get panics by using the updated values after throttling has been requested. That code might not even be aware of these requests being in some other dependency.
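
A sketch of the failure mode described here, using only the standard library: a structure is sized once from a thread count and later indexed with a thread index that only makes sense under a different count. PerThreadSums is a hypothetical example type, not taken from any real downstream crate.

```rust
/// Per-thread accumulator sized once from a thread count, as downstream
/// code might do with the value of current_num_threads.
pub struct PerThreadSums {
    slots: Vec<u64>,
}

impl PerThreadSums {
    pub fn new(num_threads: usize) -> Self {
        PerThreadSums { slots: vec![0; num_threads] }
    }

    /// Adds `value` to the slot of `thread_index`. Returns false when the
    /// index does not fit; plain indexing (`self.slots[thread_index]`)
    /// would panic in that situation.
    pub fn add(&mut self, thread_index: usize, value: u64) -> bool {
        match self.slots.get_mut(thread_index) {
            Some(slot) => {
                *slot += value;
                true
            }
            None => false,
        }
    }

    pub fn total(&self) -> u64 {
        self.slots.iter().sum()
    }
}
```

If the structure was created while the reported count was 2 and a worker with index 3 later shows up, add returns false here, whereas code written with direct indexing would panic.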

To be honest, I would prefer focusing on the throttling issue since

But, to put this first: Yes, for my use case would it be enough.

and postponing discussions on current_num_threads if it is not actually holding up your work.

@floffel
Author

floffel commented May 26, 2024

Would be okay for me.
So I guess I'll revert my changes to current_num_threads then?
