Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(api): Refactor top and tap for library use #18129

Merged
merged 17 commits into from
Aug 2, 2023
34 changes: 7 additions & 27 deletions lib/vector-api-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::Context;
use graphql_client::GraphQLQuery;
use indoc::indoc;
use url::Url;

use crate::gql::HealthQueryExt;

/// Wrapped `Result` type, that returns deserialized GraphQL response data.
pub type QueryResult<T> =
anyhow::Result<graphql_client::Response<<T as GraphQLQuery>::ResponseData>>;
Expand All @@ -19,32 +20,11 @@ impl Client {
Self { url }
}

pub async fn new_with_healthcheck(url: Url) -> Option<Self> {
#![allow(clippy::print_stderr)]

use crate::gql::HealthQueryExt;

// Create a new API client for connecting to the local/remote Vector instance.
let client = Self::new(url.clone());

// Check that the GraphQL server is reachable
match client.health_query().await {
Ok(_) => Some(client),
_ => {
eprintln!(
indoc! {"
Vector API server isn't reachable ({}).

Have you enabled the API?

To enable the API, add the following to your `vector.toml` config file:

[api]
enabled = true"},
url
);
None
}
/// Send a health query
pub async fn healthcheck(&self) -> Result<(), ()> {
match self.health_query().await {
Ok(_) => Ok(()),
Err(_) => Err(()),
}
wbew marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
8 changes: 8 additions & 0 deletions src/config/api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::net::{Ipv4Addr, SocketAddr};

use url::Url;
use vector_config::configurable_component;

/// API options.
Expand Down Expand Up @@ -41,6 +42,13 @@ pub fn default_address() -> Option<SocketAddr> {
Some(SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8686))
}

/// Default GraphQL API address
pub fn default_graphql_url() -> Url {
let addr = default_address().unwrap();
Url::parse(&format!("http://{}/graphql", addr))
.expect("Couldn't parse default API URL. Please report this.")
}

const fn default_playground() -> bool {
true
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ pub mod sources;
pub mod stats;
#[cfg(feature = "api-client")]
#[allow(unreachable_pub)]
mod tap;
pub mod tap;
pub mod template;
pub mod test_util;
#[cfg(feature = "api-client")]
#[allow(unreachable_pub)]
pub(crate) mod top;
pub mod top;
#[allow(unreachable_pub)]
pub mod topology;
pub mod trace;
Expand Down
63 changes: 26 additions & 37 deletions src/tap/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,60 +12,49 @@ use vector_api_client::{
Client,
};

use crate::{
config,
signal::{SignalRx, SignalTo},
};
use crate::signal::{SignalRx, SignalTo};

/// Delay (in milliseconds) before attempting to reconnect to the Vector API
const RECONNECT_DELAY: u64 = 5000;

/// CLI command func for issuing 'tap' queries, and communicating with a local/remote
/// Vector API server via HTTP/WebSockets.
pub(crate) async fn cmd(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode {
// Use the provided URL as the Vector GraphQL API server, or default to the local port
// provided by the API config. This will work despite `api` and `api-client` being distinct
// features; the config is available even if `api` is disabled.
let mut url = opts.url.clone().unwrap_or_else(|| {
let addr = config::api::default_address().unwrap();
Url::parse(&format!("http://{}/graphql", addr))
.expect("Couldn't parse default API URL. Please report this.")
});

pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode {
let url = opts.url();
// Return early with instructions for enabling the API if the endpoint isn't reachable
// via a healthcheck.
if Client::new_with_healthcheck(url.clone()).await.is_none() {
let client = Client::new(url.clone());
#[allow(clippy::print_stderr)]
if client.healthcheck().await.is_err() {
eprintln!(
indoc::indoc! {"
Vector API server isn't reachable ({}).

Have you enabled the API?

To enable the API, add the following to your `vector.toml` config file:

[api]
enabled = true"},
url
);
return exitcode::UNAVAILABLE;
}

// Change the HTTP schema to WebSockets.
url.set_scheme(match url.scheme() {
"https" => "wss",
_ => "ws",
})
.expect("Couldn't build WebSocket URL. Please report.");

// If no patterns are provided, tap all components' outputs
let outputs_patterns = if opts.component_id_patterns.is_empty()
&& opts.outputs_of.is_empty()
&& opts.inputs_of.is_empty()
{
vec!["*".to_string()]
} else {
opts.outputs_of
.iter()
.cloned()
.chain(opts.component_id_patterns.iter().cloned())
.collect()
};
tap(opts, signal_rx).await
}

/// Observe event flow from specified components
pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode {
let subscription_url = opts.web_socket_url();
let formatter = EventFormatter::new(opts.meta, opts.format);
let outputs_patterns = opts.outputs_patterns();

loop {
tokio::select! {
biased;
Ok(SignalTo::Shutdown | SignalTo::Quit) = signal_rx.recv() => break,
status = run(url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => {
status = run(subscription_url.clone(), opts, outputs_patterns.clone(), formatter.clone()) => {
if status == exitcode::UNAVAILABLE || status == exitcode::TEMPFAIL && !opts.no_reconnect {
#[allow(clippy::print_stderr)]
{
Expand Down Expand Up @@ -93,7 +82,7 @@ async fn run(
Err(e) => {
#[allow(clippy::print_stderr)]
{
eprintln!("[tap] Couldn't connect to Vector API via WebSockets: {}", e);
eprintln!("[tap] Couldn't connect to API via WebSockets: {}", e);
}
return exitcode::UNAVAILABLE;
}
Expand Down
47 changes: 45 additions & 2 deletions src/tap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
//! Tap subcommand
mod cmd;

use clap::Parser;
pub(crate) use cmd::cmd;
pub use cmd::tap;
use url::Url;
use vector_api_client::gql::TapEncodingFormat;

use crate::config::api::default_graphql_url;

/// Tap options
#[derive(Parser, Debug, Clone)]
#[command(rename_all = "kebab-case")]
pub struct Opts {
/// Interval to sample logs at, in milliseconds
#[arg(default_value = "500", short = 'i', long)]
interval: u32,

/// Vector GraphQL API server endpoint
/// GraphQL API server endpoint
#[arg(short, long)]
url: Option<Url>,

Expand Down Expand Up @@ -44,7 +49,45 @@ pub struct Opts {
#[arg(short, long)]
meta: bool,

/// Whether to reconnect if the underlying Vector API connection drops. By default, tap will attempt to reconnect if the connection drops.
/// Whether to reconnect if the underlying API connection drops. By default, tap will attempt to reconnect if the connection drops.
#[arg(short, long)]
no_reconnect: bool,
}

impl Opts {
/// Component ID patterns to tap
///
/// If no patterns are provided, tap all components' outputs
pub fn outputs_patterns(&self) -> Vec<String> {
if self.component_id_patterns.is_empty()
&& self.outputs_of.is_empty()
&& self.inputs_of.is_empty()
{
vec!["*".to_string()]
} else {
self.outputs_of
.iter()
.cloned()
.chain(self.component_id_patterns.iter().cloned())
.collect()
}
}

/// Use the provided URL as the Vector GraphQL API server, or default to the local port
/// provided by the API config.
pub fn url(&self) -> Url {
self.url.clone().unwrap_or_else(default_graphql_url)
}

/// URL with scheme set to WebSockets
pub fn web_socket_url(&self) -> Url {
let mut url = self.url();
url.set_scheme(match url.scheme() {
"https" => "wss",
_ => "ws",
})
.expect("Couldn't build WebSocket URL. Please report.");

url
}
}
Loading