Skip to content

Commit

Permalink
Make get_blocks() and get_block() yieldy
Browse files Browse the repository at this point in the history
When these methods reach out to Blockstore, yield the thread
  • Loading branch information
steveluscher committed Dec 13, 2024
1 parent 326c8f6 commit 710f9c6
Showing 1 changed file with 47 additions and 16 deletions.
63 changes: 47 additions & 16 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1182,15 +1182,19 @@ impl JsonRpcRequestProcessor {
Ok(())
}

fn check_slot_cleaned_up<T>(
async fn check_slot_cleaned_up<T>(
&self,
result: &std::result::Result<T, BlockstoreError>,
slot: Slot,
) -> Result<()> {
let first_available_block = self
.blockstore
.get_first_available_block()
.unwrap_or_default();
.runtime
.spawn_blocking({
let blockstore = Arc::clone(&self.blockstore);
move || blockstore.get_first_available_block().unwrap_or_default()
})
.await
.expect("Failed to spawn blocking task");
let err: Error = RpcCustomError::BlockCleanedUp {
slot,
first_available_block,
Expand Down Expand Up @@ -1255,7 +1259,14 @@ impl JsonRpcRequestProcessor {
.highest_super_majority_root()
{
self.check_blockstore_writes_complete(slot)?;
let result = self.blockstore.get_rooted_block(slot, true);
let result = self
.runtime
.spawn_blocking({
let blockstore = Arc::clone(&self.blockstore);
move || blockstore.get_rooted_block(slot, true)
})
.await
.expect("Failed to spawn blocking task");
self.check_blockstore_root(&result, slot)?;
let encode_block = |confirmed_block: ConfirmedBlock| -> Result<UiConfirmedBlock> {
let mut encoded_block = confirmed_block
Expand All @@ -1275,7 +1286,7 @@ impl JsonRpcRequestProcessor {
return bigtable_result.ok().map(encode_block).transpose();
}
}
self.check_slot_cleaned_up(&result, slot)?;
self.check_slot_cleaned_up(&result, slot).await?;
return result
.ok()
.map(ConfirmedBlock::from)
Expand All @@ -1286,7 +1297,14 @@ impl JsonRpcRequestProcessor {
let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed()));
if confirmed_bank.status_cache_ancestors().contains(&slot) {
self.check_blockstore_writes_complete(slot)?;
let result = self.blockstore.get_complete_block(slot, true);
let result = self
.runtime
.spawn_blocking({
let blockstore = Arc::clone(&self.blockstore);
move || blockstore.get_complete_block(slot, true)
})
.await
.expect("Failed to spawn blocking task");
return result
.ok()
.map(ConfirmedBlock::from)
Expand Down Expand Up @@ -1361,9 +1379,13 @@ impl JsonRpcRequestProcessor {
}

let lowest_blockstore_slot = self
.blockstore
.get_first_available_block()
.unwrap_or_default();
.runtime
.spawn_blocking({
let blockstore = Arc::clone(&self.blockstore);
move || blockstore.get_first_available_block().unwrap_or_default()
})
.await
.expect("Failed to spawn blocking task");
if start_slot < lowest_blockstore_slot {
// If the starting slot is lower than what's available in blockstore assume the entire
// [start_slot..end_slot] can be fetched from BigTable. This range should not ever run
Expand All @@ -1387,11 +1409,20 @@ impl JsonRpcRequestProcessor {

// Finalized blocks
let mut blocks: Vec<_> = self
.blockstore
.rooted_slot_iterator(max(start_slot, lowest_blockstore_slot))
.map_err(|_| Error::internal_error())?
.filter(|&slot| slot <= end_slot && slot <= highest_super_majority_root)
.collect();
.runtime
.spawn_blocking({
let blockstore = Arc::clone(&self.blockstore);
move || -> Result<Vec<_>> {
let blocks = blockstore
.rooted_slot_iterator(max(start_slot, lowest_blockstore_slot))
.map_err(|_| Error::internal_error())?
.filter(|&slot| slot <= end_slot && slot <= highest_super_majority_root)
.collect();
Ok(blocks)
}
})
.await
.expect("Failed to spawn blocking task")?;
let last_element = blocks
.last()
.cloned()
Expand Down Expand Up @@ -1514,7 +1545,7 @@ impl JsonRpcRequestProcessor {
.and_then(|confirmed_block| confirmed_block.block_time));
}
}
self.check_slot_cleaned_up(&result, slot)?;
self.check_slot_cleaned_up(&result, slot).await?;
Ok(result.ok())
} else {
let r_bank_forks = self.bank_forks.read().unwrap();
Expand Down

0 comments on commit 710f9c6

Please sign in to comment.