Skip to content

Commit

Permalink
Merge pull request #242 from andrewwhitehead/feat/fast-catchup
Browse files Browse the repository at this point in the history
Faster pool refresh
  • Loading branch information
swcurran authored Dec 1, 2023
2 parents a241ca4 + 23df22c commit 9f2ac86
Show file tree
Hide file tree
Showing 23 changed files with 382 additions and 294 deletions.
14 changes: 8 additions & 6 deletions indy-vdr-proxy/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use super::AppState;
use indy_vdr::common::error::prelude::*;
use indy_vdr::ledger::identifiers::{CredentialDefinitionId, RevocationRegistryId, SchemaId};
use indy_vdr::pool::helpers::{perform_get_txn, perform_ledger_request};
use indy_vdr::pool::{LedgerType, Pool, PreparedRequest, RequestResult, TimingResult};
use indy_vdr::pool::{
LedgerType, Pool, PreparedRequest, RequestResult, RequestResultMeta, TimingResult,
};
use indy_vdr::resolver::did::DidUrl;
use indy_vdr::resolver::PoolResolver as Resolver;
use indy_vdr::utils::did::DidValue;
Expand All @@ -34,16 +36,16 @@ enum ResponseType {
Resolver(String),
}

impl<T> From<(RequestResult<T>, Option<TimingResult>)> for ResponseType
impl<T> From<(RequestResult<T>, RequestResultMeta)> for ResponseType
where
T: std::fmt::Display,
{
fn from(result: (RequestResult<T>, Option<TimingResult>)) -> ResponseType {
fn from(result: (RequestResult<T>, RequestResultMeta)) -> ResponseType {
match result {
(RequestResult::Reply(message), timing) => {
ResponseType::RequestReply(message.to_string(), timing)
(RequestResult::Reply(message), meta) => {
ResponseType::RequestReply(message.to_string(), meta.timing)
}
(RequestResult::Failed(err), timing) => ResponseType::RequestFailed(err, timing),
(RequestResult::Failed(err), meta) => ResponseType::RequestFailed(err, meta.timing),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion indy-vdr-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ async fn refresh_pool(
tokio::time::sleep(Duration::from_secs((delay_mins * 60 / n_pools) as u64)).await
}

let (txns, _timing) = perform_refresh(pool).await?;
let (txns, _meta) = perform_refresh(pool).await?;

let cloned_state = state.clone();
let pool_states = &cloned_state.borrow().pool_states;
Expand Down
2 changes: 1 addition & 1 deletion libindy_vdr/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "indy-vdr"
version = "0.4.0"
version = "0.4.1"
authors = [
"Hyperledger Indy Contributors <[email protected]>",
]
Expand Down
6 changes: 3 additions & 3 deletions libindy_vdr/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use once_cell::sync::Lazy;

use crate::pool::ProtocolVersion;

pub const DEFAULT_ACK_TIMEOUT: i64 = 20;
pub const DEFAULT_REPLY_TIMEOUT: i64 = 60;
pub const DEFAULT_ACK_TIMEOUT: i64 = 5;
pub const DEFAULT_REPLY_TIMEOUT: i64 = 30;
pub const DEFAULT_CONN_ACTIVE_TIMEOUT: i64 = 5;
pub const DEFAULT_CONN_REQUEST_LIMIT: usize = 5;
pub const DEFAULT_CONN_REQUEST_LIMIT: usize = 10;
pub const DEFAULT_REQUEST_READ_NODES: usize = 2;
pub const DEFAULT_FRESHNESS_TIMEOUT: u64 = 300;
pub const DEFAULT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::Node1_4;
Expand Down
8 changes: 4 additions & 4 deletions libindy_vdr/src/ffi/pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::common::error::prelude::*;
use crate::common::handle::ResourceHandle;
use crate::pool::{
PoolBuilder, PoolRunner, PoolTransactions, RequestMethod, RequestResult, TimingResult,
PoolBuilder, PoolRunner, PoolTransactions, RequestMethod, RequestResult, RequestResultMeta,
};

use std::collections::{btree_map::Entry, BTreeMap, HashMap};
Expand Down Expand Up @@ -107,7 +107,7 @@ pub extern "C" fn indy_vdr_pool_refresh(
pool.refresh(Box::new(
move |result| {
let errcode = match result {
Ok((old_txns, new_txns, _timing)) => {
Ok((old_txns, new_txns, _meta)) => {
if let Some(new_txns) = new_txns {
// We must spawn a new thread here because this callback
// is being run in the PoolRunner's thread, and if we drop
Expand Down Expand Up @@ -227,10 +227,10 @@ pub extern "C" fn indy_vdr_pool_get_verifiers(
}

fn handle_request_result(
result: VdrResult<(RequestResult<String>, Option<TimingResult>)>,
result: VdrResult<(RequestResult<String>, RequestResultMeta)>,
) -> (ErrorCode, String) {
match result {
Ok((reply, _timing)) => match reply {
Ok((reply, _meta)) => match reply {
RequestResult::Reply(body) => (ErrorCode::Success, body),
RequestResult::Failed(err) => {
let code = ErrorCode::from(err.kind());
Expand Down
2 changes: 1 addition & 1 deletion libindy_vdr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
//! // Create a new GET_TXN request and dispatch it
//! let ledger_type = 1; // 1 identifies the Domain ledger, see pool::LedgerType
//! let seq_no = 1; // Transaction sequence number
//! let (result, _timing) = block_on(perform_get_txn(&pool, ledger_type, seq_no)).unwrap();
//! let (result, _meta) = block_on(perform_get_txn(&pool, ledger_type, seq_no)).unwrap();
#![cfg_attr(feature = "fatal_warnings", deny(warnings))]
#![recursion_limit = "1024"] // for select! macro usage
Expand Down
4 changes: 3 additions & 1 deletion libindy_vdr/src/pool/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ impl std::fmt::Debug for PoolTransactions {

impl std::fmt::Display for PoolTransactions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let vec_json = unwrap_or_return!(self.encode_json(), Err(std::fmt::Error {}));
let Ok(vec_json) = self.encode_json() else {
return Err(std::fmt::Error {});
};
let txns = SJsonValue::from(vec_json);
write!(f, "{}", txns)
}
Expand Down
53 changes: 22 additions & 31 deletions libindy_vdr/src/pool/handlers/catchup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,58 +4,49 @@ use crate::common::error::prelude::*;
use crate::common::merkle_tree::MerkleTree;

use super::types::Message;
use super::{check_cons_proofs, PoolRequest, RequestEvent, RequestResult, TimingResult};
use super::{check_cons_proofs, PoolRequest, RequestEvent, RequestResult, RequestResultMeta};

pub async fn handle_catchup_request<R: PoolRequest>(
request: &mut R,
merkle_tree: MerkleTree,
target_mt_root: Vec<u8>,
target_mt_size: usize,
) -> VdrResult<(RequestResult<Vec<Vec<u8>>>, Option<TimingResult>)> {
) -> VdrResult<(RequestResult<Vec<Vec<u8>>>, RequestResultMeta)> {
trace!("catchup request");
let config = request.pool_config();
let ack_timeout = config.ack_timeout;
request.send_to_any(config.request_read_nodes, ack_timeout)?;
loop {
match request.next().await {
Some(RequestEvent::Received(node_alias, _message, parsed)) => {
match parsed {
Message::CatchupRep(cr) => {
match process_catchup_reply(
&merkle_tree,
&target_mt_root,
target_mt_size,
cr.load_txns()?,
cr.consProof.clone(),
) {
Ok(txns) => {
return Ok((RequestResult::Reply(txns), request.get_timing()))
}
Err(_) => {
request.clean_timeout(node_alias)?;
request.send_to_any(1, ack_timeout)?;
}
Some(RequestEvent::Received(node_alias, _message, parsed)) => match parsed {
Message::CatchupRep(cr) => {
match process_catchup_reply(
&merkle_tree,
&target_mt_root,
target_mt_size,
cr.load_txns()?,
cr.consProof.clone(),
) {
Ok(txns) => return Ok((RequestResult::Reply(txns), request.get_meta())),
Err(_) => {
request.clean_timeout(node_alias)?;
request.send_to_any(1, ack_timeout)?;
}
}
_ => {
// FIXME could be more tolerant of ReqNACK etc
return Ok((
RequestResult::Failed(err_msg(
VdrErrorKind::Connection,
"Unexpected response",
)),
request.get_timing(),
));
}
}
}
_ => {
debug!("Unexpected reply from {}", &node_alias);
request.clean_timeout(node_alias)?;
request.send_to_any(1, ack_timeout)?;
}
},
Some(RequestEvent::Timeout(_node_alias)) => {
request.send_to_any(1, ack_timeout)?;
}
None => {
return Ok((
RequestResult::Failed(VdrErrorKind::PoolTimeout.into()),
request.get_timing(),
request.get_meta(),
))
}
}
Expand Down
71 changes: 38 additions & 33 deletions libindy_vdr/src/pool/handlers/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::utils::base64;
use super::types::Message;
use super::{
min_consensus, ConsensusState, HashableValue, PoolRequest, ReplyState, RequestEvent,
RequestResult, TimingResult,
RequestResult, RequestResultMeta,
};

pub async fn handle_consensus_request<R: PoolRequest>(
Expand All @@ -21,7 +21,7 @@ pub async fn handle_consensus_request<R: PoolRequest>(
state_proof_timestamps: (Option<u64>, Option<u64>),
as_read_request: bool,
custom_state_proof_parser: Option<&BoxedSPParser>,
) -> VdrResult<(RequestResult<String>, Option<TimingResult>)> {
) -> VdrResult<(RequestResult<String>, RequestResultMeta)> {
trace!("consensus request");
let config = request.pool_config();
let node_keys = request.node_keys();
Expand Down Expand Up @@ -72,40 +72,45 @@ pub async fn handle_consensus_request<R: PoolRequest>(
.clone(),
)
};
if cnt > f
|| (request_with_state_proof
&& check_state_proof(
result,
f,
&DEFAULT_GENERATOR,
&node_keys,
&raw_msg,
state_proof_key.as_deref(),
state_proof_timestamps,
last_write_time,
config.freshness_threshold,
custom_state_proof_parser,
))
{
if state_proof_key.is_some() {
if request_with_state_proof {
let sp_result = check_state_proof(
result,
f,
&DEFAULT_GENERATOR,
&node_keys,
&raw_msg,
state_proof_key.as_deref(),
state_proof_timestamps,
last_write_time,
config.freshness_threshold,
custom_state_proof_parser,
);
let verified = sp_result.is_verified();
request.set_state_proof_result(node_alias.clone(), sp_result);
if verified {
debug!(
"State proof verification succeeded for node: {}, sp_key: '{}'",
node_alias,
base64::encode(state_proof_key.as_ref().unwrap()),
);
} else {
debug!(
"State proof verification failed for node: {}, sp_key: '{}'",
node_alias,
base64::encode(state_proof_key.as_ref().unwrap()),
);
}
return Ok((
RequestResult::Reply(if cnt > f { soonest } else { raw_msg }),
request.get_timing(),
));
} else if state_proof_key.is_some() {
debug!(
"State proof verification failed for node: {}, sp_key: '{}'",
node_alias,
base64::encode(state_proof_key.as_ref().unwrap()),
);
request.clean_timeout(node_alias)?;
true
if verified || cnt > f {
return Ok((
RequestResult::Reply(if verified { raw_msg } else { soonest }),
request.get_meta(),
));
} else {
request.clean_timeout(node_alias)?;
true
}
} else if cnt > f {
return Ok((RequestResult::Reply(soonest), request.get_meta()));
} else {
false
}
Expand All @@ -132,7 +137,7 @@ pub async fn handle_consensus_request<R: PoolRequest>(
RequestResult::Failed(
VdrErrorKind::PoolRequestFailed(raw_msg).into(),
),
request.get_timing(),
request.get_meta(),
));
}
}
Expand All @@ -156,14 +161,14 @@ pub async fn handle_consensus_request<R: PoolRequest>(
VdrErrorKind::PoolTimeout,
"Request was interrupted",
)),
request.get_timing(),
request.get_meta(),
))
}
};
let total_replies = replies.len();
if total_replies >= total_nodes_count {
let err = replies.get_error();
return Ok((RequestResult::Failed(err), request.get_timing()));
return Ok((RequestResult::Failed(err), request.get_meta()));
}
if resend {
request.send_to_any(1, config.ack_timeout)?;
Expand Down
8 changes: 4 additions & 4 deletions libindy_vdr/src/pool/handlers/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use futures_util::stream::StreamExt;

use crate::common::error::prelude::*;

use super::types::{Message, NodeReplies, RequestResult, TimingResult};
use super::types::{Message, NodeReplies, RequestResult, RequestResultMeta};
use super::{PoolRequest, ReplyState, RequestEvent};

pub async fn handle_full_request<R: PoolRequest>(
request: &mut R,
nodes_to_send: Option<Vec<String>>,
local_timeout: Option<i64>,
) -> VdrResult<(RequestResult<NodeReplies<String>>, Option<TimingResult>)> {
) -> VdrResult<(RequestResult<NodeReplies<String>>, RequestResultMeta)> {
trace!("full request");
let timeout = local_timeout.unwrap_or(request.pool_config().reply_timeout);
let req_reply_count = if let Some(nodes) = nodes_to_send {
Expand Down Expand Up @@ -46,12 +46,12 @@ pub async fn handle_full_request<R: PoolRequest>(
VdrErrorKind::PoolTimeout,
"Request was interrupted",
)),
request.get_timing(),
request.get_meta(),
))
}
};
if replies.len() == req_reply_count {
return Ok((RequestResult::Reply(replies.result()), request.get_timing()));
return Ok((RequestResult::Reply(replies.result()), request.get_meta()));
}
}
}
11 changes: 4 additions & 7 deletions libindy_vdr/src/pool/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::utils::{base58, ValidationError};
use super::requests::{PoolRequest, RequestEvent};
use super::types::{
self, CatchupReq, LedgerStatus, LedgerType, Message, NodeReplies, ProtocolVersion,
RequestResult, SingleReply, TimingResult,
RequestResult, RequestResultMeta, SingleReply,
};

mod catchup;
Expand Down Expand Up @@ -119,16 +119,13 @@ impl<K: Eq + Hash, T: Eq + Hash> ConsensusState<K, T> {
}
}

fn max_entry(&self) -> Option<(&K, usize)> {
self.inner
.iter()
.map(|(key, set)| (key, set.len()))
.max_by_key(|entry| entry.1)
fn max_entry(&self) -> Option<(&K, &HashSet<T>)> {
self.inner.iter().max_by_key(|entry| entry.1.len())
}

#[allow(dead_code)]
fn max_len(&self) -> usize {
self.max_entry().map(|entry| entry.1).unwrap_or(0)
self.max_entry().map(|entry| entry.1.len()).unwrap_or(0)
}

pub fn insert(&mut self, key: K, reply: T) -> &mut HashSet<T> {
Expand Down
Loading

0 comments on commit 9f2ac86

Please sign in to comment.