From 51a8fde6b1ff24e5007740b341cdd5bf34177d04 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 18 Mar 2022 13:44:31 -0300 Subject: [PATCH 1/2] feat: Implement CLI arg for overriding daemon cursor --- src/bin/oura/daemon.rs | 47 ++++++++++++++++++++++++++++++++++-------- src/utils/cursor.rs | 21 +++++++++++++++++-- 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index d902f418..9f2986ad 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -10,7 +10,7 @@ use oura::{ BootstrapResult, FilterProvider, PartialBootstrapResult, SinkProvider, SourceProvider, StageReceiver, }, - sources::MagicArg, + sources::{MagicArg, PointArg}, utils::{cursor, metrics, ChainWellKnownInfo, Utils, WithUtils}, Error, }; @@ -178,6 +178,16 @@ fn define_chain_info( } } +fn define_cursor( + explicit: Option, + config: Option, +) -> Option { + match (explicit, config) { + (Some(x), _) => Some(cursor::Config::Memory(x)), + (_, x) => x, + } +} + fn bootstrap_utils( chain: ChainWellKnownInfo, cursor: Option, @@ -197,7 +207,10 @@ fn bootstrap_utils( } /// Sets up the whole pipeline from configuration -fn bootstrap(config: ConfigRoot) -> Result>, Error> { +fn bootstrap( + config: ConfigRoot, + explicit_cursor: Option, +) -> Result>, Error> { let ConfigRoot { source, filters, @@ -211,6 +224,8 @@ fn bootstrap(config: ConfigRoot) -> Result>, Error> { let chain = define_chain_info(chain, &magic)?; + let cursor = define_cursor(explicit_cursor, cursor); + let utils = Arc::new(bootstrap_utils(chain, cursor, metrics)); let mut threads = Vec::with_capacity(10); @@ -240,11 +255,16 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { false => None, }; + let explicit_cursor = match args.is_present("cursor") { + true => Some(args.value_of_t("cursor")?), + false => None, + }; + let root = ConfigRoot::new(explicit_config)?; debug!("daemon starting with this config: {:?}", root); - let threads = bootstrap(root)?; + let threads = bootstrap(root, explicit_cursor)?; // TODO: refactor into new loop that monitors thread health for handle in threads { @@ -256,10 +276,19 @@ pub fn run(args: &ArgMatches) -> Result<(), Error> { /// Creates the clap definition for this sub-command pub(crate) fn command_definition<'a>() -> clap::Command<'a> { - clap::Command::new("daemon").arg( - clap::Arg::new("config") - .long("config") - .takes_value(true) - .help("config file to load by the daemon"), - ) + clap::Command::new("daemon") + .arg( + clap::Arg::new("config") + .long("config") + .takes_value(true) + .help("config file to load by the daemon"), + ) + .arg( + clap::Arg::new("cursor") + .long("cursor") + .takes_value(true) + .help( + "initial chain cursor, overrides configuration file, expects format `slot,hex-hash`", + ), + ) } diff --git a/src/utils/cursor.rs b/src/utils/cursor.rs index f64583a6..81cd6e40 100644 --- a/src/utils/cursor.rs +++ b/src/utils/cursor.rs @@ -30,22 +30,26 @@ pub struct FileConfig { /// A cursor provider that uses the file system as the source for persistence pub(crate) struct FileStorage(FileConfig); -// TODO: over-engineering a little bit here, leaving room for other -// types of cursor persistence (probably Redis) +/// An ephemeral cursor that lives only in memory +pub(crate) struct MemoryStorage(PointArg); + enum Storage { File(FileStorage), + Memory(MemoryStorage), } impl CanStore for Storage { fn read_cursor(&self) -> Result { match self { Storage::File(x) => x.read_cursor(), + Storage::Memory(x) => x.read_cursor(), } } fn write_cursor(&self, point: PointArg) -> Result<(), Error> { match self { Storage::File(x) => x.write_cursor(point), + Storage::Memory(x) => x.write_cursor(point), } } } @@ -54,6 +58,7 @@ impl CanStore for Storage { #[serde(tag = "type")] pub enum Config { File(FileConfig), + Memory(PointArg), } #[derive(Clone)] @@ -74,6 +79,7 @@ impl Provider { state: RwLock::new(State::Unknown), storage: match config { Config::File(x) => Storage::File(FileStorage(x)), + Config::Memory(x) => Storage::Memory(MemoryStorage(x)), }, } } @@ -147,3 +153,14 @@ impl CanStore for FileStorage { Ok(()) } } + +impl CanStore for MemoryStorage { + fn read_cursor(&self) -> Result { + Ok(self.0.clone()) + } + + fn write_cursor(&self, _point: PointArg) -> Result<(), Error> { + // No operation, memory storage doesn't persist anything + Ok(()) + } +} From a466a0aebcf8cb1f90e21e7d46b3ed53ebb7a276 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sat, 19 Mar 2022 12:02:26 -0300 Subject: [PATCH 2/2] Add docs for daemon cursor arg --- book/src/usage/daemon.md | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/book/src/usage/daemon.md b/book/src/usage/daemon.md index 19997ba8..c28d99f8 100644 --- a/book/src/usage/daemon.md +++ b/book/src/usage/daemon.md @@ -10,14 +10,30 @@ To start _Oura_ in _daemon mode_, use the following command: oura dameon ``` -By default, _Oura_ will load the configuration values from `/etc/oura/daemon.toml`. +Available options: -If you need to specify a different configuration path, use the following command: +- `--config`: path of a custom toml configuration file to use. If not specified, configuration will be loaded from `/etc/oura/daemon.toml`. +- `--cursor`: a custom point in the chain to use as starting point. Expects format: `slot,hex-hash`. If not specified, it will look for the [cursor](../advanced/stateful_cursor.md) section available via toml configuration or fallback to the [intersect options](../advanced/intersect_options.md) of the source stage. + +Example of starting daemon mode with default config file: + +```sh +# config will be loaded from /etc/oura/daemon.toml +oura daemon +``` + +Example of starting daemon mode with a custom config file at `my_config.toml`: ```sh oura daemon --config my_config.toml ``` +Example of starting daemon mode specifying a particular cursor: + +```sh +oura daemon --cursor 56134714,2d2a5503c16671ac7d5296f8e6bfeee050b2c2900a7d8c97b36c434667eb99d9 +``` + ## Configuration The configuration file needs to specify the source, filters and sink to use in a particular pipeline. The following toml represent the typical skeleton of an _Oura_ config file: