Skip to content

Commit

Permalink
Improve bitcoin-da service
Browse files Browse the repository at this point in the history
  • Loading branch information
rakanalh committed Sep 23, 2024
1 parent 4a51289 commit 8c63cba
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 114 deletions.
4 changes: 2 additions & 2 deletions bin/citrea/src/rollup/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ impl RollupBlueprint for BitcoinRollup {
// until forced transactions are implemented,
// require_wallet_check is set false for full nodes.
if require_wallet_check {
// spawn_da_queue only for sequencer and prover
Arc::clone(&service).spawn_da_queue(rx);
// run only for sequencer and prover
Arc::clone(&service).run(rx);
}
Ok(service)
}
Expand Down
2 changes: 1 addition & 1 deletion bin/citrea/tests/bitcoin_e2e/tests/prover_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl TestCase for SkipPreprovenCommitmentsTest {
.await
.unwrap(),
);
bitcoin_da_service.clone().spawn_da_queue(rx);
bitcoin_da_service.clone().run(rx);

// Generate 1 FINALIZED DA block.
da.generate(1 + FINALITY_DEPTH, None).await?;
Expand Down
224 changes: 113 additions & 111 deletions crates/bitcoin-da/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
// use std::sync::Arc;
use async_trait::async_trait;
use backoff::future::retry as retry_backoff;
use backoff::ExponentialBackoff;
Expand All @@ -26,6 +25,7 @@ use sov_rollup_interface::da::{DaData, DaDataBatchProof, DaDataLightClient, DaSp
use sov_rollup_interface::services::da::{DaService, SenderWithNotifier};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::channel as oneshot_channel;
use tokio::{select, signal};
use tracing::{debug, error, info, instrument, trace};

use crate::helpers::builders::batch_proof_namespace::{
Expand Down Expand Up @@ -53,16 +53,15 @@ use crate::spec::{BitcoinSpec, RollupParams};
use crate::verifier::BitcoinVerifier;
use crate::REVEAL_OUTPUT_AMOUNT;

/// A service that provides data and data availability proofs for Bitcoin
#[derive(Debug)]
pub struct BitcoinService {
client: Client,
network: bitcoin::Network,
da_private_key: Option<SecretKey>,
reveal_light_client_prefix: Vec<u8>,
reveal_batch_prover_prefix: Vec<u8>,
inscribes_queue: UnboundedSender<Option<SenderWithNotifier<TxidWrapper>>>,
tx_backup_dir: PathBuf,
pub const FINALITY_DEPTH: u64 = 8; // blocks
const POLLING_INTERVAL: u64 = 10; // seconds

#[derive(PartialEq, Eq, PartialOrd, Ord, core::hash::Hash)]
pub struct TxidWrapper(Txid);
impl From<TxidWrapper> for [u8; 32] {
fn from(val: TxidWrapper) -> Self {
val.0.to_byte_array()
}
}

/// Runtime configuration for the DA service
Expand All @@ -83,8 +82,17 @@ pub struct BitcoinServiceConfig {
pub tx_backup_dir: String,
}

pub const FINALITY_DEPTH: u64 = 8; // blocks
const POLLING_INTERVAL: u64 = 10; // seconds
/// A service that provides data and data availability proofs for Bitcoin
#[derive(Debug)]
pub struct BitcoinService {
client: Client,
network: bitcoin::Network,
da_private_key: Option<SecretKey>,
reveal_light_client_prefix: Vec<u8>,
reveal_batch_prover_prefix: Vec<u8>,
inscribes_queue: UnboundedSender<Option<SenderWithNotifier<TxidWrapper>>>,
tx_backup_dir: PathBuf,
}

impl BitcoinService {
// Create a new instance of the DA service from the given configuration.
Expand Down Expand Up @@ -168,88 +176,91 @@ impl BitcoinService {
})
}

pub fn spawn_da_queue(
pub fn run(
self: Arc<Self>,
mut rx: UnboundedReceiver<Option<SenderWithNotifier<TxidWrapper>>>,
) {
// This is a queue of inscribe requests
tokio::task::spawn_blocking(|| {
tokio::runtime::Handle::current().block_on(async move {
let mut prev_utxo = match self.get_prev_utxo().await {
Ok(Some(prev_utxo)) => Some(prev_utxo),
Ok(None) => {
info!("No pending transactions found");
None
}
Err(e) => {
error!(?e, "Failed to get pending transactions");
None
}
};
tokio::spawn(async move {
let mut prev_utxo = match self.get_prev_utxo().await {
Ok(Some(prev_utxo)) => Some(prev_utxo),
Ok(None) => {
info!("No pending transactions found");
None
}
Err(e) => {
error!(?e, "Failed to get pending transactions");
None
}
};

trace!("BitcoinDA queue is initialized. Waiting for the first request...");

// We execute commit and reveal txs one by one to chain them
while let Some(request_opt) = rx.recv().await {
match request_opt {
Some(request) => {
trace!("A new request is received");
let prev = prev_utxo.take();
loop {
// Build and send tx with retries:
let fee_sat_per_vbyte = match self.get_fee_rate().await {
Ok(rate) => rate,
Err(e) => {
error!(?e, "Failed to call get_fee_rate. Retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
match self
.send_transaction_with_fee_rate(
prev.clone(),
request.da_data.clone(),
fee_sat_per_vbyte,
)
.await
{
Ok(tx) => {
let tx_id = TxidWrapper(tx.id);
info!(%tx.id, "Sent tx to BitcoinDA");
prev_utxo = Some(UTXO {
tx_id: tx.id,
vout: 0,
script_pubkey: tx.tx.output[0]
.script_pubkey
.to_hex_string(),
address: None,
amount: tx.tx.output[0].value.to_sat(),
confirmations: 0,
spendable: true,
solvable: true,
});

let _ = request.notify.send(Ok(tx_id));
}
Err(e) => {
error!(?e, "Failed to send transaction to DA layer");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
trace!("BitcoinDA queue is initialized. Waiting for the first request...");

loop {
select! {
request_opt = rx.recv() => {
if let Some(request_opt) = request_opt {
match request_opt {
Some(request) => {
trace!("A new request is received");
let prev = prev_utxo.take();
loop {
// Build and send tx with retries:
let fee_sat_per_vbyte = match self.get_fee_rate().await {
Ok(rate) => rate,
Err(e) => {
error!(?e, "Failed to call get_fee_rate. Retrying...");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
match self
.send_transaction_with_fee_rate(
prev.clone(),
request.da_data.clone(),
fee_sat_per_vbyte,
)
.await
{
Ok(tx) => {
let tx_id = TxidWrapper(tx.id);
info!(%tx.id, "Sent tx to BitcoinDA");
prev_utxo = Some(UTXO {
tx_id: tx.id,
vout: 0,
script_pubkey: tx.tx.output[0].script_pubkey.to_hex_string(),
address: None,
amount: tx.tx.output[0].value.to_sat(),
confirmations: 0,
spendable: true,
solvable: true,
});

let _ = request.notify.send(Ok(tx_id));
}
Err(e) => {
error!(?e, "Failed to send transaction to DA layer");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
break;
}
}
break;
}
}

None => {
info!("Shutdown signal received. Stopping BitcoinDA queue.");
break;
None => {
info!("Shutdown signal received. Stopping BitcoinDA queue.");
break;
}
}
}
},
_ = signal::ctrl_c() => {
return;
}
}
}

error!("BitcoinDA queue stopped");
});
error!("BitcoinDA queue stopped");
});
}

Expand Down Expand Up @@ -541,30 +552,6 @@ impl BitcoinService {
}
}

#[derive(PartialEq, Eq, PartialOrd, Ord, core::hash::Hash)]
pub struct TxidWrapper(Txid);
impl From<TxidWrapper> for [u8; 32] {
fn from(val: TxidWrapper) -> Self {
val.0.to_byte_array()
}
}

fn calculate_witness_root(txdata: &[TransactionWrapper]) -> [u8; 32] {
let hashes = txdata
.iter()
.enumerate()
.map(|(i, t)| {
if i == 0 {
// Replace the first hash with zeroes.
Wtxid::all_zeros().to_raw_hash().to_byte_array()
} else {
t.compute_wtxid().to_raw_hash().to_byte_array()
}
})
.collect();
BitcoinMerkleTree::new(hashes).root()
}

#[async_trait]
impl DaService for BitcoinService {
type Spec = BitcoinSpec;
Expand Down Expand Up @@ -977,6 +964,22 @@ pub fn get_relevant_blobs_from_txs(
relevant_txs
}

fn calculate_witness_root(txdata: &[TransactionWrapper]) -> [u8; 32] {
let hashes = txdata
.iter()
.enumerate()
.map(|(i, t)| {
if i == 0 {
// Replace the first hash with zeroes.
Wtxid::all_zeros().to_raw_hash().to_byte_array()
} else {
t.compute_wtxid().to_raw_hash().to_byte_array()
}
})
.collect();
BitcoinMerkleTree::new(hashes).root()
}

#[cfg(test)]
mod tests {
use core::str::FromStr;
Expand Down Expand Up @@ -1043,7 +1046,6 @@ mod tests {
.expect("Error initialazing BitcoinService");

let da_service = Arc::new(da_service);
// da_service.clone().spawn_da_queue(_rx);
#[allow(clippy::let_and_return)]
da_service
}
Expand Down Expand Up @@ -1075,7 +1077,7 @@ mod tests {

let da_service = Arc::new(da_service);

da_service.clone().spawn_da_queue(rx);
da_service.clone().run(rx);

da_service
}
Expand Down Expand Up @@ -1107,7 +1109,7 @@ mod tests {

let da_service = Arc::new(da_service);

da_service.clone().spawn_da_queue(rx);
da_service.clone().run(rx);

da_service
}
Expand Down

0 comments on commit 8c63cba

Please sign in to comment.