From 536138bedbf24d4a068d68ae5da436202d4b26b8 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Mon, 31 Jul 2023 15:52:25 -0400 Subject: [PATCH 01/16] Expose tap EventFormatter and run --- src/tap/cmd.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index 1b7c0a999b281..8525de61914e9 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -82,7 +82,7 @@ pub(crate) async fn cmd(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode exitcode::OK } -async fn run( +pub async fn run( url: Url, opts: &super::Opts, outputs_patterns: Vec, @@ -93,7 +93,7 @@ async fn run( Err(e) => { #[allow(clippy::print_stderr)] { - eprintln!("[tap] Couldn't connect to Vector API via WebSockets: {}", e); + eprintln!("[tap] Couldn't connect to API via WebSockets: {}", e); } return exitcode::UNAVAILABLE; } @@ -142,7 +142,7 @@ async fn run( } #[derive(Clone)] -struct EventFormatter { +pub struct EventFormatter { meta: bool, format: TapEncodingFormat, component_id_label: ColoredString, From 018ea687791c6057f8db13b049638d8b75a90ec8 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Mon, 31 Jul 2023 15:55:14 -0400 Subject: [PATCH 02/16] Shorten comment --- src/tap/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tap/mod.rs b/src/tap/mod.rs index 5864660b444f9..792a0c430032c 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -12,7 +12,7 @@ pub struct Opts { #[arg(default_value = "500", short = 'i', long)] interval: u32, - /// Vector GraphQL API server endpoint + /// GraphQL API server endpoint #[arg(short, long)] url: Option, @@ -44,7 +44,7 @@ pub struct Opts { #[arg(short, long)] meta: bool, - /// Whether to reconnect if the underlying Vector API connection drops. By default, tap will attempt to reconnect if the connection drops. + /// Whether to reconnect if the underlying API connection drops. By default, tap will attempt to reconnect if the connection drops. #[arg(short, long)] no_reconnect: bool, } From b26006254a54040fa54e0667874900a186ae688f Mon Sep 17 00:00:00 2001 From: Will Wang Date: Mon, 31 Jul 2023 16:04:23 -0400 Subject: [PATCH 03/16] Add default_graphql_url --- src/config/api.rs | 8 ++++++++ src/tap/cmd.rs | 9 ++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/config/api.rs b/src/config/api.rs index 7024b08ee8586..0429f6be3662c 100644 --- a/src/config/api.rs +++ b/src/config/api.rs @@ -1,5 +1,6 @@ use std::net::{Ipv4Addr, SocketAddr}; +use url::Url; use vector_config::configurable_component; /// API options. @@ -41,6 +42,13 @@ pub fn default_address() -> Option { Some(SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8686)) } +/// Default GraphQL API address +pub fn default_graphql_url() -> Url { + let addr = default_address().unwrap(); + Url::parse(&format!("http://{}/graphql", addr)) + .expect("Couldn't parse default API URL. Please report this.") +} + const fn default_playground() -> bool { true } diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index 8525de61914e9..7b2ab5e5c993a 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -26,11 +26,10 @@ pub(crate) async fn cmd(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode // Use the provided URL as the Vector GraphQL API server, or default to the local port // provided by the API config. This will work despite `api` and `api-client` being distinct // features; the config is available even if `api` is disabled. - let mut url = opts.url.clone().unwrap_or_else(|| { - let addr = config::api::default_address().unwrap(); - Url::parse(&format!("http://{}/graphql", addr)) - .expect("Couldn't parse default API URL. Please report this.") - }); + let mut url = opts + .url + .clone() + .unwrap_or_else(config::api::default_graphql_url); // Return early with instructions for enabling the API if the endpoint isn't reachable // via a healthcheck. From 04b86169cdbb72574fb920996da6badce6cda738 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Mon, 31 Jul 2023 16:15:01 -0400 Subject: [PATCH 04/16] Move healthcheck error outside --- lib/vector-api-client/src/client.rs | 34 ++++++----------------------- src/tap/cmd.rs | 15 ++++++++++++- src/top/cmd.rs | 30 ++++++++++++++++--------- 3 files changed, 41 insertions(+), 38 deletions(-) diff --git a/lib/vector-api-client/src/client.rs b/lib/vector-api-client/src/client.rs index 01159be74d01f..bacaf9dfcf374 100644 --- a/lib/vector-api-client/src/client.rs +++ b/lib/vector-api-client/src/client.rs @@ -1,8 +1,9 @@ use anyhow::Context; use graphql_client::GraphQLQuery; -use indoc::indoc; use url::Url; +use crate::gql::HealthQueryExt; + /// Wrapped `Result` type, that returns deserialized GraphQL response data. pub type QueryResult = anyhow::Result::ResponseData>>; @@ -19,32 +20,11 @@ impl Client { Self { url } } - pub async fn new_with_healthcheck(url: Url) -> Option { - #![allow(clippy::print_stderr)] - - use crate::gql::HealthQueryExt; - - // Create a new API client for connecting to the local/remote Vector instance. - let client = Self::new(url.clone()); - - // Check that the GraphQL server is reachable - match client.health_query().await { - Ok(_) => Some(client), - _ => { - eprintln!( - indoc! {" - Vector API server isn't reachable ({}). - - Have you enabled the API? - - To enable the API, add the following to your `vector.toml` config file: - - [api] - enabled = true"}, - url - ); - None - } + /// Send a health query + pub async fn healthcheck(&self) -> Result<(), ()> { + match self.health_query().await { + Ok(_) => Ok(()), + Err(_) => Err(()), } } diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index 7b2ab5e5c993a..9434c1358242d 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -33,7 +33,20 @@ pub(crate) async fn cmd(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode // Return early with instructions for enabling the API if the endpoint isn't reachable // via a healthcheck. - if Client::new_with_healthcheck(url.clone()).await.is_none() { + let client = Client::new(url.clone()); + if client.healthcheck().await.is_err() { + eprintln!( + indoc::indoc! {" + Vector API server isn't reachable ({}). + + Have you enabled the API? + + To enable the API, add the following to your `vector.toml` config file: + + [api] + enabled = true"}, + url + ); return exitcode::UNAVAILABLE; } diff --git a/src/top/cmd.rs b/src/top/cmd.rs index 16aa38126548a..c36d8344840fe 100644 --- a/src/top/cmd.rs +++ b/src/top/cmd.rs @@ -3,7 +3,6 @@ use std::time::Duration; use chrono::Local; use futures_util::future::join_all; use tokio::sync::oneshot; -use url::Url; use vector_api_client::{connect_subscription_client, Client}; use super::{ @@ -31,17 +30,28 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { // Use the provided URL as the Vector GraphQL API server, or default to the local port // provided by the API config. This will work despite `api` and `api-client` being distinct // features; the config is available even if `api` is disabled - let url = opts.url.clone().unwrap_or_else(|| { - let addr = config::api::default_address().unwrap(); - Url::parse(&format!("http://{}/graphql", addr)) - .expect("Couldn't parse default API URL. Please report this.") - }); + let url = opts + .url + .clone() + .unwrap_or_else(config::api::default_graphql_url); // Create a new API client for connecting to the local/remote Vector instance. - let client = match Client::new_with_healthcheck(url.clone()).await { - Some(client) => client, - None => return exitcode::UNAVAILABLE, - }; + let client = Client::new(url.clone()); + if client.healthcheck().await.is_err() { + eprintln!( + indoc::indoc! {" + Vector API server isn't reachable ({}). + + Have you enabled the API? + + To enable the API, add the following to your `vector.toml` config file: + + [api] + enabled = true"}, + url + ); + return exitcode::UNAVAILABLE; + } // Create a channel for updating state via event messages let (tx, rx) = tokio::sync::mpsc::channel(20); From 6bd8f7ad5327ef13c9df0ca8e3ec38c6d5fae928 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Mon, 31 Jul 2023 17:19:13 -0400 Subject: [PATCH 05/16] Refactor tap pattern creation --- src/tap/cmd.rs | 14 +------------- src/tap/mod.rs | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index 9434c1358242d..ac0ed1ca48ecc 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -57,19 +57,7 @@ pub(crate) async fn cmd(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode }) .expect("Couldn't build WebSocket URL. Please report."); - // If no patterns are provided, tap all components' outputs - let outputs_patterns = if opts.component_id_patterns.is_empty() - && opts.outputs_of.is_empty() - && opts.inputs_of.is_empty() - { - vec!["*".to_string()] - } else { - opts.outputs_of - .iter() - .cloned() - .chain(opts.component_id_patterns.iter().cloned()) - .collect() - }; + let outputs_patterns = opts.outputs_patterns(); let formatter = EventFormatter::new(opts.meta, opts.format); diff --git a/src/tap/mod.rs b/src/tap/mod.rs index 792a0c430032c..d44b3aba072d1 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -48,3 +48,23 @@ pub struct Opts { #[arg(short, long)] no_reconnect: bool, } + +impl Opts { + /// Component ID patterns to tap + /// + /// If no patterns are provided, tap all components' outputs + pub fn outputs_patterns(&self) -> Vec { + if self.component_id_patterns.is_empty() + && self.outputs_of.is_empty() + && self.inputs_of.is_empty() + { + vec!["*".to_string()] + } else { + self.outputs_of + .iter() + .cloned() + .chain(self.component_id_patterns.iter().cloned()) + .collect() + } + } +} From f538211d179a231b5e99886e4da3f760f0d3eac4 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Mon, 31 Jul 2023 17:23:06 -0400 Subject: [PATCH 06/16] Expose RECONNECT_DELAY --- src/tap/cmd.rs | 2 +- src/top/cmd.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index ac0ed1ca48ecc..99b297d29d4b5 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -18,7 +18,7 @@ use crate::{ }; /// Delay (in milliseconds) before attempting to reconnect to the Vector API -const RECONNECT_DELAY: u64 = 5000; +pub const RECONNECT_DELAY: u64 = 5000; /// CLI command func for issuing 'tap' queries, and communicating with a local/remote /// Vector API server via HTTP/WebSockets. diff --git a/src/top/cmd.rs b/src/top/cmd.rs index c36d8344840fe..cf686194a204c 100644 --- a/src/top/cmd.rs +++ b/src/top/cmd.rs @@ -13,7 +13,7 @@ use super::{ use crate::config; /// Delay (in milliseconds) before attempting to reconnect to the Vector API -const RECONNECT_DELAY: u64 = 5000; +pub const RECONNECT_DELAY: u64 = 5000; /// CLI command func for displaying Vector components, and communicating with a local/remote /// Vector API server via HTTP/WebSockets From 5c5ba0b9e8caae4829d60d57863c7ce65bd9a6fc Mon Sep 17 00:00:00 2001 From: Will Wang Date: Mon, 31 Jul 2023 17:38:04 -0400 Subject: [PATCH 07/16] Refactor tap to one exportable function --- src/tap/cmd.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index 99b297d29d4b5..24e98db29f86c 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -18,11 +18,11 @@ use crate::{ }; /// Delay (in milliseconds) before attempting to reconnect to the Vector API -pub const RECONNECT_DELAY: u64 = 5000; +const RECONNECT_DELAY: u64 = 5000; /// CLI command func for issuing 'tap' queries, and communicating with a local/remote /// Vector API server via HTTP/WebSockets. -pub(crate) async fn cmd(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode { +pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode { // Use the provided URL as the Vector GraphQL API server, or default to the local port // provided by the API config. This will work despite `api` and `api-client` being distinct // features; the config is available even if `api` is disabled. @@ -57,15 +57,20 @@ pub(crate) async fn cmd(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode }) .expect("Couldn't build WebSocket URL. Please report."); - let outputs_patterns = opts.outputs_patterns(); + tap(opts, url, signal_rx).await; + + exitcode::OK +} +pub async fn tap(opts: &super::Opts, subscription_url: Url, mut signal_rx: SignalRx) { let formatter = EventFormatter::new(opts.meta, opts.format); + let outputs_patterns = opts.outputs_patterns(); loop { tokio::select! { biased; Ok(SignalTo::Shutdown | SignalTo::Quit) = signal_rx.recv() => break, - status = run(url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => { + status = run(subscription_url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => { if status == exitcode::UNAVAILABLE || status == exitcode::TEMPFAIL && !opts.no_reconnect { #[allow(clippy::print_stderr)] { @@ -78,11 +83,9 @@ pub(crate) async fn cmd(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode } } } - - exitcode::OK } -pub async fn run( +async fn run( url: Url, opts: &super::Opts, outputs_patterns: Vec, @@ -142,7 +145,7 @@ pub async fn run( } #[derive(Clone)] -pub struct EventFormatter { +struct EventFormatter { meta: bool, format: TapEncodingFormat, component_id_label: ColoredString, From 05bc94e06b2cdf748c27567343a23168883253ae Mon Sep 17 00:00:00 2001 From: Will Wang Date: Mon, 31 Jul 2023 17:41:55 -0400 Subject: [PATCH 08/16] Add url() method to tap Opts --- src/tap/cmd.rs | 13 ++----------- src/tap/mod.rs | 8 ++++++++ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index 24e98db29f86c..55db8f5596f34 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -12,10 +12,7 @@ use vector_api_client::{ Client, }; -use crate::{ - config, - signal::{SignalRx, SignalTo}, -}; +use crate::signal::{SignalRx, SignalTo}; /// Delay (in milliseconds) before attempting to reconnect to the Vector API const RECONNECT_DELAY: u64 = 5000; @@ -23,13 +20,7 @@ const RECONNECT_DELAY: u64 = 5000; /// CLI command func for issuing 'tap' queries, and communicating with a local/remote /// Vector API server via HTTP/WebSockets. pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode { - // Use the provided URL as the Vector GraphQL API server, or default to the local port - // provided by the API config. This will work despite `api` and `api-client` being distinct - // features; the config is available even if `api` is disabled. - let mut url = opts - .url - .clone() - .unwrap_or_else(config::api::default_graphql_url); + let mut url = opts.url(); // Return early with instructions for enabling the API if the endpoint isn't reachable // via a healthcheck. diff --git a/src/tap/mod.rs b/src/tap/mod.rs index d44b3aba072d1..2c6c38b4493f0 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -5,6 +5,8 @@ pub(crate) use cmd::cmd; use url::Url; use vector_api_client::gql::TapEncodingFormat; +use crate::config::api::default_graphql_url; + #[derive(Parser, Debug, Clone)] #[command(rename_all = "kebab-case")] pub struct Opts { @@ -67,4 +69,10 @@ impl Opts { .collect() } } + + /// Use the provided URL as the Vector GraphQL API server, or default to the local port + /// provided by the API config. + pub fn url(&self) -> Url { + self.url.clone().unwrap_or_else(default_graphql_url) + } } From ac0ff4cf18bdbec0ba4406cf73423ab6be47b102 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Tue, 1 Aug 2023 07:50:42 -0400 Subject: [PATCH 09/16] Refactor core top logic into function --- src/top/cmd.rs | 133 +++++++++++++++++++++++++------------------------ src/top/mod.rs | 14 +++++- 2 files changed, 81 insertions(+), 66 deletions(-) diff --git a/src/top/cmd.rs b/src/top/cmd.rs index cf686194a204c..deb042be7fd32 100644 --- a/src/top/cmd.rs +++ b/src/top/cmd.rs @@ -2,7 +2,8 @@ use std::time::Duration; use chrono::Local; use futures_util::future::join_all; -use tokio::sync::oneshot; +use tokio::sync::{mpsc, oneshot}; +use url::Url; use vector_api_client::{connect_subscription_client, Client}; use super::{ @@ -10,10 +11,9 @@ use super::{ metrics, state::{self, ConnectionStatus, EventType}, }; -use crate::config; /// Delay (in milliseconds) before attempting to reconnect to the Vector API -pub const RECONNECT_DELAY: u64 = 5000; +const RECONNECT_DELAY: u64 = 5000; /// CLI command func for displaying Vector components, and communicating with a local/remote /// Vector API server via HTTP/WebSockets @@ -27,13 +27,7 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { return exitcode::IOERR; } - // Use the provided URL as the Vector GraphQL API server, or default to the local port - // provided by the API config. This will work despite `api` and `api-client` being distinct - // features; the config is available even if `api` is disabled - let url = opts - .url - .clone() - .unwrap_or_else(config::api::default_graphql_url); + let url = opts.url(); // Create a new API client for connecting to the local/remote Vector instance. let client = Client::new(url.clone()); @@ -53,10 +47,6 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { return exitcode::UNAVAILABLE; } - // Create a channel for updating state via event messages - let (tx, rx) = tokio::sync::mpsc::channel(20); - let state_rx = state::updater(rx).await; - // Change the HTTP schema to WebSockets let mut ws_url = url.clone(); ws_url @@ -66,60 +56,75 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { }) .expect("Couldn't build WebSocket URL. Please report."); - let opts_clone = opts.clone(); - let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - // This task handles reconnecting the subscription client and all - // subscriptions in the case of a web socket disconnect - let connection = tokio::spawn(async move { - loop { - // Initialize state. On future reconnects, we re-initialize state in - // order to accurately capture added, removed, and edited - // components. - let state = match metrics::init_components(&client).await { - Ok(state) => state, - Err(_) => { - tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await; - continue; - } - }; - _ = tx.send(EventType::InitializeState(state)).await; - - let subscription_client = match connect_subscription_client(ws_url.clone()).await { - Ok(c) => c, - Err(_) => { - tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await; - continue; - } - }; - - // Subscribe to updated metrics - let finished = - metrics::subscribe(subscription_client, tx.clone(), opts_clone.interval as i64); - - _ = tx - .send(EventType::ConnectionUpdated(ConnectionStatus::Connected( - Local::now(), - ))) - .await; - // Tasks spawned in metrics::subscribe finish when the subscription - // streams have completed. Currently, subscription streams only - // complete when the underlying web socket connection to the GraphQL - // server drops. - _ = join_all(finished).await; - _ = tx - .send(EventType::ConnectionUpdated( - ConnectionStatus::Disconnected(RECONNECT_DELAY), - )) - .await; - if opts_clone.no_reconnect { - _ = shutdown_tx.send(()); - break; + top(&opts, client, ws_url).await +} + +// This task handles reconnecting the subscription client and all +// subscriptions in the case of a web socket disconnect +async fn subscription( + opts: super::Opts, + client: Client, + ws_url: Url, + tx: mpsc::Sender, + shutdown_tx: oneshot::Sender<()>, +) { + loop { + // Initialize state. On future reconnects, we re-initialize state in + // order to accurately capture added, removed, and edited + // components. + let state = match metrics::init_components(&client).await { + Ok(state) => state, + Err(_) => { + tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await; + continue; + } + }; + _ = tx.send(EventType::InitializeState(state)).await; + + let subscription_client = match connect_subscription_client(ws_url.clone()).await { + Ok(c) => c, + Err(_) => { + tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await; + continue; } + }; + + // Subscribe to updated metrics + let finished = metrics::subscribe(subscription_client, tx.clone(), opts.interval as i64); + + _ = tx + .send(EventType::ConnectionUpdated(ConnectionStatus::Connected( + Local::now(), + ))) + .await; + // Tasks spawned in metrics::subscribe finish when the subscription + // streams have completed. Currently, subscription streams only + // complete when the underlying web socket connection to the GraphQL + // server drops. + _ = join_all(finished).await; + _ = tx + .send(EventType::ConnectionUpdated( + ConnectionStatus::Disconnected(RECONNECT_DELAY), + )) + .await; + if opts.no_reconnect { + _ = shutdown_tx.send(()); + break; } - }); + } +} + +pub async fn top(opts: &super::Opts, client: Client, ws_url: Url) -> exitcode::ExitCode { + // Channel for updating state via event messages + let (tx, rx) = tokio::sync::mpsc::channel(20); + let state_rx = state::updater(rx).await; + // Channel for shutdown signal + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + + let connection = tokio::spawn(subscription(opts.clone(), client, ws_url, tx, shutdown_tx)); // Initialize the dashboard - match init_dashboard(url.as_str(), opts, state_rx, shutdown_rx).await { + match init_dashboard(opts.url().as_str(), opts, state_rx, shutdown_rx).await { Ok(_) => { connection.abort(); exitcode::OK diff --git a/src/top/mod.rs b/src/top/mod.rs index 24a0e8b48c99c..f63245cd4247c 100644 --- a/src/top/mod.rs +++ b/src/top/mod.rs @@ -8,6 +8,8 @@ use clap::Parser; pub use cmd::cmd; use url::Url; +use crate::config::api::default_graphql_url; + #[derive(Parser, Debug, Clone)] #[command(rename_all = "kebab-case")] pub struct Opts { @@ -15,7 +17,7 @@ pub struct Opts { #[arg(default_value = "1000", short = 'i', long)] interval: u32, - /// Vector GraphQL API server endpoint + /// GraphQL API server endpoint #[arg(short, long)] url: Option, @@ -23,9 +25,17 @@ pub struct Opts { #[arg(short = 'H', long, default_value_t = true)] human_metrics: bool, - /// Whether to reconnect if the underlying Vector API connection drops. + /// Whether to reconnect if the underlying API connection drops. /// /// By default, top will attempt to reconnect if the connection drops. #[arg(short, long)] no_reconnect: bool, } + +impl Opts { + /// Use the provided URL as the Vector GraphQL API server, or default to the local port + /// provided by the API config. + pub fn url(&self) -> Url { + self.url.clone().unwrap_or_else(default_graphql_url) + } +} From 40086da54f974c7e7920b9b195fc2c710a8141b6 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Tue, 1 Aug 2023 08:09:24 -0400 Subject: [PATCH 10/16] Refactor web socket URL creation --- src/tap/cmd.rs | 19 +++++---------- src/tap/mod.rs | 12 +++++++++ src/top/cmd.rs | 66 +++++++++++++++++++++----------------------------- src/top/mod.rs | 12 +++++++++ 4 files changed, 58 insertions(+), 51 deletions(-) diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index 55db8f5596f34..6cb7b177e2a35 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -20,8 +20,7 @@ const RECONNECT_DELAY: u64 = 5000; /// CLI command func for issuing 'tap' queries, and communicating with a local/remote /// Vector API server via HTTP/WebSockets. pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode { - let mut url = opts.url(); - + let url = opts.url(); // Return early with instructions for enabling the API if the endpoint isn't reachable // via a healthcheck. let client = Client::new(url.clone()); @@ -41,19 +40,11 @@ pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::Ex return exitcode::UNAVAILABLE; } - // Change the HTTP schema to WebSockets. - url.set_scheme(match url.scheme() { - "https" => "wss", - _ => "ws", - }) - .expect("Couldn't build WebSocket URL. Please report."); - - tap(opts, url, signal_rx).await; - - exitcode::OK + tap(opts, signal_rx).await } -pub async fn tap(opts: &super::Opts, subscription_url: Url, mut signal_rx: SignalRx) { +pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode { + let subscription_url = opts.web_socket_url(); let formatter = EventFormatter::new(opts.meta, opts.format); let outputs_patterns = opts.outputs_patterns(); @@ -74,6 +65,8 @@ pub async fn tap(opts: &super::Opts, subscription_url: Url, mut signal_rx: Signa } } } + + exitcode::OK } async fn run( diff --git a/src/tap/mod.rs b/src/tap/mod.rs index 2c6c38b4493f0..184de5ddbf165 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -75,4 +75,16 @@ impl Opts { pub fn url(&self) -> Url { self.url.clone().unwrap_or_else(default_graphql_url) } + + /// URL with scheme set to WebSockets + pub fn web_socket_url(&self) -> Url { + let mut url = self.url(); + url.set_scheme(match url.scheme() { + "https" => "wss", + _ => "ws", + }) + .expect("Couldn't build WebSocket URL. Please report."); + + url + } } diff --git a/src/top/cmd.rs b/src/top/cmd.rs index deb042be7fd32..db5e5dc047e51 100644 --- a/src/top/cmd.rs +++ b/src/top/cmd.rs @@ -3,7 +3,6 @@ use std::time::Duration; use chrono::Local; use futures_util::future::join_all; use tokio::sync::{mpsc, oneshot}; -use url::Url; use vector_api_client::{connect_subscription_client, Client}; use super::{ @@ -28,7 +27,6 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { } let url = opts.url(); - // Create a new API client for connecting to the local/remote Vector instance. let client = Client::new(url.clone()); if client.healthcheck().await.is_err() { @@ -47,16 +45,33 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { return exitcode::UNAVAILABLE; } - // Change the HTTP schema to WebSockets - let mut ws_url = url.clone(); - ws_url - .set_scheme(match url.scheme() { - "https" => "wss", - _ => "ws", - }) - .expect("Couldn't build WebSocket URL. Please report."); + top(&opts, client).await +} - top(&opts, client, ws_url).await +pub async fn top(opts: &super::Opts, client: Client) -> exitcode::ExitCode { + // Channel for updating state via event messages + let (tx, rx) = tokio::sync::mpsc::channel(20); + let state_rx = state::updater(rx).await; + // Channel for shutdown signal + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + + let connection = tokio::spawn(subscription(opts.clone(), client, tx, shutdown_tx)); + + // Initialize the dashboard + match init_dashboard(opts.url().as_str(), opts, state_rx, shutdown_rx).await { + Ok(_) => { + connection.abort(); + exitcode::OK + } + Err(err) => { + #[allow(clippy::print_stderr)] + { + eprintln!("Encountered error: {}", err); + } + connection.abort(); + exitcode::IOERR + } + } } // This task handles reconnecting the subscription client and all @@ -64,10 +79,11 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { async fn subscription( opts: super::Opts, client: Client, - ws_url: Url, tx: mpsc::Sender, shutdown_tx: oneshot::Sender<()>, ) { + let ws_url = opts.web_socket_url(); + loop { // Initialize state. On future reconnects, we re-initialize state in // order to accurately capture added, removed, and edited @@ -113,29 +129,3 @@ async fn subscription( } } } - -pub async fn top(opts: &super::Opts, client: Client, ws_url: Url) -> exitcode::ExitCode { - // Channel for updating state via event messages - let (tx, rx) = tokio::sync::mpsc::channel(20); - let state_rx = state::updater(rx).await; - // Channel for shutdown signal - let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - - let connection = tokio::spawn(subscription(opts.clone(), client, ws_url, tx, shutdown_tx)); - - // Initialize the dashboard - match init_dashboard(opts.url().as_str(), opts, state_rx, shutdown_rx).await { - Ok(_) => { - connection.abort(); - exitcode::OK - } - Err(err) => { - #[allow(clippy::print_stderr)] - { - eprintln!("Encountered error: {}", err); - } - connection.abort(); - exitcode::IOERR - } - } -} diff --git a/src/top/mod.rs b/src/top/mod.rs index f63245cd4247c..4ea89f5a8b6b3 100644 --- a/src/top/mod.rs +++ b/src/top/mod.rs @@ -38,4 +38,16 @@ impl Opts { pub fn url(&self) -> Url { self.url.clone().unwrap_or_else(default_graphql_url) } + + /// URL with scheme set to WebSockets + pub fn web_socket_url(&self) -> Url { + let mut url = self.url(); + url.set_scheme(match url.scheme() { + "https" => "wss", + _ => "ws", + }) + .expect("Couldn't build WebSocket URL. Please report."); + + url + } } From 4cb897ae3963926167f65f7ef7880c6f74c9fc05 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Tue, 1 Aug 2023 08:13:35 -0400 Subject: [PATCH 11/16] Adjust error message --- src/tap/cmd.rs | 1 + src/top/cmd.rs | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index 6cb7b177e2a35..f596190937780 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -24,6 +24,7 @@ pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::Ex // Return early with instructions for enabling the API if the endpoint isn't reachable // via a healthcheck. let client = Client::new(url.clone()); + #[allow(clippy::print_stderr)] if client.healthcheck().await.is_err() { eprintln!( indoc::indoc! {" diff --git a/src/top/cmd.rs b/src/top/cmd.rs index db5e5dc047e51..d7254b6eb267b 100644 --- a/src/top/cmd.rs +++ b/src/top/cmd.rs @@ -29,6 +29,7 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { let url = opts.url(); // Create a new API client for connecting to the local/remote Vector instance. let client = Client::new(url.clone()); + #[allow(clippy::print_stderr)] if client.healthcheck().await.is_err() { eprintln!( indoc::indoc! {" @@ -45,7 +46,7 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { return exitcode::UNAVAILABLE; } - top(&opts, client).await + top(opts, client).await } pub async fn top(opts: &super::Opts, client: Client) -> exitcode::ExitCode { @@ -66,7 +67,7 @@ pub async fn top(opts: &super::Opts, client: Client) -> exitcode::ExitCode { Err(err) => { #[allow(clippy::print_stderr)] { - eprintln!("Encountered error: {}", err); + eprintln!("[top] Encountered shutdown error: {}", err); } connection.abort(); exitcode::IOERR From 2b5a13ad3de1d377b88d612e2129a005e9891cba Mon Sep 17 00:00:00 2001 From: Will Wang Date: Tue, 1 Aug 2023 08:19:20 -0400 Subject: [PATCH 12/16] Publicize tap/top --- src/lib.rs | 4 ++-- src/tap/mod.rs | 2 ++ src/top/mod.rs | 3 +++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 37d71f87922ec..d16cc17ce9a46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,12 +101,12 @@ pub mod sources; pub mod stats; #[cfg(feature = "api-client")] #[allow(unreachable_pub)] -mod tap; +pub mod tap; pub mod template; pub mod test_util; #[cfg(feature = "api-client")] #[allow(unreachable_pub)] -pub(crate) mod top; +pub mod top; #[allow(unreachable_pub)] pub mod topology; pub mod trace; diff --git a/src/tap/mod.rs b/src/tap/mod.rs index 184de5ddbf165..c9dd0e6f3ca57 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -1,3 +1,4 @@ +//! Tap subcommand mod cmd; use clap::Parser; @@ -7,6 +8,7 @@ use vector_api_client::gql::TapEncodingFormat; use crate::config::api::default_graphql_url; +/// Tap options #[derive(Parser, Debug, Clone)] #[command(rename_all = "kebab-case")] pub struct Opts { diff --git a/src/top/mod.rs b/src/top/mod.rs index 4ea89f5a8b6b3..e0f18949dbcb4 100644 --- a/src/top/mod.rs +++ b/src/top/mod.rs @@ -1,3 +1,4 @@ +//! Top subcommand mod cmd; mod dashboard; mod events; @@ -6,10 +7,12 @@ mod state; use clap::Parser; pub use cmd::cmd; +pub use dashboard::is_tty; use url::Url; use crate::config::api::default_graphql_url; +/// Top options #[derive(Parser, Debug, Clone)] #[command(rename_all = "kebab-case")] pub struct Opts { From b93a8b4387677ca86592370e36e899b9b7cc2f0d Mon Sep 17 00:00:00 2001 From: Will Wang Date: Tue, 1 Aug 2023 11:12:58 -0400 Subject: [PATCH 13/16] Use cmd::tap --- src/tap/cmd.rs | 1 + src/tap/mod.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/tap/cmd.rs b/src/tap/cmd.rs index f596190937780..9a0eccc5700c1 100644 --- a/src/tap/cmd.rs +++ b/src/tap/cmd.rs @@ -44,6 +44,7 @@ pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::Ex tap(opts, signal_rx).await } +/// Observe event flow from specified components pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode { let subscription_url = opts.web_socket_url(); let formatter = EventFormatter::new(opts.meta, opts.format); diff --git a/src/tap/mod.rs b/src/tap/mod.rs index c9dd0e6f3ca57..81b4d5a8d5733 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -3,6 +3,7 @@ mod cmd; use clap::Parser; pub(crate) use cmd::cmd; +pub use cmd::tap; use url::Url; use vector_api_client::gql::TapEncodingFormat; From 75c4c12668b759619708e6f0afbf5ff6ef713860 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Tue, 1 Aug 2023 11:16:11 -0400 Subject: [PATCH 14/16] Use cmd::top --- src/top/cmd.rs | 1 + src/top/mod.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/top/cmd.rs b/src/top/cmd.rs index d7254b6eb267b..7ec91cbb8499b 100644 --- a/src/top/cmd.rs +++ b/src/top/cmd.rs @@ -49,6 +49,7 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { top(opts, client).await } +/// General monitoring pub async fn top(opts: &super::Opts, client: Client) -> exitcode::ExitCode { // Channel for updating state via event messages let (tx, rx) = tokio::sync::mpsc::channel(20); diff --git a/src/top/mod.rs b/src/top/mod.rs index e0f18949dbcb4..790d7c6025dc0 100644 --- a/src/top/mod.rs +++ b/src/top/mod.rs @@ -7,6 +7,7 @@ mod state; use clap::Parser; pub use cmd::cmd; +pub use cmd::top; pub use dashboard::is_tty; use url::Url; From 5a1ed23b47e1a6c6fadfb30da93aabb6f33cb5cb Mon Sep 17 00:00:00 2001 From: Will Wang Date: Tue, 1 Aug 2023 14:19:58 -0400 Subject: [PATCH 15/16] Allow customizing dashboard title --- src/top/cmd.rs | 14 +++++++++++--- src/top/dashboard.rs | 11 +++++++---- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/top/cmd.rs b/src/top/cmd.rs index 7ec91cbb8499b..a774ca7bcc448 100644 --- a/src/top/cmd.rs +++ b/src/top/cmd.rs @@ -46,11 +46,11 @@ pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode { return exitcode::UNAVAILABLE; } - top(opts, client).await + top(opts, client, "Vector").await } /// General monitoring -pub async fn top(opts: &super::Opts, client: Client) -> exitcode::ExitCode { +pub async fn top(opts: &super::Opts, client: Client, dashboard_title: &str) -> exitcode::ExitCode { // Channel for updating state via event messages let (tx, rx) = tokio::sync::mpsc::channel(20); let state_rx = state::updater(rx).await; @@ -60,7 +60,15 @@ pub async fn top(opts: &super::Opts, client: Client) -> exitcode::ExitCode { let connection = tokio::spawn(subscription(opts.clone(), client, tx, shutdown_tx)); // Initialize the dashboard - match init_dashboard(opts.url().as_str(), opts, state_rx, shutdown_rx).await { + match init_dashboard( + dashboard_title, + opts.url().as_str(), + opts, + state_rx, + shutdown_rx, + ) + .await + { Ok(_) => { connection.abort(); exitcode::OK diff --git a/src/top/dashboard.rs b/src/top/dashboard.rs index ccf0b49085a5c..e38505555dce0 100644 --- a/src/top/dashboard.rs +++ b/src/top/dashboard.rs @@ -143,11 +143,12 @@ struct Widgets<'a> { constraints: Vec, url_string: &'a str, opts: &'a super::Opts, + title: &'a str, } impl<'a> Widgets<'a> { /// Creates a new Widgets, containing constraints to re-use across renders. - pub fn new(url_string: &'a str, opts: &'a super::Opts) -> Self { + pub fn new(title: &'a str, url_string: &'a str, opts: &'a super::Opts) -> Self { let constraints = vec![ Constraint::Length(3), Constraint::Max(90), @@ -158,10 +159,11 @@ impl<'a> Widgets<'a> { constraints, url_string, opts, + title, } } - /// Renders a title showing 'Vector', and the URL the dashboard is currently connected to. + /// Renders a title and the URL the dashboard is currently connected to. fn title( &'a self, f: &mut Frame, @@ -181,7 +183,7 @@ impl<'a> Widgets<'a> { let text = vec![Spans::from(text)]; let block = Block::default().borders(Borders::ALL).title(Span::styled( - "Vector", + self.title, Style::default() .fg(Color::Green) .add_modifier(Modifier::BOLD), @@ -353,6 +355,7 @@ pub fn is_tty() -> bool { /// as well as entering an 'alternate screen' to overlay the console. This ensures that when /// the dashboard is exited, the user's previous terminal session can commence, unaffected. pub async fn init_dashboard<'a>( + title: &'a str, url: &'a str, opts: &'a super::Opts, mut state_rx: state::StateRx, @@ -377,7 +380,7 @@ pub async fn init_dashboard<'a>( // Clear the screen, readying it for output terminal.clear()?; - let widgets = Widgets::new(url, opts); + let widgets = Widgets::new(title, url, opts); loop { tokio::select! { From 027fee57fa4a8fcf509bfcf8fe8d1b9ff5308b46 Mon Sep 17 00:00:00 2001 From: Will Wang Date: Wed, 2 Aug 2023 11:14:56 -0400 Subject: [PATCH 16/16] Apply PR suggestion --- lib/vector-api-client/src/client.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/vector-api-client/src/client.rs b/lib/vector-api-client/src/client.rs index bacaf9dfcf374..941ffa7963f27 100644 --- a/lib/vector-api-client/src/client.rs +++ b/lib/vector-api-client/src/client.rs @@ -22,10 +22,7 @@ impl Client { /// Send a health query pub async fn healthcheck(&self) -> Result<(), ()> { - match self.health_query().await { - Ok(_) => Ok(()), - Err(_) => Err(()), - } + self.health_query().await.map(|_| ()).map_err(|_| ()) } /// Issue a GraphQL query using Reqwest, serializing the response to the associated