From 81277aa90942709ff44b78339bb82dfd64c28abb Mon Sep 17 00:00:00 2001 From: qqwy Date: Fri, 23 Feb 2024 21:28:03 +0100 Subject: [PATCH 1/9] Fix non-sync version of sequence generation --- src/lib.rs | 55 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f73df39..2116a62 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -348,24 +348,39 @@ impl Generator { where T: Snowflake, { - let sequence = self.components.take_sequence(); + use std::cmp::Ordering; - let timestamp; - loop { - let now = self.epoch.elapsed().unwrap().as_millis() as u64; - - if sequence != 4095 || now > self.components.timestamp() { + let sequence; + let mut now = self.epoch.elapsed().unwrap().as_millis() as u64; + let instance = self.components.instance(); + match now.cmp(&self.components.timestamp()) { + Ordering::Less => { + panic!("Clock has moved backwards! This is not supported."); + }, + Ordering::Greater => { + self.components.reset_sequence(); self.components.set_timestamp(now); - timestamp = now; - break; + T::from_parts(now, instance, 0) + }, + Ordering::Equal => { + sequence = self.components.take_sequence(); + if sequence == 0 { + now = self.wait_until_next_millisecond(now); + } + self.components.set_timestamp(now); + T::from_parts(now, instance, sequence) } + } + } + fn wait_until_next_millisecond(&mut self, last_millisecond: u64) -> u64 { + loop { + let now = self.epoch.elapsed().unwrap().as_millis() as u64; + if now > last_millisecond { + return now; + } std::hint::spin_loop(); } - - let instance = self.components.instance(); - - T::from_parts(timestamp, instance, sequence) } } @@ -410,6 +425,10 @@ impl Components { *self = Self(timestamp + instance + seq) } + pub(crate) fn reset_sequence(&mut self) { + self.set_sequence(4095) + } + pub(crate) fn set_timestamp(&mut self, ts: u64) { let timestamp = ts << 22; let instance = (self.instance() << 12) & BITMASK_INSTANCE; @@ -477,21 +496,17 @@ mod tests { } #[test] - fn test_generate() { + fn test_generate_ordered() { const INSTANCE: u64 = 0; - let mut sequence = 0; + let mut last_id = None; let mut generator = Generator::new_unchecked(INSTANCE as u16); for _ in 0..10_000 { let id: u64 = generator.generate(); assert_eq!(id.instance(), INSTANCE); - assert_eq!(id.sequence(), sequence); - - match sequence { - 4095 => sequence = 0, - _ => sequence += 1, - } + assert!(last_id < Some(id), "expected {:?} to be less than {:?}", last_id, Some(id)); + last_id = Some(id); } } From e30a1b424fce519e3b216e2d176a4a7f2f92eb75 Mon Sep 17 00:00:00 2001 From: qqwy Date: Fri, 23 Feb 2024 21:57:27 +0100 Subject: [PATCH 2/9] Fix sync version of sequence generation --- src/lib.rs | 3 +- src/sync.rs | 79 +++++++++++++++++++++++++++++------------------------ 2 files changed, 45 insertions(+), 37 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2116a62..21eb0a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -350,7 +350,6 @@ impl Generator { { use std::cmp::Ordering; - let sequence; let mut now = self.epoch.elapsed().unwrap().as_millis() as u64; let instance = self.components.instance(); match now.cmp(&self.components.timestamp()) { @@ -363,7 +362,7 @@ impl Generator { T::from_parts(now, instance, 0) }, Ordering::Equal => { - sequence = self.components.take_sequence(); + let sequence = self.components.take_sequence(); if sequence == 0 { now = self.wait_until_next_millisecond(now); } diff --git a/src/sync.rs b/src/sync.rs index f0d0f8b..5436139 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -14,7 +14,6 @@ //! } //! ``` -use std::mem::MaybeUninit; use std::time::SystemTime; use crate::builder::Builder; @@ -147,7 +146,7 @@ impl Generator { where T: Snowflake, { - self.internal.generate(std::hint::spin_loop) + self.internal.generate(&std::hint::spin_loop) } } @@ -224,46 +223,60 @@ where self.epoch } - fn generate(&self, tick_wait: F) -> S + fn generate(&self, tick_wait: &F) -> S where S: Snowflake, F: Fn(), { - // Even thought we only assign this value once we need to assign this value to - // something before passing it (reference) into the closure. - // This value is safe to read after the closure completes. - let mut id = MaybeUninit::uninit(); + use std::cmp; + + // Since `fetch_update` doesn't return a result, + // we store the result in this mutable variable. + // Note that using MaybeUninit is not necessary + // as the compiler is smart enough to elide the Option for us. + let mut id = None; let _ = self .components .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| { let mut components = Components::from_bits(bits); - - let sequence = components.take_sequence(); - - let timestamp; - loop { - let now = self.epoch.elapsed().as_millis() as u64; - - if sequence != 4095 || now > components.timestamp() { + let mut now = self.epoch.elapsed().as_millis() as u64; + let instance = components.instance(); + match now.cmp(&components.timestamp()) { + cmp::Ordering::Less => { + panic!("Clock has moved backwards! This is not supported"); + }, + cmp::Ordering::Greater => { + components.reset_sequence(); components.set_timestamp(now); - timestamp = now; - break; + id = Some(S::from_parts(now, instance, 0)); + Some(components.to_bits()) + }, + cmp::Ordering::Equal => { + let sequence = components.take_sequence(); + if sequence == 0 { + now = Self::wait_until_next_millisecond(&self.epoch, now, tick_wait); + } + components.set_timestamp(now); + id = Some(S::from_parts(now, instance, sequence)); + Some(components.to_bits()) } - - tick_wait(); } - - let instance = components.instance(); - - id.write(S::from_parts(timestamp, instance, sequence)); - - Some(components.to_bits()) }); + id.expect("ID should have been set within the fetch_update closure.") + } - // SAFETY: The call to `fetch_update` only completes once the closure ran fully. - // At this point `id` has been initialized from within the closure. - unsafe { id.assume_init() } + fn wait_until_next_millisecond(epoch: &T, last_millisecond: u64, tick_wait: F) -> u64 + where + F: Fn(), + { + loop { + let now = epoch.elapsed().as_millis() as u64; + if now > last_millisecond { + return now; + } + tick_wait(); + } } } @@ -279,18 +292,14 @@ mod tests { fn test_generate() { const INSTANCE: u64 = 0; - let mut sequence = 0; + let mut last_id = None; let generator = Generator::new_unchecked(INSTANCE as u16); for _ in 0..10_000 { let id: u64 = generator.generate(); assert_eq!(id.instance(), INSTANCE); - assert_eq!(id.sequence(), sequence); - - match sequence { - 4095 => sequence = 0, - _ => sequence += 1, - } + assert!(last_id < Some(id), "expected {:?} to be less than {:?}", last_id, Some(id)); + last_id = Some(id); } } From 287b6a9d9d1ea48cfc6b37f81054fce2dac70a9d Mon Sep 17 00:00:00 2001 From: qqwy Date: Sat, 24 Feb 2024 12:04:12 +0100 Subject: [PATCH 3/9] Cargo fmt --- src/lib.rs | 11 ++++++++--- src/sync.rs | 32 +++++++++++++++++++------------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 21eb0a2..20daeb3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -355,12 +355,12 @@ impl Generator { match now.cmp(&self.components.timestamp()) { Ordering::Less => { panic!("Clock has moved backwards! This is not supported."); - }, + } Ordering::Greater => { self.components.reset_sequence(); self.components.set_timestamp(now); T::from_parts(now, instance, 0) - }, + } Ordering::Equal => { let sequence = self.components.take_sequence(); if sequence == 0 { @@ -504,7 +504,12 @@ mod tests { for _ in 0..10_000 { let id: u64 = generator.generate(); assert_eq!(id.instance(), INSTANCE); - assert!(last_id < Some(id), "expected {:?} to be less than {:?}", last_id, Some(id)); + assert!( + last_id < Some(id), + "expected {:?} to be less than {:?}", + last_id, + Some(id) + ); last_id = Some(id); } } diff --git a/src/sync.rs b/src/sync.rs index 5436139..bbea7d3 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -245,13 +245,13 @@ where match now.cmp(&components.timestamp()) { cmp::Ordering::Less => { panic!("Clock has moved backwards! This is not supported"); - }, + } cmp::Ordering::Greater => { components.reset_sequence(); components.set_timestamp(now); id = Some(S::from_parts(now, instance, 0)); Some(components.to_bits()) - }, + } cmp::Ordering::Equal => { let sequence = components.take_sequence(); if sequence == 0 { @@ -266,7 +266,7 @@ where id.expect("ID should have been set within the fetch_update closure.") } - fn wait_until_next_millisecond(epoch: &T, last_millisecond: u64, tick_wait: F) -> u64 + fn wait_until_next_millisecond(epoch: &T, last_millisecond: u64, tick_wait: F) -> u64 where F: Fn(), { @@ -298,7 +298,12 @@ mod tests { for _ in 0..10_000 { let id: u64 = generator.generate(); assert_eq!(id.instance(), INSTANCE); - assert!(last_id < Some(id), "expected {:?} to be less than {:?}", last_id, Some(id)); + assert!( + last_id < Some(id), + "expected {:?} to be less than {:?}", + last_id, + Some(id) + ); last_id = Some(id); } } @@ -423,17 +428,18 @@ mod loom_tests { let generator = Arc::new(InternalGenerator::::new_unchecked(0)); let (tx, rx) = mpsc::channel(); - let threads: Vec<_> = (0..THREADS) - .map(|_| { - let generator = generator.clone(); - let tx = tx.clone(); + let threads: Vec<_> = + (0..THREADS) + .map(|_| { + let generator = generator.clone(); + let tx = tx.clone(); - thread::spawn(move || { - let id: u64 = generator.generate(panic_on_wait); - tx.send(id).unwrap(); + thread::spawn(move || { + let id: u64 = generator.generate(panic_on_wait); + tx.send(id).unwrap(); + }) }) - }) - .collect(); + .collect(); for th in threads { th.join().unwrap(); From e9030c1fd0bf8d94aa7bc7ad554bf1fde41701ef Mon Sep 17 00:00:00 2001 From: qqwy Date: Sat, 24 Feb 2024 12:08:55 +0100 Subject: [PATCH 4/9] Pass on 'tick_wait' by value in the sync generator. --- src/sync.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index bbea7d3..d723d4c 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -146,7 +146,7 @@ impl Generator { where T: Snowflake, { - self.internal.generate(&std::hint::spin_loop) + self.internal.generate(std::hint::spin_loop) } } @@ -223,7 +223,7 @@ where self.epoch } - fn generate(&self, tick_wait: &F) -> S + fn generate(&self, tick_wait: F) -> S where S: Snowflake, F: Fn(), @@ -255,7 +255,7 @@ where cmp::Ordering::Equal => { let sequence = components.take_sequence(); if sequence == 0 { - now = Self::wait_until_next_millisecond(&self.epoch, now, tick_wait); + now = Self::wait_until_next_millisecond(&self.epoch, now, &tick_wait); } components.set_timestamp(now); id = Some(S::from_parts(now, instance, sequence)); From f025708f4edd565c48a8568db0ce6b71e5a39ce5 Mon Sep 17 00:00:00 2001 From: qqwy Date: Sat, 24 Feb 2024 12:10:49 +0100 Subject: [PATCH 5/9] Properly reset_sequence such that the whole range can be re-used --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 20daeb3..b6329f2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -425,7 +425,7 @@ impl Components { } pub(crate) fn reset_sequence(&mut self) { - self.set_sequence(4095) + self.set_sequence(1) } pub(crate) fn set_timestamp(&mut self, ts: u64) { From d1ffe6024a3ea0dc3c711862ab199768db10aaae Mon Sep 17 00:00:00 2001 From: qqwy Date: Sat, 24 Feb 2024 12:27:32 +0100 Subject: [PATCH 6/9] Ensure the sequence is initialized at state '1' properly. --- src/lib.rs | 3 ++- src/sync.rs | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index b6329f2..bb1e35e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -400,7 +400,8 @@ pub(crate) struct Components(u64); impl Components { #[inline] pub(crate) const fn new(instance: u64) -> Self { - Self((instance << 12) & BITMASK_INSTANCE) + // Fill in the given instance, and set the sequence at '1' + Self((instance << 12) & BITMASK_INSTANCE | 1) } #[inline] diff --git a/src/sync.rs b/src/sync.rs index d723d4c..fe91bf4 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -254,6 +254,7 @@ where } cmp::Ordering::Equal => { let sequence = components.take_sequence(); + dbg!((now, instance, sequence)); if sequence == 0 { now = Self::wait_until_next_millisecond(&self.epoch, now, &tick_wait); } @@ -271,6 +272,7 @@ where F: Fn(), { loop { + println!("Waiting until the next millisecond from {}", last_millisecond); let now = epoch.elapsed().as_millis() as u64; if now > last_millisecond { return now; From f61231bcc8760902d278af92e539c70706243731 Mon Sep 17 00:00:00 2001 From: qqwy Date: Sat, 24 Feb 2024 17:20:04 +0100 Subject: [PATCH 7/9] Re-run cargo fmt and fix `lib::test_components` test --- src/lib.rs | 2 +- src/sync.rs | 24 +++++++++++++----------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bb1e35e..bd8b70a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -478,7 +478,7 @@ mod tests { #[test] fn test_components() { let mut components = Components::new(0); - assert_eq!(components.sequence(), 0); + assert_eq!(components.sequence(), 1); assert_eq!(components.timestamp(), 0); components.set_sequence(1024); diff --git a/src/sync.rs b/src/sync.rs index fe91bf4..952e287 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -272,7 +272,10 @@ where F: Fn(), { loop { - println!("Waiting until the next millisecond from {}", last_millisecond); + println!( + "Waiting until the next millisecond from {}", + last_millisecond + ); let now = epoch.elapsed().as_millis() as u64; if now > last_millisecond { return now; @@ -430,18 +433,17 @@ mod loom_tests { let generator = Arc::new(InternalGenerator::::new_unchecked(0)); let (tx, rx) = mpsc::channel(); - let threads: Vec<_> = - (0..THREADS) - .map(|_| { - let generator = generator.clone(); - let tx = tx.clone(); + let threads: Vec<_> = (0..THREADS) + .map(|_| { + let generator = generator.clone(); + let tx = tx.clone(); - thread::spawn(move || { - let id: u64 = generator.generate(panic_on_wait); - tx.send(id).unwrap(); - }) + thread::spawn(move || { + let id: u64 = generator.generate(panic_on_wait); + tx.send(id).unwrap(); }) - .collect(); + }) + .collect(); for th in threads { th.join().unwrap(); From 25e895e1efe2453d18d22ba1c60988343f056673 Mon Sep 17 00:00:00 2001 From: Qqwy / Marten Date: Sat, 24 Feb 2024 17:35:04 +0100 Subject: [PATCH 8/9] Update src/sync.rs Co-authored-by: MrGunflame --- src/sync.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sync.rs b/src/sync.rs index 952e287..c677108 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -254,7 +254,6 @@ where } cmp::Ordering::Equal => { let sequence = components.take_sequence(); - dbg!((now, instance, sequence)); if sequence == 0 { now = Self::wait_until_next_millisecond(&self.epoch, now, &tick_wait); } From 2a8441c3cb7a93357ff725c8423535d2cb311665 Mon Sep 17 00:00:00 2001 From: Qqwy / Marten Date: Sat, 24 Feb 2024 17:35:11 +0100 Subject: [PATCH 9/9] Update src/sync.rs Co-authored-by: MrGunflame --- src/sync.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index c677108..1c24101 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -271,10 +271,6 @@ where F: Fn(), { loop { - println!( - "Waiting until the next millisecond from {}", - last_millisecond - ); let now = epoch.elapsed().as_millis() as u64; if now > last_millisecond { return now;