diff --git a/Cargo.lock b/Cargo.lock index 918e0a3..bcae43a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1122,6 +1122,7 @@ dependencies = [ "itertools", "k8s-openapi", "kube", + "log", "once_cell", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index ed48a02..99de0c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,5 +49,6 @@ k8s-openapi = { version = ">=0.15", features = ["v1_24"] } clap = { version = ">=3.2", features = ["env", "derive"] } serde = { version = "*", default-features = false, features = ["derive"] } kube = { version = ">=0.73", default-features = false, features = ["client", "config", "rustls-tls"] } -tokio = { version = ">=1", default-features = false, features = ["net", "macros", "rt-multi-thread"] } +tokio = { version = ">=1", default-features = false, features = ["fs", "net", "macros", "rt-multi-thread"] } futures-util = { version = "0.3.21", default-features = false, features = ["alloc", "async-await", "tokio-io"] } +log = "0.4.17" diff --git a/src/cli.rs b/src/cli.rs index 4ed89a4..8fe000f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,9 +1,25 @@ // Pinnothera's command line argument parsing components -use std::path::PathBuf; +use std::fmt::Formatter; +// Standard Library Imports +use std::path::{Path, PathBuf}; +use std::sync::Arc; -use aws_types::SdkConfig as AWSConfig; +// Third Party Imports +use aws_sdk_sns::config::Config as SNSClientConfig; +use aws_sdk_sqs::config::Config as SQSClientConfig; +use aws_types::credentials::{ + future::ProvideCredentials as ProvideAWSCredentials, Credentials as AWSCredentials, + CredentialsError as AWSCredentialsError, ProvideCredentials as AWSCredentialProvider, + SharedCredentialsProvider as SharedAWSCredentialsProvider, +}; +use aws_types::{region::Region, SdkConfig as AWSConfig}; use clap::Parser; +use easy_error::{bail, Terminator}; +use kube::Client; + +// Project-Level Imports +use crate::{EnvName, PinnConfig, CLUSTER_ENV}; // const CLI_ABOUT: &str = ""; @@ -28,6 +44,11 @@ pub(crate) struct CLIArgs { #[clap(short = 'c', long = "kube-context", value_parser)] pub(crate) kube_context: Option, + /// Name of the name of the "environment" the target + /// cluster is running in (i.e. 'dev' or 'production') + #[clap(short = 'e', long = "env-name", value_parser)] + pub(crate) env_name: Option, + // // @@ -49,12 +70,12 @@ pub(crate) struct CLIArgs { /// The Secret Key ID that pinnothera should use /// to communicate with AWS SNS/SQS services #[clap(long = "aws-access-key-id", value_parser)] - pub(crate) aws_secret_key_id: Option, + pub(crate) aws_access_key_id: Option, /// The Secret Access Key that pinnothera should use /// to communicate with AWS SNS/SQS services #[clap(long = "aws-secret-access-key", value_parser)] - pub(crate) aws_secret_access_id: Option, + pub(crate) aws_secret_access_key: Option, // @@ -83,15 +104,172 @@ pub(crate) struct CLIArgs { // } +struct CLICredentialProvider { + access_key_id: String, + secret_access_key: String, +} + +impl std::fmt::Debug for CLICredentialProvider { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "CLICredentialProvider(access_key_id: {}, secret_access_key: {},", + self.access_key_id.as_str(), + self.secret_access_key.as_str() + ) + } +} + +impl CLICredentialProvider { + async fn aws_credentials(&self) -> aws_types::credentials::Result { + Ok(AWSCredentials::new( + self.access_key_id.as_str(), + self.secret_access_key.as_str(), + None, + None, + "Pinnothera CLI arguments", + )) + } +} + +impl AWSCredentialProvider for CLICredentialProvider { + fn provide_credentials<'a>(&'a self) -> ProvideAWSCredentials<'a> + where + Self: 'a, + { + ProvideAWSCredentials::new(self.aws_credentials()) + } +} + +impl TryFrom<&CLIArgs> for CLICredentialProvider { + type Error = AWSCredentialsError; + + fn try_from(args: &CLIArgs) -> Result { + if !args.aws_access_key_id.is_some() { + Err(AWSCredentialsError::provider_error( + "Missing or empty access key id!", + )) + } else if !args.aws_secret_access_key.is_some() { + Err(AWSCredentialsError::provider_error( + "Missing or empty secret access key!", + )) + } else { + Ok(CLICredentialProvider { + access_key_id: args.aws_access_key_id.as_ref().unwrap().clone(), + secret_access_key: args.aws_secret_access_key.as_ref().unwrap().clone(), + }) + } + } +} + impl CLIArgs { - pub async fn aws_config(&self) -> AWSConfig { - // let config: AWSConfig = aws_types::SdkConfig::builder() - // .region(aws_types::region::Region::new("us-east-2")) - // .credentials_provider(LocalstackCredentialProvider::new_shared_provider()) - // .endpoint_resolver(aws_smithy_http::endpoint::Endpoint::immutable( - // http::Uri::from_static("http://aws.localstack"), - // )) - // .build(); - todo!() + // + pub async fn aws_client_configs( + &'static self, + ) -> Result<(SNSClientConfig, SQSClientConfig), Terminator> { + if self.aws_role_arn.is_some() { + bail!("Support for explicit AWS Role ARNs not yet implemented!") + } + // Infer and create an AWS `Config` from the current environment + let config: AWSConfig = aws_config::load_from_env().await; + + let (sns_config, sqs_config) = ( + aws_sdk_sns::config::Builder::from(&config), + aws_sdk_sqs::config::Builder::from(&config), + ); + + let (mut sns_config, mut sqs_config) = match &self.aws_region { + Some(region) => ( + sns_config.region(Region::new(region)), + sqs_config.region(Region::new(region)), + ), + None => (sns_config, sqs_config), + }; + + let endpoint = if let Some(url) = &self.aws_endpoint { + Some(url.as_str()) + } else if CLUSTER_ENV + .get() + .unwrap_or(&Arc::new(EnvName::Unknown)) + .is_local() + { + Some("http://aws.localstack") + } else { + None + }; + + if let Some(url) = endpoint { + sns_config.set_endpoint_resolver(Some(Arc::new( + aws_smithy_http::endpoint::Endpoint::immutable(http::Uri::from_static(url)), + ))); + sqs_config.set_endpoint_resolver(Some(Arc::new( + aws_smithy_http::endpoint::Endpoint::immutable(http::Uri::from_static(url)), + ))); + } + + if self.aws_access_key_id.is_some() & self.aws_secret_access_key.is_some() { + sns_config.set_credentials_provider(Some(SharedAWSCredentialsProvider::new( + CLICredentialProvider::try_from(self)?, + ))); + sqs_config.set_credentials_provider(Some(SharedAWSCredentialsProvider::new( + CLICredentialProvider::try_from(self)?, + ))); + } + + Ok((sns_config.build(), sqs_config.build())) + } + + // + + // + + async fn kube_config(&self) -> Result { + let options = kube::config::KubeConfigOptions { + context: self.kube_context.clone(), + cluster: None, + user: None, + }; + + let config = + kube::Config::from_custom_kubeconfig(kube::config::Kubeconfig::read()?, &options) + .await?; + + Ok(config) } + + // + + // + + pub async fn pinn_config(&mut self) -> Result<(EnvName, PinnConfig), Terminator> { + if let Some(json_path) = &self.json_file { + self.json_data = Some(tokio::fs::read_to_string(json_path).await?); + } else if let Some(yaml_path) = &self.yaml_file { + self.yaml_data = Some(tokio::fs::read_to_string(yaml_path).await?); + } + + if let Some(json_data) = &self.json_data { + return Ok(( + EnvName::from(self.env_name.clone()), + PinnConfig::from_json(json_data)?, + )); + } else if let Some(yaml_data) = &self.yaml_data { + return Ok(( + EnvName::from(self.env_name.clone()), + PinnConfig::from_yaml(yaml_data)?, + )); + } + + let client = kube::Client::try_from(self.kube_config().await?)?; + + PinnConfig::from_cluster( + client, + &self.env_name, + &self.namespace, + &self.configmap_name, + ) + .await + } + + // } diff --git a/src/main.rs b/src/main.rs index 871f5e7..c82ed2f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,32 +2,35 @@ #![allow(dead_code, unused_imports)] -// Standard Library +// Standard Library Imports use std::sync::Arc; -// Third Party +// Third Party Imports use aws_sdk_sns::Client as SNSClient; use aws_sdk_sqs::Client as SQSClient; use aws_types::SdkConfig as AWSConfig; use clap; use easy_error::{bail, Terminator}; +use log; use once_cell::sync::OnceCell; -// Crate-local -use cli::CLIArgs; -use types::{ +// Project-Level Imports +pub(crate) use cli::CLIArgs; +pub(crate) use types::{ EnvName, PinnConfig, SNSSubscriptionARN, SNSTopicARN, SQSQueueARN, SQSQueueConfig, SQSQueueName, SQSQueueURL, }; -mod cli; -mod types; +pub(crate) mod cli; +pub(crate) mod types; // -static SNS_CLIENT: OnceCell> = OnceCell::new(); -static SQS_CLIENT: OnceCell> = OnceCell::new(); -static CLUSTER_ENV: OnceCell> = OnceCell::new(); +pub(crate) static CLUSTER_ENV: OnceCell> = OnceCell::new(); +pub(crate) static SNS_CLIENT: OnceCell> = OnceCell::new(); +pub(crate) static SQS_CLIENT: OnceCell> = OnceCell::new(); +pub(crate) static PINN_CONFIG: OnceCell> = OnceCell::new(); +pub(crate) static CLI_ARGS: OnceCell> = OnceCell::new(); // @@ -193,7 +196,7 @@ async fn create_subscription>( async fn apply_queue_configuration>( queue: T, - config: SQSQueueConfig, + config: &'static SQSQueueConfig, ) -> Result<(), Terminator> { // Create a convenient place to accumulate // the task handles we're about to create @@ -203,18 +206,22 @@ async fn apply_queue_configuration>( // If the supplied queue is actually the sentinel value // "unsubscribed", just create the configured topics but // don't attempt to subscribe them to anything - config.topics.into_iter().for_each(|topic| { - tasks.push(tokio::spawn(async { create_topic(topic).await.expect("") })); + config.topics.iter().for_each(|topic| { + tasks.push(tokio::spawn(async { + create_topic(topic.to_string()).await.expect("") + })); }); } else { // Get the specified queue's URL and ARN let (_queue_url, queue_arn) = create_queue(queue).await?; // Create the queue's required subscriptions - config.topics.into_iter().for_each(|topic| { + config.topics.iter().for_each(|topic| { let task_arn = queue_arn.clone(); tasks.push(tokio::spawn(async move { - create_subscription(task_arn, topic).await.expect("") + create_subscription(task_arn, topic.to_string()) + .await + .expect("") })); }) } @@ -227,78 +234,51 @@ async fn apply_queue_configuration>( // -// +// -async fn aws_client_configs_for_env() -> (aws_sdk_sns::config::Config, aws_sdk_sqs::config::Config) -{ - // Infer and create an AWS `Config` from the current environment - let config: AWSConfig = aws_config::load_from_env().await; +#[tokio::main] +async fn main() -> Result<(), Terminator> { + // Parse and store any cli arguments that were supplied + let mut args: CLIArgs = ::parse(); - let (mut sns_config, mut sqs_config) = ( - aws_sdk_sns::config::Builder::from(&config), - aws_sdk_sqs::config::Builder::from(&config), - ); + // Get the SNS/SQS topic & queue configuration from the + // cluster (if it exists in the current namespace) + let (env_name, pinn_config) = args.pinn_config().await?; - if CLUSTER_ENV.get().unwrap().is_local() { - sns_config.set_endpoint_resolver(Some(Arc::new( - aws_smithy_http::endpoint::Endpoint::immutable(http::Uri::from_static( - "http://aws.localstack", - )), - ))); - sqs_config.set_endpoint_resolver(Some(Arc::new( - aws_smithy_http::endpoint::Endpoint::immutable(http::Uri::from_static( - "http://aws.localstack", - )), - ))); - } + println!("Applying queue configuration: {:#?}", &pinn_config); - (sns_config.build(), sqs_config.build()) -} + PINN_CONFIG.set(Arc::new(pinn_config)).unwrap(); + CLUSTER_ENV.set(Arc::new(env_name)).unwrap(); + CLI_ARGS.set(Arc::new(args)).unwrap(); -// + // Get a usable AWS configuration objects for the local environment + let (sns_config, sqs_config) = CLI_ARGS.get().unwrap().aws_client_configs().await?; -// + // Use the inferred AWS config to create SNS and SQS clients + let sns_client: SNSClient = SNSClient::from_conf(sns_config); + let sqs_client: SQSClient = SQSClient::from_conf(sqs_config); -#[tokio::main] -async fn main() -> Result<(), Terminator> { - let args = ::parse(); - - println!("{:#?}", args); - - // // Get the SNS/SQS topic & queue configuration from the - // // cluster (if it exists in the current namespace) - // let (env_name, pinn_config) = PinnConfig::from_cluster().await?; - // - // println!("Applying queue configuration: {:#?}", &pinn_config); - // - // CLUSTER_ENV.set(Arc::new(env_name)).unwrap(); - // - // // Get a usable AWS configuration objects for the local environment - // let (sns_config, sqs_config) = aws_client_configs_for_env().await; - // - // // Use the inferred AWS config to create SNS and SQS clients - // let sns_client: SNSClient = SNSClient::from_conf(sns_config); - // let sqs_client: SQSClient = SQSClient::from_conf(sqs_config); - // - // // Allow the created clients to be accessed globally safely via - // // the `OnceCell`s created as part of pinnothera's initialization - // SNS_CLIENT.set(Arc::new(sns_client)).unwrap(); - // SQS_CLIENT.set(Arc::new(sqs_client)).unwrap(); - // - // // Spawn async tasks to apply the parsed queue & topic configurations - // let tasks: Vec<_> = pinn_config - // .into_iter() - // .map(|(queue, queue_config)| { - // tokio::spawn(async move { - // apply_queue_configuration(queue, queue_config) - // .await - // .unwrap() - // }) - // }) - // .collect(); - // - // // Wait for all of the spawned tasks to finish - // futures_util::future::join_all(tasks).await; + // Allow the created clients to be accessed globally safely via + // the `OnceCell`s created as part of pinnothera's initialization + SNS_CLIENT.set(Arc::new(sns_client)).unwrap(); + SQS_CLIENT.set(Arc::new(sqs_client)).unwrap(); + + // Spawn async tasks to apply the parsed queue & topic configurations + let tasks: Vec<_> = PINN_CONFIG + .get() + .unwrap() + .iter() + .map(|(queue, queue_config)| { + tokio::spawn(async move { + apply_queue_configuration(queue, queue_config) + .await + .unwrap() + }) + }) + .collect(); + + // Wait for all of the spawned tasks to finish + futures_util::future::join_all(tasks).await; Ok(()) } diff --git a/src/types.rs b/src/types.rs index 31704ee..df5d5cd 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,12 +1,13 @@ // Pinnothera's internal structs and enums -// Standard Library +// Standard Library Imports use std::collections::BTreeMap; -// Third Party +// Third Party Imports use easy_error::Terminator; use k8s_openapi::api::core::v1::ConfigMap; -use kube::{api::Api, Client as K8sClient}; +use kube::{api::Api as K8sAPI, Client as K8sClient}; +use log; use serde::Deserialize; // @@ -112,25 +113,6 @@ impl std::ops::Deref for PinnConfig { } } -// impl> TryFrom for PinnConfig { -// type Error = Terminator; -// -// fn try_from(data: T) -> Result { -// let data: &str = data.as_ref().trim(); -// -// match data.chars().next()? { -// // If the leading character is "{" -// // we've most likely got JSON data -// '{' => PinnConfig::from_json(data), -// -// // Otherwise, assume the data is -// // either improperly formatted, or -// // actually is YAML data -// _ => PinnConfig::from_yaml(data), -// } -// } -// } - impl PinnConfig { pub fn for_unknown_env() -> Result<(EnvName, PinnConfig), Terminator> { Ok((EnvName::Unknown, Self::default())) @@ -158,20 +140,39 @@ impl PinnConfig { } } - pub async fn from_cluster() -> Result<(EnvName, PinnConfig), Terminator> { - // Infer and create a Kubernetes `Client` from the runtime environment - let client: K8sClient = K8sClient::try_default().await?; + pub async fn from_cluster>( + client: K8sClient, + env_name: &Option, + namespace: &Option, + configmap_name: &T, + ) -> Result<(EnvName, PinnConfig), Terminator> { + // Ensure the name of the target configmap is usable + let configmap_name: &str = configmap_name.as_ref(); - // Read `ConfigMap`s in the configured namespace + // Read `ConfigMap`s in the specified (or default) namespace // into the typed interface from k8s-openapi - let config_maps: Api = Api::default_namespaced(client); + let (config_maps, namespace): (K8sAPI, String) = if let Some(value) = &namespace + { + ( + K8sAPI::namespaced(client, value.as_ref()), + format!("cluster namespace '{}'", value.as_ref()), + ) + } else { + ( + K8sAPI::default_namespaced(client), + "the current cluster namespace".to_string(), + ) + }; // Use the typed interface to pull the namespace's // pinnothera configuration (if it exists) - let pinn_confmap: ConfigMap = match config_maps.get_opt("sns-sqs-config").await? { + let pinn_confmap: ConfigMap = match config_maps.get_opt(configmap_name).await? { Some(obj) => obj, None => { - println!("No pinnothera-recognized ConfigMap in current cluster namespace!"); + println!( + "No `ConfigMap` named '{}' in {}!", + configmap_name, &namespace + ); return Self::for_unknown_env(); } }; @@ -182,14 +183,18 @@ impl PinnConfig { None => BTreeMap::new(), }; - let env_name: EnvName = EnvName::from(annotations.get("app-env")); + let env_name: EnvName = match env_name { + Some(value) => EnvName::from(Some(value)), + None => EnvName::from(annotations.get("app-env")), + }; // Pull out the ConfigMap's `data` element (if it exists) let confs_map: BTreeMap = match pinn_confmap.data { Some(obj) => obj, None => { println!( - "The `sns-sqs-config` ConfigMap retrieved from the current cluster namespace has no `data` element!" + "The '{}' `ConfigMap` retrieved from {} has no `data` element!", + configmap_name, &namespace, ); return Self::for_unknown_env(); } @@ -208,7 +213,7 @@ impl PinnConfig { }; }; - println!("The `data` element in the `sns-sqs-config` ConfigMap retrieved from the current cluster namespace has no pinnothera-recognized keys!"); + println!("The `data` element in the '{}' ConfigMap retrieved from {} has no pinnothera-recognized keys!", configmap_name, &namespace, ); Self::for_unknown_env() }