Skip to content

Commit

Permalink
Sarthak | Adds an implementation of SPSC queue
Browse files Browse the repository at this point in the history
  • Loading branch information
SarthakMakhija committed Jul 14, 2024
1 parent 7c77341 commit 1d8f6f0
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 1 deletion.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bytes = "1.6.1"
bytes = "1.6.1"
crossbeam-utils = "0.8.20"
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub(crate) mod memory;
pub mod key_value;
mod queue;

fn main() {
println!("Hello, world!");
Expand Down
1 change: 1 addition & 0 deletions src/queue/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub(crate) mod spsc;
107 changes: 107 additions & 0 deletions src/queue/spsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use std::sync::atomic::{AtomicUsize, Ordering};

use crossbeam_utils::CachePadded;

pub(crate) struct SPSCQueue<T> {
head: CachePadded<AtomicUsize>,
tail: CachePadded<AtomicUsize>,
elements: Vec<T>,
}

impl<T> SPSCQueue<T> {
pub(crate) fn new(capacity: usize) -> Self {
SPSCQueue {
head: CachePadded::new(AtomicUsize::new(0)),
tail: CachePadded::new(AtomicUsize::new(0)),
elements: Vec::with_capacity(capacity),
}
}

pub(crate) fn is_empty(&self) -> bool {
self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
}

pub(crate) fn try_enqueue(&mut self, element: T) -> bool {
let tail = self.tail.load(Ordering::Relaxed);
let mut next_tail = tail + 1;
if next_tail == self.elements.capacity() + 1 {
next_tail = 0;
}
if next_tail == self.head.load(Ordering::Acquire) {
return false;
}
self.elements.insert(tail, element);
self.tail.store(next_tail, Ordering::Release);
return true;
}

pub(crate) fn try_get_front(&self) -> Option<&T> {
let head = self.head.load(Ordering::Relaxed);
if self.tail.load(Ordering::Acquire) == head {
return None;
}
return Some(&self.elements[head]);
}

pub(crate) fn pop(&self) {
let head = self.head.load(Ordering::Relaxed);
let mut next_head = head + 1;
if next_head == self.elements.capacity() {
next_head = 0;
}
self.head.store(next_head, Ordering::Release);
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::Ordering;
use crate::queue::spsc::SPSCQueue;

#[test]
fn is_empty_queue() {
let queue: SPSCQueue<usize> = SPSCQueue::new(2);
assert_eq!(true, queue.is_empty());
}

#[test]
fn try_enqueue_and_get_front() {
let mut queue = SPSCQueue::new(2);
assert_eq!(true, queue.try_enqueue(10));
assert_eq!(true, queue.try_enqueue(20));

assert_eq!(&10, queue.try_get_front().unwrap());
queue.pop();

assert_eq!(&20, queue.try_get_front().unwrap());
queue.pop();
}

#[test]
fn can_not_enqueue_in_a_full_queue() {
let mut queue = SPSCQueue::new(2);
assert_eq!(true, queue.try_enqueue(10));
assert_eq!(true, queue.try_enqueue(20));
assert_eq!(false, queue.try_enqueue(30));
}

#[test]
fn can_not_get_front_from_an_empty_queue() {
let queue: SPSCQueue<usize> = SPSCQueue::new(2);

assert_eq!(None, queue.try_get_front());
}

#[test]
fn pop_in_a_queue() {
let mut queue = SPSCQueue::new(2);
assert_eq!(true, queue.try_enqueue(10));
assert_eq!(true, queue.try_enqueue(20));

queue.pop();
assert_eq!(1, queue.head.load(Ordering::SeqCst));

queue.pop();
assert_eq!(0, queue.head.load(Ordering::SeqCst));
}
}

0 comments on commit 1d8f6f0

Please sign in to comment.