Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sequence generation to be always increasing #6

Merged
merged 9 commits into from
Feb 24, 2024
64 changes: 42 additions & 22 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,24 +348,38 @@ impl Generator {
where
T: Snowflake,
{
let sequence = self.components.take_sequence();
use std::cmp::Ordering;

let timestamp;
loop {
let now = self.epoch.elapsed().unwrap().as_millis() as u64;

if sequence != 4095 || now > self.components.timestamp() {
let mut now = self.epoch.elapsed().unwrap().as_millis() as u64;
let instance = self.components.instance();
match now.cmp(&self.components.timestamp()) {
Ordering::Less => {
panic!("Clock has moved backwards! This is not supported.");
}
Ordering::Greater => {
self.components.reset_sequence();
self.components.set_timestamp(now);
timestamp = now;
break;
T::from_parts(now, instance, 0)
}
Ordering::Equal => {
let sequence = self.components.take_sequence();
if sequence == 0 {
now = self.wait_until_next_millisecond(now);
}
self.components.set_timestamp(now);
T::from_parts(now, instance, sequence)
}
}
}

fn wait_until_next_millisecond(&mut self, last_millisecond: u64) -> u64 {
loop {
let now = self.epoch.elapsed().unwrap().as_millis() as u64;
if now > last_millisecond {
return now;
}
std::hint::spin_loop();
}

let instance = self.components.instance();

T::from_parts(timestamp, instance, sequence)
}
}

Expand All @@ -386,7 +400,8 @@ pub(crate) struct Components(u64);
impl Components {
#[inline]
pub(crate) const fn new(instance: u64) -> Self {
Self((instance << 12) & BITMASK_INSTANCE)
// Fill in the given instance, and set the sequence at '1'
Self((instance << 12) & BITMASK_INSTANCE | 1)
}

#[inline]
Expand All @@ -410,6 +425,10 @@ impl Components {
*self = Self(timestamp + instance + seq)
}

pub(crate) fn reset_sequence(&mut self) {
self.set_sequence(1)
}

pub(crate) fn set_timestamp(&mut self, ts: u64) {
let timestamp = ts << 22;
let instance = (self.instance() << 12) & BITMASK_INSTANCE;
Expand Down Expand Up @@ -459,7 +478,7 @@ mod tests {
#[test]
fn test_components() {
let mut components = Components::new(0);
assert_eq!(components.sequence(), 0);
assert_eq!(components.sequence(), 1);
assert_eq!(components.timestamp(), 0);

components.set_sequence(1024);
Expand All @@ -477,21 +496,22 @@ mod tests {
}

#[test]
fn test_generate() {
fn test_generate_ordered() {
const INSTANCE: u64 = 0;

let mut sequence = 0;
let mut last_id = None;
let mut generator = Generator::new_unchecked(INSTANCE as u16);

for _ in 0..10_000 {
let id: u64 = generator.generate();
assert_eq!(id.instance(), INSTANCE);
assert_eq!(id.sequence(), sequence);

match sequence {
4095 => sequence = 0,
_ => sequence += 1,
}
assert!(
last_id < Some(id),
"expected {:?} to be less than {:?}",
last_id,
Some(id)
);
last_id = Some(id);
}
}

Expand Down
80 changes: 47 additions & 33 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
//! }
//! ```

use std::mem::MaybeUninit;
use std::time::SystemTime;

use crate::builder::Builder;
Expand Down Expand Up @@ -229,41 +228,55 @@ where
S: Snowflake,
F: Fn(),
{
// Even thought we only assign this value once we need to assign this value to
// something before passing it (reference) into the closure.
// This value is safe to read after the closure completes.
let mut id = MaybeUninit::uninit();
use std::cmp;

// Since `fetch_update` doesn't return a result,
// we store the result in this mutable variable.
// Note that using MaybeUninit is not necessary
// as the compiler is smart enough to elide the Option for us.
let mut id = None;

let _ = self
.components
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
let mut components = Components::from_bits(bits);

let sequence = components.take_sequence();

let timestamp;
loop {
let now = self.epoch.elapsed().as_millis() as u64;

if sequence != 4095 || now > components.timestamp() {
let mut now = self.epoch.elapsed().as_millis() as u64;
let instance = components.instance();
match now.cmp(&components.timestamp()) {
cmp::Ordering::Less => {
panic!("Clock has moved backwards! This is not supported");
}
cmp::Ordering::Greater => {
components.reset_sequence();
components.set_timestamp(now);
timestamp = now;
break;
id = Some(S::from_parts(now, instance, 0));
Some(components.to_bits())
}
cmp::Ordering::Equal => {
let sequence = components.take_sequence();
if sequence == 0 {
now = Self::wait_until_next_millisecond(&self.epoch, now, &tick_wait);
}
components.set_timestamp(now);
id = Some(S::from_parts(now, instance, sequence));
Some(components.to_bits())
}

tick_wait();
}

let instance = components.instance();

id.write(S::from_parts(timestamp, instance, sequence));

Some(components.to_bits())
});
id.expect("ID should have been set within the fetch_update closure.")
}

// SAFETY: The call to `fetch_update` only completes once the closure ran fully.
// At this point `id` has been initialized from within the closure.
unsafe { id.assume_init() }
fn wait_until_next_millisecond<F>(epoch: &T, last_millisecond: u64, tick_wait: F) -> u64
where
F: Fn(),
{
loop {
let now = epoch.elapsed().as_millis() as u64;
if now > last_millisecond {
return now;
}
tick_wait();
}
}
}

Expand All @@ -279,18 +292,19 @@ mod tests {
fn test_generate() {
const INSTANCE: u64 = 0;

let mut sequence = 0;
let mut last_id = None;
let generator = Generator::new_unchecked(INSTANCE as u16);

for _ in 0..10_000 {
let id: u64 = generator.generate();
assert_eq!(id.instance(), INSTANCE);
assert_eq!(id.sequence(), sequence);

match sequence {
4095 => sequence = 0,
_ => sequence += 1,
}
assert!(
last_id < Some(id),
"expected {:?} to be less than {:?}",
last_id,
Some(id)
);
last_id = Some(id);
}
}

Expand Down
Loading