Skip to content

Commit

Permalink
add(scan): Implement DeleteKeys ScanService request (#8217)
Browse files Browse the repository at this point in the history
* handle RemoveKeys command in ScanTask

* implements DeleteKeys request

* uses spawn_blocking and updates comments

* removes spawn_init fn

* adds test for new Storage method

* adds fake_sapling_results helper function

* adds test for scan service DeleteKeys request

* adds TODO for unwrap_or_clone

* checks the keys sent to the scan task for removal

* moves message processing to its own function

* adds a timeout for the scanner task response

* hide mock() methods behind cfg(test) or feature

* adds MAX_REQUEST_KEYS constant

* updates test to insert and delete results for a second key

* Update zebra-scan/src/init.rs

Co-authored-by: Alfredo Garcia <[email protected]>

* test that the expected number of results are in the db

* fix unused import lint

* fixes unused imports

---------

Co-authored-by: Alfredo Garcia <[email protected]>
  • Loading branch information
arya2 and oxarbitrage authored Jan 31, 2024
1 parent 768eb90 commit 80827f5
Show file tree
Hide file tree
Showing 13 changed files with 369 additions and 61 deletions.
4 changes: 2 additions & 2 deletions zebra-node-services/src/scan_service/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ pub enum Request {
/// TODO: Accept `ViewingKeyWithHash`es and return Ok(()) if successful or an error
RegisterKeys(Vec<()>),

/// TODO: Accept `KeyHash`es and return Ok(`Vec<KeyHash>`) with hashes of deleted keys
DeleteKeys(Vec<()>),
/// Deletes viewing keys and their results from the database.
DeleteKeys(Vec<String>),

/// TODO: Accept `KeyHash`es and return `Transaction`s
Results(Vec<()>),
Expand Down
3 changes: 3 additions & 0 deletions zebra-node-services/src/scan_service/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub enum Response {
/// Response to Results request
Results(Vec<Transaction>),

/// Response to DeleteKeys request
DeletedKeys,

/// Response to SubscribeResults request
SubscribeResults(mpsc::Receiver<Arc<Transaction>>),
}
1 change: 1 addition & 0 deletions zebra-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ categories = ["cryptography::cryptocurrencies"]
[[bin]] # Bin to run the Scanner gRPC server
name = "scanner-grpc-server"
path = "src/bin/rpc_server.rs"
required-features = ["proptest-impl"]

[features]

Expand Down
9 changes: 5 additions & 4 deletions zebra-scan/src/bin/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
use tower::ServiceBuilder;

use zebra_scan::service::ScanService;
use zebra_scan::{service::ScanService, storage::Storage};

#[tokio::main]
/// Runs an RPC server with a mock ScanTask
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (config, network) = Default::default();
let scan_service = ServiceBuilder::new()
.buffer(10)
.service(ScanService::new_with_mock_scanner(&config, network));

let (scan_service, _cmd_receiver) =
ScanService::new_with_mock_scanner(Storage::new(&config, network, false));
let scan_service = ServiceBuilder::new().buffer(10).service(scan_service);

// Start the gRPC server.
zebra_grpc::server::init(scan_service).await?;
Expand Down
62 changes: 37 additions & 25 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum ScanTaskCommand {
done_tx: oneshot::Sender<()>,

/// Key hashes that are to be removed
key_hashes: Vec<()>,
keys: Vec<String>,
},

/// Start sending results for key hashes to `result_sender`
Expand All @@ -36,25 +36,29 @@ pub enum ScanTaskCommand {
},
}

#[derive(Debug)]
#[derive(Debug, Clone)]
/// Scan task handle and command channel sender
pub struct ScanTask {
/// [`JoinHandle`] of scan task
pub handle: JoinHandle<Result<(), Report>>,
pub handle: Arc<JoinHandle<Result<(), Report>>>,

/// Task command channel sender
cmd_sender: mpsc::Sender<ScanTaskCommand>,
pub cmd_sender: mpsc::Sender<ScanTaskCommand>,
}

impl ScanTask {
/// Spawns a new [`ScanTask`] for tests.
pub fn mock() -> Self {
let (cmd_sender, _cmd_receiver) = mpsc::channel();

Self {
handle: tokio::spawn(std::future::pending()),
cmd_sender,
}
#[cfg(any(test, feature = "proptest-impl"))]
pub fn mock() -> (Self, mpsc::Receiver<ScanTaskCommand>) {
let (cmd_sender, cmd_receiver) = mpsc::channel();

(
Self {
handle: Arc::new(tokio::spawn(std::future::pending())),
cmd_sender,
},
cmd_receiver,
)
}

/// Spawns a new [`ScanTask`].
Expand All @@ -64,11 +68,16 @@ impl ScanTask {
state: scan::State,
chain_tip_change: ChainTipChange,
) -> Self {
// TODO: Pass `_cmd_receiver` to `scan::start()` to pass it new keys after it's been spawned
let (cmd_sender, _cmd_receiver) = mpsc::channel();
let (cmd_sender, cmd_receiver) = mpsc::channel();

Self {
handle: scan::spawn_init(config, network, state, chain_tip_change),
handle: Arc::new(scan::spawn_init(
config,
network,
state,
chain_tip_change,
cmd_receiver,
)),
cmd_sender,
}
}
Expand All @@ -80,18 +89,21 @@ impl ScanTask {
) -> Result<(), mpsc::SendError<ScanTaskCommand>> {
self.cmd_sender.send(command)
}
}

/// Initialize the scanner based on its config, and spawn a task for it.
///
/// TODO: add a test for this function.
pub fn spawn_init(
config: &Config,
network: Network,
state: scan::State,
chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), Report>> {
scan::spawn_init(config, network, state, chain_tip_change)
/// Sends a message to the scan task to remove the provided viewing keys.
pub fn remove_keys(
&mut self,
keys: &[String],
) -> Result<oneshot::Receiver<()>, mpsc::SendError<ScanTaskCommand>> {
let (done_tx, done_rx) = oneshot::channel();

self.send(ScanTaskCommand::RemoveKeys {
keys: keys.to_vec(),
done_tx,
})?;

Ok(done_rx)
}
}

/// Initialize [`ScanService`] based on its config.
Expand Down
2 changes: 1 addition & 1 deletion zebra-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ pub mod service;
pub mod tests;

pub use config::Config;
pub use init::{init, spawn_init};
pub use init::{init, ScanTask};
69 changes: 64 additions & 5 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
sync::{
mpsc::{Receiver, TryRecvError},
Arc,
},
time::Duration,
};

Expand Down Expand Up @@ -37,8 +40,9 @@ use zebra_chain::{
use zebra_state::{ChainTipChange, SaplingScannedResult, TransactionIndex};

use crate::{
init::ScanTaskCommand,
storage::{SaplingScanningKey, Storage},
Config,
Config, ScanTask,
};

/// The generic state type used by the scanner.
Expand Down Expand Up @@ -66,6 +70,7 @@ pub async fn start(
state: State,
chain_tip_change: ChainTipChange,
storage: Storage,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> Result<(), Report> {
let network = storage.network();
let sapling_activation_height = storage.min_sapling_birthday_height();
Expand Down Expand Up @@ -102,12 +107,14 @@ pub async fn start(
Ok::<_, Report>((key.clone(), parsed_keys))
})
.try_collect()?;
let parsed_keys = Arc::new(parsed_keys);
let mut parsed_keys = Arc::new(parsed_keys);

// Give empty states time to verify some blocks before we start scanning.
tokio::time::sleep(INITIAL_WAIT).await;

loop {
parsed_keys = ScanTask::process_msgs(&cmd_receiver, parsed_keys)?;

let scanned_height = scan_height_and_store_results(
height,
state.clone(),
Expand All @@ -130,6 +137,56 @@ pub async fn start(
}
}

impl ScanTask {
/// Accepts the scan task's `parsed_key` collection and a reference to the command channel receiver
///
/// Processes messages in the scan task channel, updating `parsed_keys` if required.
///
/// Returns the updated `parsed_keys`
fn process_msgs(
cmd_receiver: &Receiver<ScanTaskCommand>,
mut parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
) -> Result<
Arc<HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>>,
Report,
> {
loop {
let cmd = match cmd_receiver.try_recv() {
Ok(cmd) => cmd,

Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
// Return early if the sender has been dropped.
return Err(eyre!("command channel disconnected"));
}
};

match cmd {
ScanTaskCommand::RemoveKeys { done_tx, keys } => {
// TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
// https://github.com/rust-lang/rust/issues/93610
let mut updated_parsed_keys =
Arc::try_unwrap(parsed_keys).unwrap_or_else(|arc| (*arc).clone());

for key in keys {
updated_parsed_keys.remove(&key);
}

parsed_keys = Arc::new(updated_parsed_keys);

// Ignore send errors for the done notification
let _ = done_tx.send(());
}

_ => continue,
}
}

Ok(parsed_keys)
}
}
/// Get the block at `height` from `state`, scan it with the keys in `parsed_keys`, and store the
/// results in `storage`. If `height` is lower than the `key_birthdays` for that key, skip it.
///
Expand Down Expand Up @@ -445,11 +502,12 @@ pub fn spawn_init(
network: Network,
state: State,
chain_tip_change: ChainTipChange,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> JoinHandle<Result<(), Report>> {
let config = config.clone();

// TODO: spawn an entirely new executor here, to avoid timing attacks.
tokio::spawn(init(config, network, state, chain_tip_change).in_current_span())
tokio::spawn(init(config, network, state, chain_tip_change, cmd_receiver).in_current_span())
}

/// Initialize the scanner based on its config.
Expand All @@ -460,11 +518,12 @@ pub async fn init(
network: Network,
state: State,
chain_tip_change: ChainTipChange,
cmd_receiver: Receiver<ScanTaskCommand>,
) -> Result<(), Report> {
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network, false))
.wait_for_panics()
.await;

// TODO: add more tasks here?
start(state, chain_tip_change, storage).await
start(state, chain_tip_change, storage, cmd_receiver).await
}
62 changes: 51 additions & 11 deletions zebra-scan/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! [`tower::Service`] for zebra-scan.
use std::{future::Future, pin::Pin, task::Poll};
use std::{future::Future, pin::Pin, task::Poll, time::Duration};

use futures::future::FutureExt;
use tower::Service;
Expand All @@ -10,16 +10,25 @@ use zebra_state::ChainTipChange;

use crate::{init::ScanTask, scan, storage::Storage, Config, Request, Response};

#[cfg(test)]
mod tests;

/// Zebra-scan [`tower::Service`]
#[derive(Debug)]
pub struct ScanService {
/// On-disk storage
db: Storage,
pub db: Storage,

/// Handle to scan task that's responsible for writing results
scan_task: ScanTask,
}

/// A timeout applied to `DeleteKeys` requests.
const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);

/// The maximum number of keys that may be included in a request to the scan service
const MAX_REQUEST_KEYS: usize = 1000;

impl ScanService {
/// Create a new [`ScanService`].
pub fn new(
Expand All @@ -35,11 +44,15 @@ impl ScanService {
}

/// Create a new [`ScanService`] with a mock `ScanTask`
pub fn new_with_mock_scanner(config: &Config, network: Network) -> Self {
Self {
db: Storage::new(config, network, false),
scan_task: ScanTask::mock(),
}
#[cfg(any(test, feature = "proptest-impl"))]
pub fn new_with_mock_scanner(
db: Storage,
) -> (
Self,
std::sync::mpsc::Receiver<crate::init::ScanTaskCommand>,
) {
let (scan_task, cmd_receiver) = ScanTask::mock();
(Self { db, scan_task }, cmd_receiver)
}
}

Expand Down Expand Up @@ -84,10 +97,37 @@ impl Service<Request> for ScanService {
// - send new keys to scan task
}

Request::DeleteKeys(_key_hashes) => {
// TODO:
// - delete these keys and their results from db
// - send deleted keys to scan task
Request::DeleteKeys(keys) => {
let mut db = self.db.clone();
let mut scan_task = self.scan_task.clone();

return async move {
if keys.len() > MAX_REQUEST_KEYS {
return Err(format!(
"maximum number of keys per request is {MAX_REQUEST_KEYS}"
)
.into());
}

// Wait for a message to confirm that the scan task has removed the key up to `DELETE_KEY_TIMEOUT`
let remove_keys_result =
tokio::time::timeout(DELETE_KEY_TIMEOUT, scan_task.remove_keys(&keys)?)
.await
.map_err(|_| "timeout waiting for delete keys done notification");

// Delete the key from the database after either confirmation that it's been removed from the scan task, or
// waiting `DELETE_KEY_TIMEOUT`.
let delete_key_task = tokio::task::spawn_blocking(move || {
db.delete_sapling_results(keys);
});

// Return timeout errors or `RecvError`s, or wait for the key to be deleted from the database.
remove_keys_result??;
delete_key_task.await?;

Ok(Response::DeletedKeys)
}
.boxed();
}

Request::Results(_key_hashes) => {
Expand Down
Loading

0 comments on commit 80827f5

Please sign in to comment.