diff --git a/zebra-scan/src/init.rs b/zebra-scan/src/init.rs index 0cdfb176fa3..360014348cd 100644 --- a/zebra-scan/src/init.rs +++ b/zebra-scan/src/init.rs @@ -1,4 +1,4 @@ -//! Initializing the scanner and RPC server. +//! Initializing the scanner and gRPC server. use color_eyre::Report; use tower::ServiceBuilder; diff --git a/zebra-scan/src/service.rs b/zebra-scan/src/service.rs index 0ff52c6c01a..3dee20f787d 100644 --- a/zebra-scan/src/service.rs +++ b/zebra-scan/src/service.rs @@ -18,6 +18,9 @@ pub mod scan_task; pub use scan_task::{ScanTask, ScanTaskCommand}; +#[cfg(any(test, feature = "proptest-impl"))] +use std::sync::mpsc::Receiver; + /// Zebra-scan [`tower::Service`] #[derive(Debug)] pub struct ScanService { @@ -46,10 +49,9 @@ impl ScanService { } /// Create a new [`ScanService`] with a mock `ScanTask` + // TODO: Move this to tests behind `cfg(any(test, feature = "proptest-impl"))` #[cfg(any(test, feature = "proptest-impl"))] - pub fn new_with_mock_scanner( - db: Storage, - ) -> (Self, std::sync::mpsc::Receiver) { + pub fn new_with_mock_scanner(db: Storage) -> (Self, Receiver) { let (scan_task, cmd_receiver) = ScanTask::mock(); (Self { db, scan_task }, cmd_receiver) } diff --git a/zebra-scan/src/service/scan_task.rs b/zebra-scan/src/service/scan_task.rs index e72e22a490f..f149b3834c0 100644 --- a/zebra-scan/src/service/scan_task.rs +++ b/zebra-scan/src/service/scan_task.rs @@ -37,6 +37,7 @@ impl ScanTask { state: scan::State, chain_tip_change: ChainTipChange, ) -> Self { + // TODO: Use a bounded channel or move this logic to the scan service or another service. let (cmd_sender, cmd_receiver) = mpsc::channel(); Self { diff --git a/zebra-scan/src/service/scan_task/commands.rs b/zebra-scan/src/service/scan_task/commands.rs index c52976b55b3..58d9bd88d7e 100644 --- a/zebra-scan/src/service/scan_task/commands.rs +++ b/zebra-scan/src/service/scan_task/commands.rs @@ -1,4 +1,4 @@ -//! Types and method implementations for [`ScanTask`] +//! Types and method implementations for [`ScanTaskCommand`] use std::{ collections::HashMap, @@ -39,12 +39,13 @@ pub enum ScanTaskCommand { }, /// Start sending results for key hashes to `result_sender` + // TODO: Implement this command (#8206) SubscribeResults { /// Sender for results result_sender: mpsc::Sender>, /// Key hashes to send the results of to result channel - key_hashes: Vec<()>, + keys: Vec, }, } diff --git a/zebra-scan/src/service/scan_task/executor.rs b/zebra-scan/src/service/scan_task/executor.rs index fcd983e88af..4f0c0086099 100644 --- a/zebra-scan/src/service/scan_task/executor.rs +++ b/zebra-scan/src/service/scan_task/executor.rs @@ -2,7 +2,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use tokio::{ - sync::mpsc::{UnboundedReceiver, UnboundedSender}, + sync::mpsc::{Receiver, Sender}, task::JoinHandle, }; use tracing::Instrument; @@ -10,11 +10,15 @@ use zebra_chain::BoxError; use super::scan::ScanRangeTaskBuilder; +const EXECUTOR_BUFFER_SIZE: usize = 100; + pub fn spawn_init() -> ( - UnboundedSender, + Sender, JoinHandle>, ) { - let (scan_task_sender, scan_task_receiver) = tokio::sync::mpsc::unbounded_channel(); + // TODO: Use a bounded channel. + let (scan_task_sender, scan_task_receiver) = tokio::sync::mpsc::channel(EXECUTOR_BUFFER_SIZE); + ( scan_task_sender, tokio::spawn(scan_task_executor(scan_task_receiver).in_current_span()), @@ -22,7 +26,7 @@ pub fn spawn_init() -> ( } pub async fn scan_task_executor( - mut scan_task_receiver: UnboundedReceiver, + mut scan_task_receiver: Receiver, ) -> Result<(), BoxError> { let mut scan_range_tasks = FuturesUnordered::new(); diff --git a/zebra-scan/src/service/scan_task/scan.rs b/zebra-scan/src/service/scan_task/scan.rs index 8a1ae5b447a..9af1e3ef97d 100644 --- a/zebra-scan/src/service/scan_task/scan.rs +++ b/zebra-scan/src/service/scan_task/scan.rs @@ -135,7 +135,8 @@ pub async fn start( scan_task_sender .send(ScanRangeTaskBuilder::new(height, new_keys, state, storage)) - .expect("scan_until_task channel should not be full"); + .await + .expect("scan_until_task channel should not be closed"); } let scanned_height = scan_height_and_store_results( diff --git a/zebra-scan/src/service/scan_task/tests/vectors.rs b/zebra-scan/src/service/scan_task/tests/vectors.rs index 17bdcea2cca..2d9d46063e2 100644 --- a/zebra-scan/src/service/scan_task/tests/vectors.rs +++ b/zebra-scan/src/service/scan_task/tests/vectors.rs @@ -44,7 +44,7 @@ async fn scan_task_processes_messages_correctly() -> Result<(), Report> { .collect(), )?; - // Check that it updated parsed_keys correctly and returned the right new keys when starting with an empty state + // Check that no key should be added if they are all already known and the heights are the same let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?; @@ -104,7 +104,7 @@ async fn scan_task_processes_messages_correctly() -> Result<(), Report> { let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?; - // Check that it sends the done notification successfully before returning and dropping `done_tx`` + // Check that it sends the done notification successfully before returning and dropping `done_tx` done_rx.await?; assert!( diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 16807638ecb..91ea1c8ac71 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -2965,6 +2965,7 @@ fn scan_start_where_left() -> Result<()> { /// Test successful registration of a new key in the scan task. /// /// See [`common::shielded_scan::register_key`] for more information. +// TODO: Add this test to CI (#8236) #[tokio::test] #[ignore] #[cfg(feature = "shielded-scan")]