Skip to content

Commit

Permalink
feat(main_loop): Add scheduled task to upgrade tx-proofs in mempool
Browse files Browse the repository at this point in the history
Add a scheduled task that upgrades transaction-proofs for transactions
in the mempool. Only runs on machines that are deemed powerful enough to
generate SingleProofs. Proof collections are upgraded to two single proofs
and single proofs are merged into one single proof.

Cf. 206.
  • Loading branch information
Sword-Smith committed Oct 21, 2024
1 parent 9dd6a14 commit 701b130
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 19 deletions.
137 changes: 119 additions & 18 deletions src/main_loop.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod proof_upgrader;

use std::collections::HashMap;
use std::net::SocketAddr;
use std::thread::sleep;
Expand All @@ -6,6 +8,7 @@ use std::time::SystemTime;

use anyhow::Result;
use itertools::Itertools;
use proof_upgrader::get_transaction_upgrade_task;
use rand::prelude::IteratorRandom;
use rand::prelude::SliceRandom;
use rand::thread_rng;
Expand Down Expand Up @@ -39,6 +42,8 @@ use crate::models::peer::transaction_notification::TransactionNotification;
use crate::models::peer::HandshakeData;
use crate::models::peer::PeerInfo;
use crate::models::peer::PeerSynchronizationState;
use crate::models::state::tx_proving_capability::TxProvingCapability;
use crate::models::state::GlobalState;
use crate::models::state::GlobalStateLock;
use crate::prelude::twenty_first;

Expand All @@ -48,6 +53,11 @@ const MEMPOOL_PRUNE_INTERVAL_IN_SECS: u64 = 30 * 60; // 30mins
const MP_RESYNC_INTERVAL_IN_SECS: u64 = 59;
const EXPECTED_UTXOS_PRUNE_INTERVAL_IN_SECS: u64 = 19 * 60; // 19 mins

/// Interval for when transaction-upgrade checker is run. Note that this does
/// *not* define how often a transaction-proof upgrade is actually performed.
/// Only how often we check if we're ready to perform an upgrade.
const TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS: u64 = 60; // 1 minute

const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40;
const POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS: usize = 20;
const STANDARD_BATCH_BLOCK_LOOKBEHIND_SIZE: usize = 100;
Expand All @@ -61,24 +71,6 @@ pub struct MainLoopHandler {
main_to_miner_tx: watch::Sender<MainToMiner>,
}

impl MainLoopHandler {
pub(crate) fn new(
incoming_peer_listener: TcpListener,
global_state_lock: GlobalStateLock,
main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
main_to_miner_tx: watch::Sender<MainToMiner>,
) -> Self {
Self {
incoming_peer_listener,
global_state_lock,
main_to_miner_tx,
main_to_peer_broadcast_tx,
peer_task_to_main_tx,
}
}
}

/// The mutable part of the main loop function
struct MutableMainLoopState {
sync_state: SyncState,
Expand Down Expand Up @@ -310,6 +302,22 @@ fn stay_in_sync_mode(
}

impl MainLoopHandler {
pub(crate) fn new(
incoming_peer_listener: TcpListener,
global_state_lock: GlobalStateLock,
main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
main_to_miner_tx: watch::Sender<MainToMiner>,
) -> Self {
Self {
incoming_peer_listener,
global_state_lock,
main_to_miner_tx,
main_to_peer_broadcast_tx,
peer_task_to_main_tx,
}
}

/// Locking:
/// * acquires `global_state_lock` for write
async fn handle_miner_task_message(&mut self, msg: MinerToMain) -> Result<()> {
Expand Down Expand Up @@ -815,6 +823,84 @@ impl MainLoopHandler {
Ok(())
}

/// Scheduled task for upgrading the proofs of transactions in the mempool.
///
/// Will either perform a merge of two transactions supported with single
/// proofs, or will upgrade a transaction proof of the type
/// `ProofCollection` to `SingleProof`.
pub(crate) async fn proof_upgrader(&mut self) -> Result<()> {
fn attempt_upgrade(
global_state_lock: &GlobalState,
now: SystemTime,
tx_upgrade_interval: Option<Duration>,
) -> Result<bool> {
let duration_since_last_upgrade =
now.duration_since(global_state_lock.net.last_tx_proof_upgrade)?;
Ok(!global_state_lock.net.syncing
&& global_state_lock.net.tx_proving_capability == TxProvingCapability::SingleProof
&& tx_upgrade_interval
.is_some_and(|upgrade_interval| duration_since_last_upgrade > upgrade_interval))
}

// Check if it's time to run the proof-upgrader, and if we're capable
// of upgrading a transaction proof.
let tx_upgrade_interval = self.global_state_lock.cli().tx_upgrade_interval();

let new_tx = {
let global_state = self.global_state_lock.lock_guard().await;
let now = SystemTime::now();
if !attempt_upgrade(&global_state, now, tx_upgrade_interval)? {
return Ok(());
}

debug!("Attempting to run transaction-proof-upgrade");

// Find a candidate for proof upgrade
let Some(upgrade_candidate) = get_transaction_upgrade_task(&global_state) else {
debug!("Found no transaction-proof to upgrade");
return Ok(());
};

let affected_txids = upgrade_candidate.affected_txids();
info!(
"Attempting to upgrade transaction proofs of: {}",
affected_txids.iter().join("; ")
);

// Perform the upgrade (expensive)
upgrade_candidate.upgrade().await
};

// Insert the upgraded transactions into the mempool
{
let mut global_state = self.global_state_lock.lock_guard_mut().await;
// Did we receive a new block while proving? If so, throw away the
// result, as it is wasted (it would need an update).

if new_tx.kernel.mutator_set_hash
!= global_state
.chain
.light_state()
.body()
.mutator_set_accumulator
.hash()
{
warn!("Got new block while proving. Discarding result.");
return Ok(());
}

global_state.mempool.insert(&new_tx);
}

// Inform all peers about our hard work
self.main_to_peer_broadcast_tx
.send(MainToPeerTask::TransactionNotification(
(&new_tx).try_into().unwrap(),
))?;

Ok(())
}

pub async fn run(
&mut self,
mut peer_task_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
Expand Down Expand Up @@ -851,6 +937,12 @@ impl MainLoopHandler {
let mp_resync_timer = time::sleep(mp_resync_timer_interval);
tokio::pin!(mp_resync_timer);

// Set transasction-proof-upgrade-checker to run every R secnods.
let tx_proof_upgrade_interval =
Duration::from_secs(TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS);
let tx_proof_upgrade_timer = time::sleep(tx_proof_upgrade_interval);
tokio::pin!(tx_proof_upgrade_timer);

// Spawn tasks to monitor for SIGTERM, SIGINT, and SIGQUIT. These
// signals are only used on Unix systems.
let (_tx_term, mut rx_term): (mpsc::Sender<()>, mpsc::Receiver<()>) =
Expand Down Expand Up @@ -1027,6 +1119,15 @@ impl MainLoopHandler {

mp_resync_timer.as_mut().reset(tokio::time::Instant::now() + mp_resync_timer_interval);
}

// Check if it's time to run the proof upgrader
_ = &mut tx_proof_upgrade_timer => {
trace!("Timer: tx-proof-upgrader");
self.proof_upgrader().await?;

tx_proof_upgrade_timer.as_mut().reset(tokio::time::Instant::now() + tx_proof_upgrade_interval);
}

}
}

Expand Down
123 changes: 123 additions & 0 deletions src/main_loop/proof_upgrader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use rand::rngs::StdRng;
use rand::Rng;
use rand::SeedableRng;
use tasm_lib::triton_vm::proof::Proof;
use tracing::error;
use tracing::info;

use crate::models::blockchain::transaction::transaction_kernel::TransactionKernel;
use crate::models::blockchain::transaction::validity::proof_collection::ProofCollection;
use crate::models::blockchain::transaction::validity::single_proof::SingleProof;
use crate::models::blockchain::transaction::validity::single_proof::SingleProofWitness;
use crate::models::blockchain::transaction::Transaction;
use crate::models::blockchain::transaction::TransactionProof;
use crate::models::proof_abstractions::tasm::program::ConsensusProgram;
use crate::models::proof_abstractions::SecretWitness;
use crate::models::state::transaction_kernel_id::TransactionKernelId;
use crate::models::state::GlobalState;

pub(super) enum UpgradeDecision {
ProduceSingleProof {
kernel: TransactionKernel,
proof: ProofCollection,
},
Merge {
left_kernel: TransactionKernel,
single_proof_left: Proof,
right_kernel: TransactionKernel,
single_proof_right: Proof,
shuffle_seed: [u8; 32],
},
}

impl UpgradeDecision {
/// Retrun a list of the transaction IDs that will have their proofs
/// upgraded with this decision.
pub(super) fn affected_txids(&self) -> Vec<TransactionKernelId> {
match self {
UpgradeDecision::ProduceSingleProof { kernel, proof: _ } => vec![kernel.txid()],
UpgradeDecision::Merge {
left_kernel,
single_proof_left: _,
right_kernel,
single_proof_right: _,
shuffle_seed: _,
} => vec![left_kernel.txid(), right_kernel.txid()],
}
}

pub(super) async fn upgrade(&self) -> Transaction {
match self {
UpgradeDecision::ProduceSingleProof { kernel, proof } => {
let single_proof_witness = SingleProofWitness::from_collection(proof.to_owned());
let claim = single_proof_witness.claim();
let nondeterminism = single_proof_witness.nondeterminism();
info!("Proof-upgrader: Start generate single proof");
let single_proof = SingleProof.prove(&claim, nondeterminism).await;
info!("Proof-upgrader: Done");

Transaction {
kernel: kernel.to_owned(),
proof: TransactionProof::SingleProof(single_proof),
}
}
UpgradeDecision::Merge {
left_kernel,
single_proof_left,
right_kernel,
single_proof_right,
shuffle_seed,
} => {
let left = Transaction {
kernel: left_kernel.to_owned(),
proof: TransactionProof::SingleProof(single_proof_left.to_owned()),
};
let right = Transaction {
kernel: right_kernel.to_owned(),
proof: TransactionProof::SingleProof(single_proof_right.to_owned()),
};
info!("Proof-upgrader: Start merging");
let ret = Transaction::merge_with(left, right, shuffle_seed.to_owned()).await;
info!("Proof-upgrader: Done");
ret
}
}
}
}

/// Return an [UpgradeTask] that describes work that can be done to upgrade the
/// proof-quality of a transaction.
pub(super) fn get_transaction_upgrade_task(global_state: &GlobalState) -> Option<UpgradeDecision> {
// Do we have any `ProofCollection`s?
if let Some((kernel, proof)) = global_state.mempool.most_dense_proof_collection() {
let upgrade_decision = UpgradeDecision::ProduceSingleProof {
kernel: kernel.to_owned(),
proof: proof.to_owned(),
};

return Some(upgrade_decision);
}

// Can we merge two single proofs?
if let Some([(left_kernel, left_single_proof), (right_kernel, right_single_proof)]) =
global_state.mempool.most_dense_single_proof_pair()
{
let mut rng: StdRng = SeedableRng::from_seed(global_state.shuffle_seed());
let upgrade_decision = UpgradeDecision::Merge {
left_kernel: left_kernel.to_owned(),
single_proof_left: left_single_proof.to_owned(),
right_kernel: right_kernel.to_owned(),
single_proof_right: right_single_proof.to_owned(),
shuffle_seed: rng.gen(),
};

if left_kernel.mutator_set_hash != right_kernel.mutator_set_hash {
error!("Deprecated transaction found in mempool. Has SingleProof in need of updating. Consider clearing mempool.");
return None;
}

return Some(upgrade_decision);
}

None
}
2 changes: 1 addition & 1 deletion src/models/state/tx_proving_capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::str::FromStr;
use clap::error::ErrorKind;
use clap::Parser;

#[derive(Parser, Debug, Clone, Copy)]
#[derive(Parser, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxProvingCapability {
LockScript,
ProofCollection,
Expand Down

0 comments on commit 701b130

Please sign in to comment.