Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: signature request order and retrying #779

Merged
merged 6 commits into from
Jul 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain-signatures/contract/src/config/impls.rs
Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@ impl Default for SignatureConfig {
fn default() -> Self {
Self {
generation_timeout: secs_to_ms(45),
generation_timeout_total: secs_to_ms(200),
garbage_timeout: hours_to_ms(24),

other: Default::default(),
5 changes: 5 additions & 0 deletions chain-signatures/contract/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -83,6 +83,10 @@ pub struct PresignatureConfig {
pub struct SignatureConfig {
/// Timeout for signature generation in milliseconds.
pub generation_timeout: u64,
/// Total timeout for signature generation in milliseconds. Unlike `generation_timeout`,
/// which bounds a single generation attempt, this bounds the whole signature generation
/// process for a request, including all of its retries.
pub generation_timeout_total: u64,
/// Garbage collection timeout in milliseconds for signatures generated.
pub garbage_timeout: u64,

@@ -115,6 +119,7 @@ mod tests {
},
"signature": {
"generation_timeout": 10000,
"generation_timeout_total": 1000000,
"garbage_timeout": 10000000
},
"string": "value",
114 changes: 63 additions & 51 deletions chain-signatures/node/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@ use crate::gcp::GcpService;
use crate::kdf;
use crate::protocol::{SignQueue, SignRequest};
use crate::types::LatestBlockHeight;
use anyhow::Context as _;
use crypto_shared::{derive_epsilon, ScalarExt};
use k256::Scalar;
use near_account_id::AccountId;
@@ -150,62 +149,75 @@ async fn handle_block(
let mut pending_requests = Vec::new();
for action in block.actions().cloned().collect::<Vec<_>>() {
if action.receiver_id() == ctx.mpc_contract_id {
let receipt =
anyhow::Context::with_context(block.receipt_by_id(&action.receipt_id()), || {
format!(
"indexer unable to find block for receipt_id={}",
action.receipt_id()
)
})?;
let Some(receipt) = block.receipt_by_id(&action.receipt_id()) else {
let err = format!(
"indexer unable to find block for receipt_id={}",
action.receipt_id()
);
tracing::warn!("{err}");
anyhow::bail!(err);
};
let ExecutionStatus::SuccessReceiptId(receipt_id) = receipt.status() else {
continue;
};
if let Some(function_call) = action.as_function_call() {
if function_call.method_name() == "sign" {
if let Ok(arguments) =
serde_json::from_slice::<'_, SignArguments>(function_call.args())
{
if receipt.logs().is_empty() {
tracing::warn!("`sign` did not produce entropy");
let Some(function_call) = action.as_function_call() else {
continue;
};
if function_call.method_name() == "sign" {
let arguments =
match serde_json::from_slice::<'_, SignArguments>(function_call.args()) {
Ok(arguments) => arguments,
Err(err) => {
tracing::warn!(%err, "failed to parse `sign` arguments");
continue;
}
let payload = Scalar::from_bytes(arguments.request.payload)
.context("Payload cannot be converted to scalar, not in k256 field")?;
let Ok(entropy) = serde_json::from_str::<'_, [u8; 32]>(&receipt.logs()[1])
else {
tracing::warn!(
"`sign` did not produce entropy correctly: {:?}",
receipt.logs()[0]
);
continue;
};
let epsilon =
derive_epsilon(&action.predecessor_id(), &arguments.request.path);
let delta = kdf::derive_delta(receipt_id, entropy);
tracing::info!(
receipt_id = %receipt_id,
caller_id = receipt.predecessor_id().to_string(),
our_account = ctx.node_account_id.to_string(),
payload = hex::encode(arguments.request.payload),
key_version = arguments.request.key_version,
entropy = hex::encode(entropy),
"indexed new `sign` function call"
);
let request = ContractSignRequest {
payload,
path: arguments.request.path,
key_version: arguments.request.key_version,
};
pending_requests.push(SignRequest {
receipt_id,
request,
epsilon,
delta,
entropy,
time_added: Instant::now(),
});
}
};

if receipt.logs().is_empty() {
tracing::warn!("`sign` did not produce entropy");
continue;
}

let Some(payload) = Scalar::from_bytes(arguments.request.payload) else {
tracing::warn!(
"`sign` did not produce payload correctly: {:?}",
arguments.request.payload,
);
continue;
};

let Ok(entropy) = serde_json::from_str::<'_, [u8; 32]>(&receipt.logs()[1]) else {
tracing::warn!(
"`sign` did not produce entropy correctly: {:?}",
receipt.logs()[0]
);
continue;
};
let epsilon = derive_epsilon(&action.predecessor_id(), &arguments.request.path);
let delta = kdf::derive_delta(receipt_id, entropy);
tracing::info!(
receipt_id = %receipt_id,
caller_id = receipt.predecessor_id().to_string(),
our_account = ctx.node_account_id.to_string(),
payload = hex::encode(arguments.request.payload),
key_version = arguments.request.key_version,
entropy = hex::encode(entropy),
"indexed new `sign` function call"
);
let request = ContractSignRequest {
payload,
path: arguments.request.path,
key_version: arguments.request.key_version,
};
pending_requests.push(SignRequest {
receipt_id,
request,
epsilon,
delta,
entropy,
// TODO: use indexer timestamp instead.
time_added: Instant::now(),
});
}
}
}
6 changes: 4 additions & 2 deletions chain-signatures/node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
@@ -438,8 +438,10 @@ impl CryptographicProtocol for RunningState {
crate::metrics::SIGN_QUEUE_SIZE
.with_label_values(&[my_account_id.as_str()])
.set(sign_queue.len() as i64);
sign_queue.organize(self.threshold, &stable, ctx.me().await, &my_account_id);
let my_requests = sign_queue.my_requests(ctx.me().await);
let me = ctx.me().await;
sign_queue.organize(self.threshold, &stable, me, &my_account_id);

let my_requests = sign_queue.my_requests(me);
crate::metrics::SIGN_QUEUE_MINE_SIZE
.with_label_values(&[my_account_id.as_str()])
.set(my_requests.len() as i64);
168 changes: 114 additions & 54 deletions chain-signatures/node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
@@ -35,10 +35,43 @@ pub struct SignRequest {
pub time_added: Instant,
}

/// Queue of sign requests for a single participant that preserves insertion order.
///
/// `requests` owns the request data keyed by receipt id, while `order` records the
/// sequence in which receipt ids were first inserted so that [`Self::pop_front`]
/// hands requests back oldest-first.
#[derive(Default)]
pub struct ParticipantRequests {
    requests: HashMap<ReceiptId, SignRequest>,
    order: VecDeque<ReceiptId>,
}

impl ParticipantRequests {
    /// Inserts `request` under `receipt_id`.
    ///
    /// If the receipt id is already queued, the stored request is replaced but the
    /// id keeps its original position in the queue. Enqueuing it a second time would
    /// leave a stale id in `order`, which previously made `pop_front` report an empty
    /// queue while later requests were still pending.
    fn insert(&mut self, receipt_id: ReceiptId, request: SignRequest) {
        if self.requests.insert(receipt_id, request).is_none() {
            self.order.push_back(receipt_id);
        }
    }

    /// Returns `true` if a request with the given receipt id is currently queued.
    fn contains_key(&self, receipt_id: &ReceiptId) -> bool {
        self.requests.contains_key(receipt_id)
    }

    /// Number of requests currently queued.
    pub fn len(&self) -> usize {
        self.requests.len()
    }

    /// Returns `true` when no requests are queued.
    pub fn is_empty(&self) -> bool {
        self.requests.is_empty()
    }

    /// Removes and returns the oldest queued request together with its receipt id,
    /// or `None` when the queue is empty.
    pub fn pop_front(&mut self) -> Option<(ReceiptId, SignRequest)> {
        // Skip any id that no longer has a backing request instead of stopping at it,
        // so a stale entry can never hide the requests queued behind it.
        while let Some(receipt_id) = self.order.pop_front() {
            if let Some(request) = self.requests.remove(&receipt_id) {
                return Some((receipt_id, request));
            }
        }
        None
    }
}

#[derive(Default)]
pub struct SignQueue {
unorganized_requests: Vec<SignRequest>,
requests: HashMap<Participant, HashMap<ReceiptId, SignRequest>>,
requests: HashMap<Participant, ParticipantRequests>,
}

impl SignQueue {
@@ -116,7 +149,7 @@ impl SignQueue {
participant_requests.contains_key(&receipt_id)
}

pub fn my_requests(&mut self, me: Participant) -> &mut HashMap<ReceiptId, SignRequest> {
pub fn my_requests(&mut self, me: Participant) -> &mut ParticipantRequests {
self.requests.entry(me).or_default()
}
}
@@ -133,6 +166,7 @@ pub struct SignatureGenerator {
pub sign_request_timestamp: Instant,
pub generator_timestamp: Instant,
pub timeout: Duration,
pub timeout_total: Duration,
}

impl SignatureGenerator {
@@ -146,7 +180,7 @@ impl SignatureGenerator {
epsilon: Scalar,
delta: Scalar,
sign_request_timestamp: Instant,
timeout: u64,
cfg: &ProtocolConfig,
) -> Self {
Self {
protocol,
@@ -158,15 +192,22 @@ impl SignatureGenerator {
delta,
sign_request_timestamp,
generator_timestamp: Instant::now(),
timeout: Duration::from_millis(timeout),
timeout: Duration::from_millis(cfg.signature.generation_timeout),
timeout_total: Duration::from_millis(cfg.signature.generation_timeout_total),
}
}

pub fn poke(&mut self) -> Result<Action<FullSignature<Secp256k1>>, ProtocolError> {
if self.sign_request_timestamp.elapsed() > self.timeout_total {
return Err(ProtocolError::Other(
anyhow::anyhow!("signature protocol timeout completely").into(),
));
}

if self.generator_timestamp.elapsed() > self.timeout {
tracing::info!(self.presignature_id, "signature protocol timed out");
tracing::warn!(self.presignature_id, "signature protocol timed out");
return Err(ProtocolError::Other(
anyhow::anyhow!("signature protocol timed out").into(),
anyhow::anyhow!("signature protocol timeout").into(),
));
}

@@ -247,14 +288,15 @@ impl SignatureManager {
}

#[allow(clippy::too_many_arguments)]
#[allow(clippy::result_large_err)]
fn generate_internal(
participants: &Participants,
me: Participant,
public_key: PublicKey,
presignature: Presignature,
req: GenerationRequest,
timeout: u64,
) -> Result<SignatureGenerator, InitializationError> {
cfg: &ProtocolConfig,
) -> Result<SignatureGenerator, (Presignature, InitializationError)> {
let participants = participants.keys_vec();
let GenerationRequest {
proposer,
@@ -270,49 +312,55 @@ impl SignatureManager {
k: k * delta.invert().unwrap(),
sigma: (sigma + epsilon * k) * delta.invert().unwrap(),
};
let protocol = Box::new(cait_sith::sign(
&participants,
me,
derive_key(public_key, epsilon),
output,
request.payload,
)?);
let presignature_id = presignature.id;
let protocol = Box::new(
cait_sith::sign(
&participants,
me,
derive_key(public_key, epsilon),
output,
request.payload,
)
.map_err(|err| (presignature, err))?,
);
Ok(SignatureGenerator::new(
protocol,
participants,
proposer,
presignature.id,
presignature_id,
request,
epsilon,
delta,
sign_request_timestamp,
timeout,
cfg,
))
}

#[allow(clippy::result_large_err)]
fn retry_failed_generation(
&mut self,
receipt_id: ReceiptId,
req: GenerationRequest,
presignature: Presignature,
participants: &Participants,
timeout: u64,
) -> Result<(), InitializationError> {
cfg: &ProtocolConfig,
) -> Result<(), (Presignature, InitializationError)> {
tracing::info!(receipt_id = %receipt_id, participants = ?participants.keys_vec(), "restarting failed protocol to generate signature");
let generator = Self::generate_internal(
participants,
self.me,
self.public_key,
presignature,
req,
timeout,
cfg,
)?;
self.generators.insert(receipt_id, generator);
Ok(())
}

/// Starts a new signature generation protocol.
#[allow(clippy::too_many_arguments)]
#[allow(clippy::result_large_err)]
pub fn generate(
&mut self,
participants: &Participants,
@@ -322,8 +370,8 @@ impl SignatureManager {
epsilon: Scalar,
delta: Scalar,
sign_request_timestamp: Instant,
timeout: u64,
) -> Result<(), InitializationError> {
cfg: &ProtocolConfig,
) -> Result<(), (Presignature, InitializationError)> {
tracing::info!(
%receipt_id,
me = ?self.me,
@@ -343,7 +391,7 @@ impl SignatureManager {
delta,
sign_request_timestamp,
},
timeout,
cfg,
)?;
self.generators.insert(receipt_id, generator);
Ok(())
@@ -392,7 +440,7 @@ impl SignatureManager {
Err(err) => return Err(err),
};
tracing::info!(me = ?self.me, presignature_id, "found presignature: ready to start signature generation");
let generator = Self::generate_internal(
let generator = match Self::generate_internal(
participants,
self.me,
self.public_key,
@@ -404,8 +452,15 @@ impl SignatureManager {
delta,
sign_request_timestamp: Instant::now(),
},
cfg.signature.generation_timeout,
)?;
cfg,
) {
Ok(generator) => generator,
Err((presignature, err @ InitializationError::BadParameters(_))) => {
presignature_manager.insert_mine(presignature);
tracing::warn!(%receipt_id, presignature_id, ?err, "failed to start signature generation");
return Err(GenerationError::CaitSithInitializationError(err));
}
};
let generator = entry.insert(generator);
Ok(&mut generator.protocol)
}
@@ -424,20 +479,25 @@ impl SignatureManager {
let action = match generator.poke() {
Ok(action) => action,
Err(err) => {
tracing::warn!(?err, "signature failed to be produced; pushing request back into failed queue");
if generator.proposer == self.me {
// only retry the signature generation if it was initially proposed by us. We do not
// want any nodes to be proposing the same signature multiple times.
self.failed.push_back((
*receipt_id,
GenerationRequest {
proposer: generator.proposer,
request: generator.request.clone(),
epsilon: generator.epsilon,
delta: generator.delta,
sign_request_timestamp: generator.sign_request_timestamp
},
));
if generator.sign_request_timestamp.elapsed() < generator.timeout_total {
tracing::warn!(?err, "signature failed to be produced; pushing request back into failed queue");
// only retry the signature generation if it was initially proposed by us. We do not
// want any nodes to be proposing the same signature multiple times.
self.failed.push_back((
*receipt_id,
GenerationRequest {
proposer: generator.proposer,
request: generator.request.clone(),
epsilon: generator.epsilon,
delta: generator.delta,
sign_request_timestamp: generator.sign_request_timestamp
},
));
} else {
self.completed.insert(*receipt_id, Instant::now());
tracing::warn!(?err, "signature failed to be produced; trashing request");
}
}
break false;
}
@@ -513,7 +573,7 @@ impl SignatureManager {
&mut self,
threshold: usize,
stable: &Participants,
my_requests: &mut HashMap<ReceiptId, SignRequest>,
my_requests: &mut ParticipantRequests,
presignature_manager: &mut PresignatureManager,
cfg: &ProtocolConfig,
) {
@@ -550,14 +610,17 @@ impl SignatureManager {
// when the request made it into the NEAR network.
// issue: https://github.com/near/mpc-recovery/issues/596
if let Some((receipt_id, failed_req)) = self.failed.pop_front() {
if let Err(err) = self.retry_failed_generation(
receipt_id,
failed_req,
presignature,
&sig_participants,
cfg.signature.generation_timeout,
) {
if let Err((presignature, InitializationError::BadParameters(err))) = self
.retry_failed_generation(
receipt_id,
failed_req,
presignature,
&sig_participants,
cfg,
)
{
tracing::warn!(%receipt_id, presig_id, ?err, "failed to retry signature generation: trashing presignature");
failed_presigs.push(presignature);
continue;
}

@@ -568,24 +631,21 @@ impl SignatureManager {
}
}

let Some(receipt_id) = my_requests.keys().next().cloned() else {
let Some((receipt_id, my_request)) = my_requests.pop_front() else {
failed_presigs.push(presignature);
continue;
};
let Some(my_request) = my_requests.remove(&receipt_id) else {
failed_presigs.push(presignature);
continue;
};
if let Err(err) = self.generate(
if let Err((presignature, InitializationError::BadParameters(err))) = self.generate(
&sig_participants,
receipt_id,
presignature,
my_request.request,
my_request.epsilon,
my_request.delta,
my_request.time_added,
cfg.signature.generation_timeout,
cfg,
) {
failed_presigs.push(presignature);
tracing::warn!(%receipt_id, presig_id, ?err, "failed to start signature generation: trashing presignature");
continue;
}