-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdoc_old_md
101 lines (101 loc) · 3.48 KB
/
doc_old_md
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
//! An implementation of a thread-safe, lock-free and scalable FIFO queue algorithm.
//! Two variants of the algorithm are provided:
//!
//! 1. MPMC (Multi-Producer/Multi-Consumer)
//! 2. MPSC (Multi-Producer/Single-Consumer)
//!
//! The API is similar to that of [`std::sync::mpsc`] with separate types for producer
//! and consumer handles.
//! There is also [`OwnedQueue`], which is a single-threaded queue with an identical
//! internal structure for cheap conversion from/to either of the thread-safe variants
//! or initialization from iterators.
//!
//! The MPMC queue implementation is based on the algorithm in [\[1\]][1] and the MPSC
//! variant is derived from the MPMC algorithm described therein but optimized for the
//! additional constraint of at most a single consumer.
//!
//! One key aspect of the implemented algorithm is, that it does not rely on a dedicated
//! memory reclamation mechanism but comes with its own memory reclamation strategy,
//! that is lock-free, deterministic (rather than lazy/deferred) like reference counting
//! and has very little performance impact.
//! It does come with a limitation, however, in that there is a limit on how many
//! producers and consumers can safely interact with the same queue.
//! These limits are enforced by the implementation and they are encoded in the
//! two constants [`MAX_PRODUCERS`] and [`MAX_CONSUMERS`].
//! When the number of live producer/consumer handles would exceed these safe limits,
//! further attempts to [`clone`](Clone::clone) handles will panic!
//!
//! In the current implementation, there can be **961** producer and **480** consumer
//! handles.
//!
//! [1]: https://ieeexplore.ieee.org/document/9490347
//!
//! # Examples
//!
//! ```
//! # #[cfg(not(miri))]
//! # {
//! use std::sync::{Arc, Barrier, Mutex};
//! use std::thread;
//!
//! const N: usize = 100;
//! const THREADS: usize = 4;
//! const EXP_SUM: usize = (N * (N - 1)) / 2; // Gauss sum [1, 99]
//!
//! let (tx, rx) = loo::mpmc::queue();
//!
//! let barrier = Arc::new(Barrier::new(THREADS * 2));
//! let total_sum = Arc::new(Mutex::new(0));
//!
//! let mut handles = Vec::with_capacity(THREADS * 2);
//!
//! // spawn 4 producer threads, each pushing 100 elements in FIFO order
//! for _ in 0..THREADS {
//! let tx = tx.clone();
//! let barrier = Arc::clone(&barrier);
//!
//! handles.push(thread::spawn(move || {
//! barrier.wait();
//!
//! for i in 0..100 {
//! tx.push_back(i);
//! }
//! }));
//! }
//!
//! // spawn 4 consumer threads, each popping exactly 100 elements from the queue
//! // and summing them up
//! for _ in 0..THREADS {
//! let rx = rx.clone();
//! let barrier = Arc::clone(&barrier);
//! let total_sum = Arc::clone(&total_sum);
//! handles.push(thread::spawn(move || {
//! let mut count = 0;
//! let mut sum = 0;
//!
//! barrier.wait();
//!
//! while count < N { // retrieve exactly N elements
//! if let Some(e) = rx.pop_front() {
//! count += 1;
//! sum += e;
//! }
//! }
//!
//! // add this thread's result to the total sum
//! let mut lock = total_sum.lock().unwrap();
//! *lock += sum;
//! }));
//! }
//!
//! for handle in handles {
//! handle.join().unwrap();
//! }
//!
//! let lock = total_sum.lock().unwrap();
//! let mut count = *lock;
//!
//! assert!(rx.is_empty(), "queue must be empty after all threads are done");
//! assert_eq!(count, THREADS * EXP_SUM);
//! # }
//! ```