Here are my tasks. (My OS is Linux. In fact, my actual problem is this: borgbackup/borg#7674 (comment); see the sequential Rust solution there.)
1. I want to read a very big file that may not fit in memory. In my particular case the file is 10 GiB and my machine has 32 GiB of RAM, so it currently fits, but at some point in the future the file may be too big. So I want to read the file, split it into reasonably sized chunks (say, 4 MiB), process the chunks in parallel (the processing is CPU-bound, not I/O-bound), and then write the results to some (small) destination file in the correct order.
2. I want to read a small file, split it into very small chunks, process them in parallel, and write the results to a very big file in the correct order. That is, the same as the previous point, but small-to-big instead of big-to-small.
It seems rayon doesn't support these cases directly. This is how I managed to do the first (big-to-small) task:
```rust
struct Chunks {
    input: std::fs::File,
    ...
}

// My own hand-rolled sequential iterator over chunks. It reads chunks from the
// file one at a time. Remember: it is potentially impossible to keep all chunks
// in memory at the same time.
impl Iterator for Chunks {
    type Item = Vec<u8>;
    fn next(&mut self) -> Option<Vec<u8>> {
        ...
    }
}

fn main() {
    use rayon::prelude::*;
    // We rely on this: https://github.com/rayon-rs/rayon/issues/1068
    let mut hash_vec = Chunks { input: std::fs::File::open("/dev/stdin").unwrap(), ... }
        .enumerate()
        .par_bridge()
        .map(|(chunk_n, chunk)| {
            ...
            return (chunk_n, ...);
        })
        .collect::<Vec<_>>();
    hash_vec.sort_by_key(|(n, _)| *n);
    for (_, chunk) in hash_vec {
        index_file.write_all(&chunk).unwrap();
    }
}
```
Unfortunately, I see multiple problems here:

1. `par_bridge` will not store too many items in memory, but only because @cuviper gave me such a guarantee here: #1068 (comment). That guarantee is limited to the current version, so I have to make sure never to upgrade rayon without checking that it still holds. This is ugly. (Okay, it seems the current decision is to stick to this guarantee, which is good.)
2. As far as I understand, `par_bridge().map(...).collect()` doesn't preserve order. (This is my interpretation of the phrase "The resulting iterator is not guaranteed to keep the order of the original iterator" in https://docs.rs/rayon/1.7.0/rayon/iter/trait.ParallelBridge.html.) So I have to do that `sort_by_key` trick. The code seems error-prone: what if I forget to call `sort_by_key`?
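One way to make the `sort_by_key` step impossible to forget is to scatter each result into a slot addressed by its chunk number, so the output is in order by construction. A minimal std-only sketch of that idea (not rayon): `process` is a hypothetical stand-in for the real per-chunk work, one thread per chunk stands in for a real thread pool, and all chunks are held in memory here, so this only illustrates the ordering technique, not the streaming constraint.

```rust
use std::sync::Mutex;
use std::thread;

// Hypothetical stand-in for the real CPU-bound per-chunk computation.
fn process(chunk: &[u8]) -> Vec<u8> {
    let sum: u32 = chunk.iter().map(|&b| b as u32).sum();
    sum.to_le_bytes().to_vec()
}

// Process chunks in parallel, returning results in input order.
// Each worker writes into its own pre-allocated slot, indexed by chunk
// number, so no sort step exists that could be forgotten.
fn process_in_order(chunks: &[Vec<u8>]) -> Vec<Vec<u8>> {
    let slots: Vec<Mutex<Option<Vec<u8>>>> =
        chunks.iter().map(|_| Mutex::new(None)).collect();
    thread::scope(|s| {
        for (chunk, slot) in chunks.iter().zip(&slots) {
            s.spawn(move || {
                *slot.lock().unwrap() = Some(process(chunk));
            });
        }
    });
    slots
        .into_iter()
        .map(|m| m.into_inner().unwrap().unwrap())
        .collect()
}

fn main() {
    let chunks = vec![vec![1u8, 2], vec![3], vec![4, 5, 6]];
    let out = process_in_order(&chunks);
    assert_eq!(out[0], 3u32.to_le_bytes().to_vec());
    assert_eq!(out[1], 3u32.to_le_bytes().to_vec());
    assert_eq!(out[2], 15u32.to_le_bytes().to_vec());
    println!("results are in input order");
}
```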
Okay, so I decided to stick with this solution. If you know a better one, please let me know.
Now to the small-to-big task. I spent a lot of time reading the rayon documentation, and it still seems this problem cannot be solved using rayon! But it seems it can be solved using tokio and `futures::stream::FuturesOrdered` (https://docs.rs/futures/0.3.28/futures/stream/struct.FuturesOrdered.html). FuturesOrdered by itself runs everything on a single thread, so it seems I should also use `tokio::task::spawn_blocking` (https://docs.rs/tokio/1.29.1/tokio/task/fn.spawn_blocking.html). I think the code should look like this (not tested):
```rust
let input_chunks: Vec<_> = ...;
let mut futures = FuturesOrdered::new();
for chunk in input_chunks {
    futures.push_back(async move { spawn_blocking(...).await });
}
while let Some(output_chunk) = futures.next().await {
    /* write output_chunk to file */
}
```
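The same ordered-pipeline idea can be sketched without async at all, using only std: for each chunk, start the CPU-bound work and push the receiving end of a channel into a FIFO; the writer pops receivers in submission order, so output order is guaranteed, and a bound on the queue length caps how many chunks are in flight (and thus in memory) at once. `process` and `run_pipeline` are hypothetical names, and spawning a fresh thread per chunk stands in for a real thread pool.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the real CPU-bound per-chunk computation.
fn process(chunk: Vec<u8>) -> Vec<u8> {
    chunk.iter().map(|b| b.wrapping_mul(2)).collect()
}

// Ordered pipeline: results come out in submission order, with at most
// `max_in_flight` chunks being processed (and buffered) at any time.
fn run_pipeline(
    chunks: impl Iterator<Item = Vec<u8>>,
    max_in_flight: usize,
    mut write: impl FnMut(Vec<u8>),
) {
    let mut in_flight: VecDeque<mpsc::Receiver<Vec<u8>>> = VecDeque::new();
    for chunk in chunks {
        // If too many chunks are in flight, wait for the oldest one first.
        if in_flight.len() == max_in_flight {
            write(in_flight.pop_front().unwrap().recv().unwrap());
        }
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let _ = tx.send(process(chunk));
        });
        in_flight.push_back(rx);
    }
    // Drain the remaining results, still in order.
    while let Some(rx) = in_flight.pop_front() {
        write(rx.recv().unwrap());
    }
}

fn main() {
    let input: Vec<Vec<u8>> = (0..10u8).map(|i| vec![i]).collect();
    let mut output = Vec::new();
    run_pipeline(input.into_iter(), 3, |out| output.push(out));
    let expected: Vec<Vec<u8>> = (0..10u8).map(|i| vec![i * 2]).collect();
    assert_eq!(output, expected);
    println!("ordered, bounded pipeline done");
}
```

This is essentially what `FuturesOrdered` plus `spawn_blocking` would do: the FIFO of receivers plays the role of the ordered stream, and the bound replaces async backpressure.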
But my problem is CPU-bound, not I/O-bound! And everyone says to use threads and rayon for CPU-bound tasks and async for I/O-bound tasks. So it seems everyone is wrong: it seems I should use async even though my problem is CPU-bound. How did we get here? How did it become possible that rayon, which is designed for CPU-bound problems, performs worse on them than async programming, about whose difficulties there is a whole book (https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo.html)?
It seems that FuturesOrdered would work for the big-to-small task, too.
Another possible solution for the small-to-big task: mmap the output file into memory, then write to it using rayon and my own `collect_into_vec`-like function (https://docs.rs/rayon/1.7.0/rayon/iter/trait.IndexedParallelIterator.html#method.collect_into_vec). (I cannot use `collect_into_vec` directly, because I want to collect a parallel iterator of `Vec<u8>` into a single `&mut [u8]`; i.e. I don't need merely `collect_into_vec`, I need `flatten_and_collect_into_vec`, or, more precisely, `flatten_and_collect_into_mut_slice`.)

Such a solution would be idiomatic rayon code. Unfortunately, it moves all the hard work to the OS and generates too much pressure on the page cache. I know that this is bad, and that for performance we should not push too much data through the page cache (see borgbackup/borg#7674 (comment)). So I think the FuturesOrdered solution is better.
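If each input chunk produces output of a known size, the mmap idea can instead be done with positioned writes: pre-size the output file, and let every worker write its result directly at its own offset, with no shared cursor and no ordering step. A Linux/Unix-only sketch using `std::os::unix::fs::FileExt` (the fixed `OUT_CHUNK` output size, the `process` function, and the temp-file path are assumptions for illustration; std threads stand in for a pool):

```rust
use std::fs::OpenOptions;
use std::os::unix::fs::FileExt; // write_all_at / read_exact_at: Unix positioned I/O
use std::thread;

const OUT_CHUNK: usize = 4; // assumed: each input chunk yields exactly 4 output bytes

// Hypothetical stand-in for the real per-chunk computation.
fn process(chunk: &[u8]) -> [u8; OUT_CHUNK] {
    let sum: u32 = chunk.iter().map(|&b| b as u32).sum();
    sum.to_le_bytes()
}

fn main() -> std::io::Result<()> {
    let input: Vec<Vec<u8>> = vec![vec![1, 2, 3], vec![4, 5], vec![6]];
    let path = std::env::temp_dir().join("small_to_big_demo.bin");
    let out = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&path)?;
    out.set_len((input.len() * OUT_CHUNK) as u64)?; // pre-size the big output file

    thread::scope(|s| {
        for (n, chunk) in input.iter().enumerate() {
            let out = &out;
            s.spawn(move || {
                // Each worker writes at its own offset: no shared cursor,
                // no ordering step, no mmap.
                out.write_all_at(&process(chunk), (n * OUT_CHUNK) as u64)
                    .unwrap();
            });
        }
    });

    let mut buf = vec![0u8; input.len() * OUT_CHUNK];
    out.read_exact_at(&mut buf, 0)?;
    assert_eq!(
        buf,
        [6u32.to_le_bytes(), 9u32.to_le_bytes(), 6u32.to_le_bytes()].concat()
    );
    println!("positioned writes produced the file in order");
    Ok(())
}
```

Unlike mmap, this writes through the ordinary file-write path, so dirty pages can be written back as they are produced rather than accumulating as dirtied mapped memory.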
Conclusions/questions:
1. How should I solve my problem? Are my proposed solutions the best available?
2. I think something similar to FuturesOrdered should be added to rayon. Or at least the rayon docs should tell the user what to do in this situation, i.e. "if your data doesn't fit in memory, you should probably use solution X".
3. Is the well-known principle "use rayon and threads for CPU-bound tasks and async for I/O-bound tasks" still true? If not, please state in the rayon docs what the user should use instead, e.g. "if your data doesn't fit in memory, use X even if your task is CPU-bound".
I think it would be much easier if you used positioned I/O: just iterate over `0..num_chunks` using `par_iter().for_each(..)`, compute the correct offset to read from based on the chunk number, and then call `read_exact_at`. This should also parallelize your I/O requests somewhat, giving the kernel's I/O scheduler more parallelism to work with. After all, the number of chunks and their offsets depend only on the length of the file.
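A sketch of this suggestion, with std threads standing in for rayon (with rayon it would be `(0..num_chunks).into_par_iter().for_each(..)`): each worker computes its own offset and reads its chunk independently with `read_exact_at`, so there is no shared iterator state and no ordering logic. The tiny chunk size, demo file, and per-chunk sum are assumptions for illustration; Linux/Unix only, via `std::os::unix::fs::FileExt`.

```rust
use std::fs::File;
use std::os::unix::fs::FileExt; // read_exact_at: Unix positioned read
use std::thread;

const CHUNK: u64 = 4; // chunk size; 4 MiB in the real task, tiny here for the demo

// Number of chunks depends only on the file length.
fn num_chunks(len: u64, chunk: u64) -> u64 {
    (len + chunk - 1) / chunk
}

fn main() -> std::io::Result<()> {
    // Create a small input file for demonstration.
    let path = std::env::temp_dir().join("positioned_read_demo.bin");
    std::fs::write(&path, (0u8..12).collect::<Vec<u8>>())?;

    let file = File::open(&path)?;
    let len = file.metadata()?.len();
    let n_chunks = num_chunks(len, CHUNK) as usize;

    // One result slot per chunk: results land in order automatically.
    let mut results: Vec<u32> = vec![0; n_chunks];
    thread::scope(|s| {
        for (n, slot) in results.iter_mut().enumerate() {
            let file = &file;
            s.spawn(move || {
                let offset = n as u64 * CHUNK;
                let size = CHUNK.min(len - offset) as usize;
                let mut buf = vec![0u8; size];
                // Positioned read: no seek, no shared file cursor.
                file.read_exact_at(&mut buf, offset).unwrap();
                // Stand-in for the real CPU-bound processing.
                *slot = buf.iter().map(|&b| b as u32).sum();
            });
        }
    });

    assert_eq!(results, vec![6, 22, 38]);
    println!("per-chunk results computed via positioned reads");
    Ok(())
}
```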