From 1800d070bfa7333bb202efc22c03d756e3414347 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Sat, 3 Aug 2024 13:53:45 +0900 Subject: [PATCH] Triple protocol now gets spun up as a tokio::task --- .../node/src/protocol/cryptography.rs | 2 +- chain-signatures/node/src/protocol/message.rs | 2 +- chain-signatures/node/src/protocol/mod.rs | 4 +- chain-signatures/node/src/protocol/triple.rs | 152 ++++++++++++------ chain-signatures/node/src/test_utils.rs | 4 +- integration-tests/chain-signatures/Cargo.lock | 60 +------ integration-tests/chain-signatures/Cargo.toml | 4 +- 7 files changed, 117 insertions(+), 111 deletions(-) diff --git a/chain-signatures/node/src/protocol/cryptography.rs b/chain-signatures/node/src/protocol/cryptography.rs index 12f08c811..016d37c39 100644 --- a/chain-signatures/node/src/protocol/cryptography.rs +++ b/chain-signatures/node/src/protocol/cryptography.rs @@ -381,7 +381,7 @@ impl CryptographicProtocol for RunningState { if let Err(err) = triple_manager.stockpile(active, protocol_cfg) { tracing::warn!(?err, "running: failed to stockpile triples"); } - for (p, msg) in triple_manager.poke(protocol_cfg).await { + for (p, msg) in triple_manager.poke().await { let info = self.fetch_participant(&p)?; messages.push(info.clone(), MpcMessage::Triple(msg)); } diff --git a/chain-signatures/node/src/protocol/message.rs b/chain-signatures/node/src/protocol/message.rs index f402dfbe8..4eb7a17f1 100644 --- a/chain-signatures/node/src/protocol/message.rs +++ b/chain-signatures/node/src/protocol/message.rs @@ -267,7 +267,7 @@ impl MessageHandler for RunningState { if let Some(protocol) = protocol { while let Some(message) = queue.pop_front() { - protocol.message(message.from, message.data); + protocol.message(message.from, message.data).await?; } } } diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 2570ce471..29909b2b3 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ 
b/chain-signatures/node/src/protocol/mod.rs @@ -327,9 +327,7 @@ impl MpcSignProtocol { let message_time = Instant::now(); if let Err(err) = state.handle(&self, &mut queue).await { - tracing::info!("protocol unable to handle messages: {err:?}"); - tokio::time::sleep(Duration::from_millis(100)).await; - continue; + tracing::warn!("protocol unable to handle messages: {err:?}"); } crate::metrics::PROTOCOL_LATENCY_ITER_MESSAGE .with_label_values(&[my_account_id.as_str()]) diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index bdd9b1a79..d3b9ed732 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -4,10 +4,11 @@ use super::message::TripleMessage; use super::presignature::GenerationError; use crate::gcp::error; use crate::storage::triple_storage::{LockTripleNodeStorageBox, TripleData}; -use crate::types::TripleProtocol; use crate::util::AffinePointExt; -use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError}; +use cait_sith::protocol::{ + Action, InitializationError, MessageData, Participant, Protocol, ProtocolError, +}; use cait_sith::triples::{TripleGenerationOutput, TriplePub, TripleShare}; use chrono::Utc; use highway::{HighwayHash, HighwayHasher}; @@ -38,28 +39,84 @@ pub struct Triple { pub struct TripleGenerator { pub id: TripleId, pub participants: Vec, - pub protocol: TripleProtocol, + pub threshold: usize, pub timestamp: Option, pub timeout: Duration, + + /// Join handle for spawned task that runs the protocol. + join_handle: tokio::task::JoinHandle>, + /// Message sender for when the node receives a message and needs to forward it to the protocol task. + message_tx: tokio::sync::mpsc::Sender<(Participant, MessageData)>, + /// Message receiver for when the protocol needs to send a message and we should get it back + /// on the main runtime thread for it to be sent to other nodes. 
+ protocol_rx: tokio::sync::mpsc::Receiver>>, } impl TripleGenerator { pub fn new( id: TripleId, + me: Participant, participants: Vec, - protocol: TripleProtocol, + threshold: usize, timeout: u64, - ) -> Self { - Self { + ) -> Result { + let (message_tx, mut message_rx) = tokio::sync::mpsc::channel(2048); + let (protocol_tx, protocol_rx) = tokio::sync::mpsc::channel(2048); + + let mut protocol = + cait_sith::triples::generate_triple::(&participants, me, threshold)?; + + let join_handle = tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_millis(500)); + + loop { + tokio::select! { + Some((from, data)) = message_rx.recv() => { + protocol.message(from, data); + } + _ = interval.tick() => { + loop { + let action = protocol.poke()?; + match &action { + Action::Wait => { + protocol_tx.send(action).await.map_err(|err| { + ProtocolError::Other(Box::new(err)) + })?; + break; + } + Action::Return(_) => { + protocol_tx.send(action).await.map_err(|err| { + ProtocolError::Other(Box::new(err)) + })?; + return Ok(()); + } + Action::SendMany(_) | Action::SendPrivate(_, _) => { + protocol_tx.send(action).await.map_err(|err| { + ProtocolError::Other(Box::new(err)) + })?; + } + } + } + } + } + } + }); + + Ok(Self { id, participants, - protocol, + threshold, timestamp: None, timeout: Duration::from_millis(timeout), - } + join_handle, + message_tx, + protocol_rx, + }) } - pub fn poke(&mut self) -> Result>, ProtocolError> { + pub async fn poke( + &mut self, + ) -> Result>, ProtocolError> { let timestamp = self.timestamp.get_or_insert_with(Instant::now); if timestamp.elapsed() > self.timeout { tracing::info!( @@ -67,12 +124,26 @@ impl TripleGenerator { elapsed = ?timestamp.elapsed(), "triple protocol timed out" ); + self.join_handle.abort(); return Err(ProtocolError::Other( anyhow::anyhow!("triple protocol timed out").into(), )); } - self.protocol.poke() + self.protocol_rx.recv().await.ok_or_else(|| { + ProtocolError::Other(anyhow::anyhow!("action sender 
has been dropped").into()) + }) + } + + pub async fn message( + &mut self, + from: Participant, + data: MessageData, + ) -> Result<(), ProtocolError> { + self.message_tx + .send((from, data)) + .await + .map_err(|err| ProtocolError::Other(Box::new(err))) } } @@ -222,14 +293,9 @@ impl TripleManager { tracing::debug!(id, "starting protocol to generate a new triple"); let participants: Vec<_> = participants.keys().cloned().collect(); - let protocol: TripleProtocol = Box::new(cait_sith::triples::generate_triple::( - &participants, - self.me, - self.threshold, - )?); self.generators.insert( id, - TripleGenerator::new(id, participants, protocol, timeout), + TripleGenerator::new(id, self.me, participants, self.threshold, timeout)?, ); self.queued.push_back(id); self.introduced.insert(id); @@ -394,7 +460,7 @@ impl TripleManager { id: TripleId, participants: &Participants, cfg: &ProtocolConfig, - ) -> Result, CryptographicError> { + ) -> Result, CryptographicError> { if self.triples.contains_key(&id) || self.gc.contains_key(&id) { Ok(None) } else { @@ -409,24 +475,20 @@ impl TripleManager { tracing::debug!(id, "joining protocol to generate a new triple"); let participants = participants.keys_vec(); - let protocol = Box::new(cait_sith::triples::generate_triple::( - &participants, - self.me, - self.threshold, - )?); let generator = e.insert(TripleGenerator::new( id, + self.me, participants, - protocol, + self.threshold, cfg.triple.generation_timeout, - )); + )?); self.queued.push_back(id); crate::metrics::NUM_TOTAL_HISTORICAL_TRIPLE_GENERATORS .with_label_values(&[self.my_account_id.as_str()]) .inc(); - Ok(Some(&mut generator.protocol)) + Ok(Some(generator)) } - Entry::Occupied(e) => Ok(Some(&mut e.into_mut().protocol)), + Entry::Occupied(e) => Ok(Some(e.into_mut())), } } } @@ -435,27 +497,19 @@ impl TripleManager { /// messages to be sent to the respective participant. /// /// An empty vector means we cannot progress until we receive a new message. 
- pub async fn poke(&mut self, cfg: &ProtocolConfig) -> Vec<(Participant, TripleMessage)> { - // Add more protocols to the ongoing pool if there is space. - let to_generate_len = cfg.max_concurrent_generation as usize - self.ongoing.len(); - if !self.queued.is_empty() && to_generate_len > 0 { - for _ in 0..to_generate_len { - self.queued.pop_front().map(|id| self.ongoing.insert(id)); - } - } - + pub async fn poke(&mut self) -> Vec<(Participant, TripleMessage)> { let mut messages = Vec::new(); let mut triples_to_insert = Vec::new(); let mut errors = Vec::new(); - self.generators.retain(|id, generator| { - if !self.ongoing.contains(id) { - // If the protocol is not ongoing, we should retain it for the next time - // it is in the ongoing pool. - return true; - } + + let ids = self.generators.keys().into_iter().cloned().collect::>(); + for id in &ids { + let Some((_, mut generator)) = self.generators.remove_entry(id) else { + continue; + }; loop { - let action = match generator.poke() { + let action = match generator.poke().await { Ok(action) => action, Err(e) => { errors.push(e); @@ -466,15 +520,17 @@ impl TripleManager { elapsed = ?generator.timestamp.unwrap().elapsed(), "added {id} to failed triples" ); - break false; + break; } }; match action { Action::Wait => { tracing::trace!("waiting"); - // Retain protocol until we are finished - break true; + + // protocol not done: insert back to our pool of generators. + self.generators.insert(*id, generator); + break; } Action::SendMany(data) => { for p in &generator.participants { @@ -559,12 +615,12 @@ impl TripleManager { // Protocol done, remove it from the ongoing pool. 
self.ongoing.remove(id); self.introduced.remove(id); - // Do not retain the protocol - break false; + break; } } } - }); + } + // }); self.insert_triples_to_storage(triples_to_insert).await; if !errors.is_empty() { diff --git a/chain-signatures/node/src/test_utils.rs b/chain-signatures/node/src/test_utils.rs index 4771441e9..42ffb5938 100644 --- a/chain-signatures/node/src/test_utils.rs +++ b/chain-signatures/node/src/test_utils.rs @@ -85,7 +85,7 @@ impl TestTripleManagers { async fn poke(&mut self, index: usize) -> Result { let mut quiet = true; - let messages = self.managers[index].poke(&self.config.protocol).await; + let messages = self.managers[index].poke().await; for ( participant, ref tm @ TripleMessage { @@ -101,7 +101,7 @@ impl TestTripleManagers { .get_or_generate(id, &self.participants, &self.config.protocol) .unwrap() { - protocol.message(from, data.to_vec()); + protocol.message(from, data.to_vec()).await.unwrap(); } else { println!("Tried to write to completed mailbox {:?}", tm); } diff --git a/integration-tests/chain-signatures/Cargo.lock b/integration-tests/chain-signatures/Cargo.lock index f920c7a93..395beb2cc 100644 --- a/integration-tests/chain-signatures/Cargo.lock +++ b/integration-tests/chain-signatures/Cargo.lock @@ -3753,8 +3753,8 @@ dependencies = [ "near-crypto 0.23.0", "near-fetch", "near-jsonrpc-client 0.10.1", - "near-lake-framework 0.8.0-beta.3 (git+https://github.com/near/near-lake-framework-rs?branch=node/1.40)", - "near-lake-primitives 0.8.0-beta.3 (git+https://github.com/near/near-lake-framework-rs?branch=node/1.40)", + "near-lake-framework", + "near-lake-primitives", "near-primitives 0.23.0", "near-workspaces", "once_cell", @@ -4293,8 +4293,8 @@ dependencies = [ "near-account-id", "near-crypto 0.23.0", "near-fetch", - "near-lake-framework 0.8.0-beta.3 (git+https://github.com/near/near-lake-framework-rs?branch=node/1.40-and-async-run)", - "near-lake-primitives 0.8.0-beta.3 
(git+https://github.com/near/near-lake-framework-rs?branch=node/1.40-and-async-run)", + "near-lake-framework", + "near-lake-primitives", "near-primitives 0.23.0", "near-sdk", "once_cell", @@ -4762,15 +4762,6 @@ dependencies = [ "time", ] -[[package]] -name = "near-lake-context-derive" -version = "0.8.0-beta.3" -source = "git+https://github.com/near/near-lake-framework-rs?branch=node/1.40#f1f69235a901f5fd9ff74632ed20c349411de0d7" -dependencies = [ - "quote", - "syn 2.0.66", -] - [[package]] name = "near-lake-context-derive" version = "0.8.0-beta.3" @@ -4780,29 +4771,6 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "near-lake-framework" -version = "0.8.0-beta.3" -source = "git+https://github.com/near/near-lake-framework-rs?branch=node/1.40#f1f69235a901f5fd9ff74632ed20c349411de0d7" -dependencies = [ - "async-stream", - "async-trait", - "aws-config", - "aws-credential-types", - "aws-sdk-s3", - "aws-types", - "derive_builder", - "futures", - "near-lake-context-derive 0.8.0-beta.3 (git+https://github.com/near/near-lake-framework-rs?branch=node/1.40)", - "near-lake-primitives 0.8.0-beta.3 (git+https://github.com/near/near-lake-framework-rs?branch=node/1.40)", - "serde", - "serde_json", - "thiserror", - "tokio", - "tokio-stream", - "tracing", -] - [[package]] name = "near-lake-framework" version = "0.8.0-beta.3" @@ -4816,8 +4784,8 @@ dependencies = [ "aws-types", "derive_builder", "futures", - "near-lake-context-derive 0.8.0-beta.3 (git+https://github.com/near/near-lake-framework-rs?branch=node/1.40-and-async-run)", - "near-lake-primitives 0.8.0-beta.3 (git+https://github.com/near/near-lake-framework-rs?branch=node/1.40-and-async-run)", + "near-lake-context-derive", + "near-lake-primitives", "serde", "serde_json", "thiserror", @@ -4826,22 +4794,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "near-lake-primitives" -version = "0.8.0-beta.3" -source = 
"git+https://github.com/near/near-lake-framework-rs?branch=node/1.40#f1f69235a901f5fd9ff74632ed20c349411de0d7" -dependencies = [ - "anyhow", - "near-crypto 0.23.0", - "near-indexer-primitives", - "near-primitives 0.23.0", - "near-primitives-core 0.23.0", - "paste", - "serde", - "serde_json", - "thiserror", -] - [[package]] name = "near-lake-primitives" version = "0.8.0-beta.3" diff --git a/integration-tests/chain-signatures/Cargo.toml b/integration-tests/chain-signatures/Cargo.toml index 2f638cbc0..65e6abf3a 100644 --- a/integration-tests/chain-signatures/Cargo.toml +++ b/integration-tests/chain-signatures/Cargo.toml @@ -38,8 +38,8 @@ near-crypto = "0.23.0" near-fetch = "0.5.0" near-jsonrpc-client = "0.10.1" near-primitives = "0.23.0" -near-lake-framework = { git = "https://github.com/near/near-lake-framework-rs", branch = "node/1.40" } -near-lake-primitives = { git = "https://github.com/near/near-lake-framework-rs", branch = "node/1.40" } +near-lake-framework = { git = "https://github.com/near/near-lake-framework-rs", branch = "node/1.40-and-async-run" } +near-lake-primitives = { git = "https://github.com/near/near-lake-framework-rs", branch = "node/1.40-and-async-run" } near-workspaces = { git = "https://github.com/near/near-workspaces-rs", branch = "node/1.40" } # local chain-signatures depencencies