-
-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rust): Add NontemporalArithmeticChunked #17708
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #17708 +/- ##
==========================================
- Coverage 80.37% 80.27% -0.10%
==========================================
Files 1500 1500
Lines 196605 197179 +574
Branches 2793 2793
==========================================
+ Hits 158016 158291 +275
- Misses 38076 38375 +299
Partials 513 513 ☔ View full report in Codecov by Sentry. |
We are working on a new streaming engine which has much smaller chunks of data (1-5MiB, roughly), which will cause the kernels to operate mostly on in-cache data. We've already seen benchmarks where it's 3x faster than the current streaming engine due to this. Since in the future we want to run most queries using this method, it wouldn't really help a lot of things to also have the nontemporal operations (in fact using them in the new streaming engine would be significantly worse), but they do add quite a bit of code to maintain, also making it harder to add new arithmetic operations/kernels. Finally the Rust docs specifically state that you should NOT use the function: Can you run a benchmark how much faster it is for the current engine? If it speeds up our current operations by a lot I could consider it but if it's only a small benefit that vanishes as soon as we turn on the new streaming engine by default, I'm not that excited for it. |
1-5MiB is still larger than L1 and L2 cache in most CPUs. And this PR aims to provide an option. The new streaming engine is not used everywhere. When using rust polars, for example pyo3 extensions and plugins, we usually directly operate on arrays. (And the
You can have a look at the corresponding issue (rust-lang/rust#114582). They mainly focus on the memory model (i.e. load/store ordering), about which we do not care in numeric computation.
Yes, I'm going to. |
I've wrote a simple benchmark: use std::hint::black_box;
use std::time::Instant;
use mimalloc::MiMalloc;
use polars::chunked_array::arithmetic::NontemporalArithmeticChunked;
use polars::prelude::*;
use rand::RngCore;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
const N: usize = 100000000;
const ITERATIONS: usize = 100;
fn temporal_bench(mut a: UInt32Chunked, mut b: UInt32Chunked) -> UInt32Chunked {
for i in 1..=ITERATIONS {
eprint!("{i:03}\r");
let c = (&a).wrapping_add(&b);
std::mem::swap(&mut a, &mut b);
let _ = std::mem::replace(&mut b, c);
}
b
}
fn nontemporal_bench(mut a: UInt32Chunked, mut b: UInt32Chunked) -> UInt32Chunked {
for i in 1..=ITERATIONS {
eprint!("{i:03}\r");
let c = (&a).wrapping_add_nontemporal(&b);
std::mem::swap(&mut a, &mut b);
let _ = std::mem::replace(&mut b, c);
}
b
}
fn main() {
unsafe {
use libmimalloc_sys::*;
// allow large os pages
mi_option_set_enabled(mi_option_large_os_pages, true);
// disable mi_option_purge_delay
// we deallocate frequently, and purging is slow
mi_option_set(15, -1);
}
let mut rng = rand::thread_rng();
let mut a = Vec::with_capacity(N);
let mut b = Vec::with_capacity(N);
unsafe { a.set_len(N) };
unsafe { b.set_len(N) };
rng.fill_bytes(unsafe { std::mem::transmute(a.as_mut_slice()) });
rng.fill_bytes(unsafe { std::mem::transmute(b.as_mut_slice()) });
let a = UInt32Chunked::from_vec("a", a);
let b = UInt32Chunked::from_vec("b", b);
// warm up rounds
let _ = black_box(temporal_bench(black_box(a.clone()), black_box(b.clone())));
let start = Instant::now();
let r1 = black_box(temporal_bench(black_box(a.clone()), black_box(b.clone())));
let end = Instant::now();
let duration_temporal = (end - start).as_millis();
let start = Instant::now();
let r2 = black_box(nontemporal_bench(black_box(a.clone()), black_box(b.clone())));
let end = Instant::now();
let duration_nontemporal = (end - start).as_millis();
for v1 in r1.into_no_null_iter().take(10) {
print!("{v1} ");
}
println!();
for v2 in r2.into_no_null_iter().take(10) {
print!("{v2} ");
}
println!();
println!("temporal:\t{duration_temporal} ms");
println!("nontemporal:\t{duration_nontemporal} ms");
} On my machine, the result is:
Nontemporal version is ~8% faster. You can also have a try. |
I've wrote a bench that matches the case in new stream engine as well. In this bench N is 1000000, so the array is ~4MB: use std::hint::black_box;
use std::time::Instant;
use mimalloc::MiMalloc;
use polars::chunked_array::arithmetic::NontemporalArithmeticChunked;
use polars::prelude::*;
use rand::RngCore;
use rayon::prelude::*;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
const N: usize = 1000000;
const ITERATIONS: usize = 1000;
fn temporal_bench(a: UInt32Chunked, b: UInt32Chunked) {
for _ in 1..=ITERATIONS {
let _ = black_box(black_box(&a).wrapping_add(black_box(&b)));
}
}
fn nontemporal_bench(a: UInt32Chunked, b: UInt32Chunked) {
for _ in 1..=ITERATIONS {
let _ = black_box(black_box(&a).wrapping_add_nontemporal(black_box(&b)));
}
}
fn main() {
unsafe {
use libmimalloc_sys::*;
// allow large os pages
mi_option_set_enabled(mi_option_large_os_pages, true);
// disable mi_option_purge_delay
// we deallocate frequently, and purging is slow
mi_option_set(15, -1);
}
let mut rng = rand::thread_rng();
let mut a = Vec::with_capacity(N);
let mut b = Vec::with_capacity(N);
unsafe { a.set_len(N) };
unsafe { b.set_len(N) };
rng.fill_bytes(unsafe { std::mem::transmute(a.as_mut_slice()) });
rng.fill_bytes(unsafe { std::mem::transmute(b.as_mut_slice()) });
let a = UInt32Chunked::from_vec("a", a);
let b = UInt32Chunked::from_vec("b", b);
// warm up rounds
(0..16).into_par_iter().for_each(|_| {
temporal_bench(a.clone(), b.clone());
});
let start = Instant::now();
(0..32).into_par_iter().for_each(|_| {
temporal_bench(a.clone(), b.clone());
});
let end = Instant::now();
let duration_temporal = (end - start).as_millis();
let start = Instant::now();
(0..32).into_par_iter().for_each(|_| {
nontemporal_bench(a.clone(), b.clone());
});
let end = Instant::now();
let duration_nontemporal = (end - start).as_millis();
println!("temporal:\t{duration_temporal} ms");
println!("nontemporal:\t{duration_nontemporal} ms");
}
I can conclude that for simple operations under the situations:
the nontemporal version has better performance. |
@ruihe774 Thanks for your benchmarks, I will take a more detailed look when I'm back at work on Monday. However, from a quick peek I don't think that your latter benchmark matches the typical usage in the new streaming engine. You're not reading back the results at all, you're just discarding them: let _ = black_box(black_box(&a).wrapping_add_nontemporal(black_box(&b))); In the new streaming engine the result from the fn temporal_bench(mut a: UInt32Chunked, b: UInt32Chunked) {
for _ in 1..=ITERATIONS {
a = a.wrapping_add(black_box(&b)));
}
black_box(a);
}
fn nontemporal_bench(mut a: UInt32Chunked, b: UInt32Chunked) {
for _ in 1..=ITERATIONS {
a = a.wrapping_add_nontermporal(black_box(&b)));
}
black_box(a);
} |
Is it also true for single expressions like And what about the situation that one operation has multiple (e.g., two) inputs? If input A is ready and B is not yet calculated, doesn't it have to wait for B before A gets used? I'm not familiar with the new stream engine. Please correct me if I'm wrong. |
@ruihe774 Yes, both cases will work in a streaming fashion. It will support arbitrary directed acyclic graphs.
Yes, the new engine uses |
Pray tell, what numerical algorithm are you executing that causes you to write data and then never access it again from any core on the machine? Because I think you have slightly misunderstood the issue if you think this is confined to usage of atomics (and rayon seems to use quite a bit of multithreaded access to slices). You see, a nontemporal store is not ordered with respect to any other store on the machine. That includes |
@orlp On x86, this code incurs actual unquestionable hardware UB if anyone ever loads or stores to that region of memory again. The classic issue with nontemporal stores is precisely in libraries like polars where people update large slices and then pass a reference to that slice off to somewhere else, which does something like, oh, I don't know... parr.slice().par_chunks().for_each(|chunk| {
// It literally does not matter what goes into this block,
// because it is already undefined behavior.
}); That's UB on x86. You cannot assume that the writes retire before any of the subsequent operations, even the implicit ones. You cannot even assume e.g. that setting an The entire problem with |
I am not as knowledgeable as @orlp or @workingjubilee on this point, but I am in general very much against these kinds of changes that are too compiler specific or behavior that is very UB sensitive. At least before we can show that it significantly speeds up something or makes something easier. This seems like this would require a lot of inconsideration from all developers into the future to keep this from turning into UB or from completely negating any performance improvements that it might cause now. Especially, if we start considering that this needs to interact with both the streaming and in-memory engine. |
The appropriate way to implement this PR is using things like Once this conga-line of PRs lands, this one will do nothing on architectures where LLVM does not implement a coherent non-temporal store semantics: |
@workingjubilee Thank you for your great explanation. I'm now convinced that |
And before using those, please read the documentation here. The PR was indeed actively unsound due to not using |
For arrays that are larger than cache size, arithmetic is usually memory bound. Temporal store (
std::ptr::write
) used inArithmeticChunked
will quickly fill up the cache, discarding previously cached data. What's worse, because we usually perform operations from the head of an array to the tail, after temporal store fills up the cache, the following operations will have to re-read the array from the memory to cache, which is a significant performance penalty.This PR adds
NontemporalArithmeticChunked
that uses nontemporal store (std::intrinsics::nontemporal_store
), which 1) does not allocate new lines in the cache, instead storing directly to the memory with higher bandwidth, 2) is not ordered with regard to other loads/stores, giving the CPU more flexibility in hiding memory latency. There are public benchmarks w.r.t. temporal store and nontemporal store, demonstrating the performance improvement of nontemporal store on large amount of data.NontemporalArithmeticChunked
is only enabled when the featurenontemporal
is enabled, which is enabled by the featurenightly
. The Rust doc saysstd::intrinsics::nontemporal_store
will probably never become stable, soNontemporalArithmeticChunked
will never, either. The doc also saysstd::intrinsics::nontemporal_store
violates the memory model of Rust; we do not care about that; after all, no one will put atomic integers in aPrimitiveArray
.This PR only adds a trait in Rust. It is not used in lazy and is not exposed to Python, yet. In the future, optimizations like automatically choosing nontemporal arithmetic when operands are large arrays can be added.