Skip to content

fakecore/mpmc-ringbuf

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

28 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

mpmc-ringbuf

A Rust MPMC(multiple producer and multiple consumers) ringbuf queue

Usage

single thread

let mut msg_queue:MsgQueue<u8> = MsgQueue::new();
let mut writer1 = msg_queue.add_producer();
let mut read1 = msg_queue.add_consumer();
writer1.write(vec![10;100]);
println!("{}",msg_queue.get_consumer_count());
println!("{}",read1.size());
assert_eq!(read1.size(),100);
let mut read2 = msg_queue.add_consumer();
assert_eq!(read2.size(),0);
assert_eq!(msg_queue.get_consumer_count(),2);
writer1.write(vec![0;100]);
assert_eq!(read1.size(),200);
assert_eq!(read2.size(),100);
read2.read(50);
assert_eq!(read1.size(),200);
assert_eq!(read2.size(),50);

multi-thread

let mut msg_queue: Arc<Mutex<MsgQueue<u8>>> = Arc::new(Mutex::new(MsgQueue::new()));
let m1 = msg_queue.clone();
let m2 = msg_queue.clone();
let mut c1_id = 0;
let mut c2_id = 0;
{
let mut msg_lock = (*msg_queue).lock().unwrap();
let mut c1 = msg_lock.add_consumer();
let mut c2 = msg_lock.add_consumer();
c1_id = c1.id();
c2_id = c2.id();
}
assert_eq!(msg_queue.lock().unwrap().get_consumer_count(),2);
let t1 = thread::spawn(move || {
let mut msg_lock = (*m1).lock().unwrap();
println!("get lock1");
let p = msg_lock.add_producer();
for i in 0..100{
p.write(vec![0;5]);
}
});

let t2 = thread::spawn(move || {
let mut msg_lock = (*m2).lock().unwrap();
println!("get lock1");
let p = msg_lock.add_producer();
for i in 0..100{
p.write(vec![0;5]);
}
});
t1.join();
t2.join();
{
let mut msg_lock = (*msg_queue).lock().unwrap();
assert_eq!(msg_lock.get_consumer_count(),2);
let mut c1 = msg_lock.get_consumer(c1_id);
let mut c2 = msg_lock.get_consumer(c2_id);
println!("size: {} {}",c1.size(),c2.size());
assert_eq!(c1.size(),1000);
assert_eq!(c2.size(),1000);
}

feature

fixed mode: fixed data block size and with data overwrite

dynamic mode: dynamic data block size

About

A Rust MPMC ringbuf queue with topic subscription

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages