Skip to content

Commit

Permalink
Applies suggestions from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
arya2 committed Feb 6, 2024
1 parent 3484c69 commit 217fcda
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 13 deletions.
2 changes: 1 addition & 1 deletion zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Initializing the scanner and RPC server.
//! Initializing the scanner and gRPC server.
use color_eyre::Report;
use tower::ServiceBuilder;
Expand Down
8 changes: 5 additions & 3 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub mod scan_task;

pub use scan_task::{ScanTask, ScanTaskCommand};

#[cfg(any(test, feature = "proptest-impl"))]
use std::sync::mpsc::Receiver;

/// Zebra-scan [`tower::Service`]
#[derive(Debug)]
pub struct ScanService {
Expand Down Expand Up @@ -46,10 +49,9 @@ impl ScanService {
}

/// Create a new [`ScanService`] with a mock `ScanTask`
// TODO: Move this to tests behind `cfg(any(test, feature = "proptest-impl"))`
#[cfg(any(test, feature = "proptest-impl"))]
pub fn new_with_mock_scanner(
db: Storage,
) -> (Self, std::sync::mpsc::Receiver<ScanTaskCommand>) {
pub fn new_with_mock_scanner(db: Storage) -> (Self, Receiver<ScanTaskCommand>) {
let (scan_task, cmd_receiver) = ScanTask::mock();
(Self { db, scan_task }, cmd_receiver)
}
Expand Down
1 change: 1 addition & 0 deletions zebra-scan/src/service/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl ScanTask {
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
// TODO: Use a bounded channel or move this logic to the scan service or another service.
let (cmd_sender, cmd_receiver) = mpsc::channel();

Self {
Expand Down
5 changes: 3 additions & 2 deletions zebra-scan/src/service/scan_task/commands.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Types and method implementations for [`ScanTask`]
//! Types and method implementations for [`ScanTaskCommand`]
use std::{
collections::HashMap,
Expand Down Expand Up @@ -39,12 +39,13 @@ pub enum ScanTaskCommand {
},

/// Start sending results for key hashes to `result_sender`
// TODO: Implement this command (#8206)
SubscribeResults {
/// Sender for results
result_sender: mpsc::Sender<Arc<Transaction>>,

/// Key hashes to send the results of to result channel
key_hashes: Vec<()>,
keys: Vec<String>,
},
}

Expand Down
12 changes: 8 additions & 4 deletions zebra-scan/src/service/scan_task/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,31 @@
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
sync::mpsc::{Receiver, Sender},
task::JoinHandle,
};
use tracing::Instrument;
use zebra_chain::BoxError;

use super::scan::ScanRangeTaskBuilder;

const EXECUTOR_BUFFER_SIZE: usize = 100;

pub fn spawn_init() -> (
UnboundedSender<ScanRangeTaskBuilder>,
Sender<ScanRangeTaskBuilder>,
JoinHandle<Result<(), BoxError>>,
) {
let (scan_task_sender, scan_task_receiver) = tokio::sync::mpsc::unbounded_channel();
// TODO: Use a bounded channel.
let (scan_task_sender, scan_task_receiver) = tokio::sync::mpsc::channel(EXECUTOR_BUFFER_SIZE);

(
scan_task_sender,
tokio::spawn(scan_task_executor(scan_task_receiver).in_current_span()),
)
}

pub async fn scan_task_executor(
mut scan_task_receiver: UnboundedReceiver<ScanRangeTaskBuilder>,
mut scan_task_receiver: Receiver<ScanRangeTaskBuilder>,
) -> Result<(), BoxError> {
let mut scan_range_tasks = FuturesUnordered::new();

Expand Down
3 changes: 2 additions & 1 deletion zebra-scan/src/service/scan_task/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ pub async fn start(

scan_task_sender
.send(ScanRangeTaskBuilder::new(height, new_keys, state, storage))
.expect("scan_until_task channel should not be full");
.await
.expect("scan_until_task channel should not be closed");
}

let scanned_height = scan_height_and_store_results(
Expand Down
4 changes: 2 additions & 2 deletions zebra-scan/src/service/scan_task/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn scan_task_processes_messages_correctly() -> Result<(), Report> {
.collect(),
)?;

// Check that it updated parsed_keys correctly and returned the right new keys when starting with an empty state
// Check that no key should be added if they are all already known and the heights are the same

let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?;

Expand Down Expand Up @@ -104,7 +104,7 @@ async fn scan_task_processes_messages_correctly() -> Result<(), Report> {

let new_keys = ScanTask::process_messages(&cmd_receiver, &mut parsed_keys)?;

// Check that it sends the done notification successfully before returning and dropping `done_tx``
// Check that it sends the done notification successfully before returning and dropping `done_tx`
done_rx.await?;

assert!(
Expand Down
1 change: 1 addition & 0 deletions zebrad/tests/acceptance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2965,6 +2965,7 @@ fn scan_start_where_left() -> Result<()> {
/// Test successful registration of a new key in the scan task.
///
/// See [`common::shielded_scan::register_key`] for more information.
// TODO: Add this test to CI (#8236)
#[tokio::test]
#[ignore]
#[cfg(feature = "shielded-scan")]
Expand Down

0 comments on commit 217fcda

Please sign in to comment.