A Rust MPMC (multiple-producer, multiple-consumer) ring-buffer queue.
Single-threaded usage:
// Create a byte queue, then attach one producer and one consumer to it.
let mut msg_queue: MsgQueue<u8> = MsgQueue::new();
let mut producer = msg_queue.add_producer();
let mut first_reader = msg_queue.add_consumer();

// Write 100 items; the consumer that is already attached is expected to see
// all of them.
producer.write(vec![10; 100]);
println!("{}", msg_queue.get_consumer_count());
println!("{}", first_reader.size());
assert_eq!(first_reader.size(), 100);

// A consumer attached after the first write is expected to start out empty.
let mut second_reader = msg_queue.add_consumer();
assert_eq!(second_reader.size(), 0);
assert_eq!(msg_queue.get_consumer_count(), 2);

// A second write of 100 items is expected to reach both consumers.
producer.write(vec![0; 100]);
assert_eq!(first_reader.size(), 200);
assert_eq!(second_reader.size(), 100);

// Reading 50 items from one consumer is expected to leave the other
// consumer's pending size untouched.
second_reader.read(50);
assert_eq!(first_reader.size(), 200);
assert_eq!(second_reader.size(), 50);
Multi-threaded usage:
// Multi-threaded example: the queue is shared behind `Arc<Mutex<_>>`. Two
// threads each register a producer and write 100 blocks of 5 items, so each
// consumer registered beforehand is expected to see 2 * 100 * 5 = 1000 items.
let msg_queue: Arc<Mutex<MsgQueue<u8>>> = Arc::new(Mutex::new(MsgQueue::new()));
let m1 = Arc::clone(&msg_queue);
let m2 = Arc::clone(&msg_queue);

// Register both consumers before any producer writes, and keep only their
// ids so the consumers can be looked up again once the producers are done.
// The lock guard is released at the end of this block.
let (c1_id, c2_id) = {
    let mut msg_lock = msg_queue.lock().unwrap();
    let mut c1 = msg_lock.add_consumer();
    let mut c2 = msg_lock.add_consumer();
    let id1 = c1.id();
    let id2 = c2.id();
    (id1, id2)
};
assert_eq!(msg_queue.lock().unwrap().get_consumer_count(), 2);

// Each producer thread holds the queue lock for its whole write loop, so the
// two threads write one after the other rather than interleaving.
let t1 = thread::spawn(move || {
    let mut msg_lock = m1.lock().unwrap();
    println!("get lock1");
    let mut p = msg_lock.add_producer();
    for _ in 0..100 {
        p.write(vec![0; 5]);
    }
});
let t2 = thread::spawn(move || {
    let mut msg_lock = m2.lock().unwrap();
    println!("get lock2");
    let mut p = msg_lock.add_producer();
    for _ in 0..100 {
        p.write(vec![0; 5]);
    }
});

// Surface a panic in either producer thread instead of silently dropping the
// `Result` returned by `join`.
t1.join().expect("producer thread 1 panicked");
t2.join().expect("producer thread 2 panicked");

{
    let mut msg_lock = msg_queue.lock().unwrap();
    assert_eq!(msg_lock.get_consumer_count(), 2);
    let mut c1 = msg_lock.get_consumer(c1_id);
    let mut c2 = msg_lock.get_consumer(c2_id);
    println!("size: {} {}", c1.size(), c2.size());
    // 2 producers * 100 writes * 5 items per write.
    assert_eq!(c1.size(), 1000);
    assert_eq!(c2.size(), 1000);
}
Fixed mode: fixed data block size, with data overwrite.
Dynamic mode: dynamic data block size.