Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Background block fetch #495

Merged
merged 7 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 0 additions & 65 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ lazy_static = "1.4.0"
log = "0.4.14"
mime = "0.3.16"
mime_guess = "2.0.4"
rayon = "1.5.1"
redb = "0.8.0"
regex = "1.6.0"
reqwest = { version = "0.11.10", features = ["blocking"] }
Expand Down
43 changes: 11 additions & 32 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use {
bitcoincore_rpc::{json::GetBlockHeaderResult, Auth, Client},
indicatif::{ProgressBar, ProgressStyle},
log::log_enabled,
rayon::iter::{IntoParallelRefIterator, ParallelIterator},
redb::{
Database, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, Table,
TableDefinition, WriteStrategy, WriteTransaction,
Expand Down Expand Up @@ -46,13 +45,15 @@ fn encode_satpoint(satpoint: SatPoint) -> [u8; 44] {
}

pub(crate) struct Index {
auth: Auth,
client: Client,
database: Database,
database_path: PathBuf,
genesis_block_coinbase_transaction: Transaction,
genesis_block_coinbase_txid: Txid,
height_limit: Option<u64>,
reorged: AtomicBool,
genesis_block_coinbase_txid: Txid,
genesis_block_coinbase_transaction: Transaction,
rpc_url: String,
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -114,8 +115,9 @@ impl Index {
cookie_file.display()
);

let client = Client::new(&rpc_url, Auth::CookieFile(cookie_file))
.context("failed to connect to RPC URL")?;
let auth = Auth::CookieFile(cookie_file);

let client = Client::new(&rpc_url, auth.clone()).context("failed to connect to RPC URL")?;

let data_dir = options.data_dir()?;

Expand Down Expand Up @@ -161,13 +163,15 @@ impl Index {
options.chain.genesis_block().coinbase().unwrap().clone();

Ok(Self {
genesis_block_coinbase_txid: genesis_block_coinbase_transaction.txid(),
auth,
client,
database,
database_path,
genesis_block_coinbase_transaction,
height_limit: options.height_limit,
reorged: AtomicBool::new(false),
genesis_block_coinbase_txid: genesis_block_coinbase_transaction.txid(),
genesis_block_coinbase_transaction,
rpc_url,
})
}

Expand Down Expand Up @@ -327,31 +331,6 @@ impl Index {
)
}

pub(crate) fn block_with_retries(&self, height: u64) -> Result<Option<Block>> {
let mut errors = 0;
loop {
match self.block(height) {
Err(err) => {
if cfg!(test) {
return Err(err);
}

errors += 1;
let seconds = 1 << errors;
log::error!("failed to fetch block {height}, retrying in {seconds}s: {err}");

if seconds > 120 {
log::error!("would sleep for more than 120s, giving up");
return Err(err);
}

thread::sleep(Duration::from_secs(seconds));
}
Ok(result) => return Ok(result),
}
}
}

pub(crate) fn block_header(&self, hash: BlockHash) -> Result<Option<BlockHeader>> {
self.client.get_block_header(&hash).into_option()
}
Expand Down
134 changes: 101 additions & 33 deletions src/index/updater.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::*;
use {super::*, std::sync::mpsc};

pub struct Updater {
cache: HashMap<[u8; 36], Vec<u8>>,
Expand Down Expand Up @@ -47,35 +47,43 @@ impl Updater {
Some(progress_bar)
};

let rx = Self::fetch_blocks_from(
index,
wtx
.open_table(super::HEIGHT_TO_BLOCK_HASH)?
.range(0..)?
.rev()
.next()
.map(|(height, _hash)| height + 1)
.unwrap_or(0),
)?;

let mut uncomitted = 0;
for i in 0.. {
if let Some(height_limit) = index.height_limit {
if self.height > height_limit {
break;
}
}
let block = match rx.recv() {
Ok(block) => block,
Err(mpsc::RecvError) => break,
};

let done = self.index_block(index, &mut wtx)?;
self.index_block(index, &mut wtx, block)?;

if !done {
if let Some(progress_bar) = &mut progress_bar {
progress_bar.inc(1);
if let Some(progress_bar) = &mut progress_bar {
progress_bar.inc(1);

if progress_bar.position() > progress_bar.length().unwrap() {
progress_bar.set_length(index.client.get_block_count()?);
}
if progress_bar.position() > progress_bar.length().unwrap() {
progress_bar.set_length(index.client.get_block_count()?);
}

uncomitted += 1;
}

if uncomitted > 0 && i % 5000 == 0 {
uncomitted += 1;

if i % 5000 == 0 {
self.commit(wtx)?;
wtx = index.begin_write()?;
uncomitted = 0;
}

if done || INTERRUPTS.load(atomic::Ordering::Relaxed) > 0 {
if INTERRUPTS.load(atomic::Ordering::Relaxed) > 0 {
break;
}
}
Expand All @@ -91,7 +99,76 @@ impl Updater {
Ok(())
}

pub(crate) fn index_block(&mut self, index: &Index, wtx: &mut WriteTransaction) -> Result<bool> {
fn fetch_blocks_from(index: &Index, mut height: u64) -> Result<mpsc::Receiver<Block>> {
let (tx, rx) = mpsc::sync_channel(32);

let height_limit = index.height_limit;

let client =
Client::new(&index.rpc_url, index.auth.clone()).context("failed to connect to RPC URL")?;

thread::spawn(move || loop {
if let Some(height_limit) = height_limit {
if height > height_limit {
break;
}
}

match Self::get_block_with_retries(&client, height) {
Ok(Some(block)) => {
if let Err(err) = tx.send(block) {
log::info!("Block receiver disconnected: {err}");
break;
}
height += 1;
}
Ok(None) => break,
Err(err) => {
log::error!("Failed to fetch block {height}: {err}");
break;
}
}
});

Ok(rx)
}

pub(crate) fn get_block_with_retries(client: &Client, height: u64) -> Result<Option<Block>> {
let mut errors = 0;
loop {
match client
.get_block_hash(height)
.into_option()?
.map(|hash| client.get_block(&hash))
.transpose()
{
Err(err) => {
if cfg!(test) {
return Err(err.into());
}

errors += 1;
let seconds = 1 << errors;
log::error!("failed to fetch block {height}, retrying in {seconds}s: {err}");

if seconds > 120 {
log::error!("would sleep for more than 120s, giving up");
return Err(err.into());
}

thread::sleep(Duration::from_secs(seconds));
}
Ok(result) => return Ok(result),
}
}
}

pub(crate) fn index_block(
&mut self,
index: &Index,
wtx: &mut WriteTransaction,
block: Block,
) -> Result<()> {
let mut height_to_block_hash = wtx.open_table(HEIGHT_TO_BLOCK_HASH)?;
let mut ordinal_to_satpoint = wtx.open_table(ORDINAL_TO_SATPOINT)?;
let mut outpoint_to_ordinal_ranges = wtx.open_table(OUTPOINT_TO_ORDINAL_RANGES)?;
Expand All @@ -100,11 +177,6 @@ impl Updater {
let mut ordinal_ranges_written = 0;
let mut outputs_in_block = 0;

let block = match index.block_with_retries(self.height)? {
Some(block) => block,
None => return Ok(true),
};

let time = Utc.timestamp(block.header.time as i64, 0);

log::info!(
Expand All @@ -131,13 +203,9 @@ impl Updater {
coinbase_inputs.push_front((start.n(), (start + h.subsidy()).n()));
}

let txdata = block
.txdata
.par_iter()
.map(|tx| (tx.txid(), tx))
.collect::<Vec<(Txid, &Transaction)>>();
for (tx_offset, tx) in block.txdata.iter().enumerate().skip(1) {
let txid = tx.txid();

for (tx_offset, (txid, tx)) in txdata.iter().enumerate().skip(1) {
log::trace!("Indexing transaction {tx_offset}…");

let mut input_ordinal_ranges = VecDeque::new();
Expand All @@ -163,7 +231,7 @@ impl Updater {
}

self.index_transaction(
*txid,
txid,
tx,
&mut ordinal_to_satpoint,
&mut input_ordinal_ranges,
Expand All @@ -174,9 +242,9 @@ impl Updater {
coinbase_inputs.extend(input_ordinal_ranges);
}

if let Some((txid, tx)) = txdata.first() {
if let Some(tx) = block.coinbase() {
self.index_transaction(
*txid,
tx.txid(),
tx,
&mut ordinal_to_satpoint,
&mut coinbase_inputs,
Expand All @@ -195,7 +263,7 @@ impl Updater {
(Instant::now() - start).as_millis(),
);

Ok(false)
Ok(())
}

pub(crate) fn index_transaction(
Expand Down