Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#11185: Allow specifying a KMS key and tags for newly created AWS CloudWatch log groups. #22274

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ aws-sdk-sqs = { version = "1.3.0", default-features = false, features = ["behavi
aws-sdk-sns = { version = "1.6.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-cloudwatch = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-cloudwatchlogs = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-kms = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-elasticsearch = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-firehose = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
aws-sdk-kinesis = { version = "1.3.0", default-features = false, features = ["behavior-version-latest", "rt-tokio"], optional = true }
Expand Down Expand Up @@ -757,7 +758,7 @@ sinks-metrics = [

sinks-amqp = ["lapin"]
sinks-appsignal = []
sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs"]
sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs", "dep:aws-sdk-kms"]
sinks-aws_cloudwatch_metrics = ["aws-core", "dep:aws-sdk-cloudwatch"]
sinks-aws_kinesis_firehose = ["aws-core", "dep:aws-sdk-firehose"]
sinks-aws_kinesis_streams = ["aws-core", "dep:aws-sdk-kinesis"]
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ aws-sdk-cloudwatch,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust
aws-sdk-cloudwatchlogs,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-firehose,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-kinesis,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-kms,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-s3,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-secretsmanager,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
aws-sdk-sns,https://github.com/awslabs/aws-sdk-rust,Apache-2.0,"AWS Rust SDK Team <[email protected]>, Russell Cohen <[email protected]>"
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/11185.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Allows users to specify a KMS key and tags for newly created AWS CloudWatch log groups.

authors: johannesfloriangeiger
2 changes: 1 addition & 1 deletion scripts/integration/aws/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
mock-localstack:
image: docker.io/localstack/localstack:3
environment:
- SERVICES=kinesis,s3,cloudwatch,es,firehose,sqs,sns,logs
- SERVICES=kinesis,s3,cloudwatch,es,firehose,kms,sqs,sns,logs
mock-ecs:
image: docker.io/amazon/amazon-ecs-local-container-endpoints:latest
volumes:
Expand Down
1 change: 1 addition & 0 deletions scripts/integration/aws/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ env:
ECS_ADDRESS: http://mock-ecs
ELASTICSEARCH_ADDRESS: http://mock-localstack:4566
KINESIS_ADDRESS: http://mock-localstack:4566
KMS_ADDRESS: http://mock-localstack:4566
S3_ADDRESS: http://mock-localstack:4566
SQS_ADDRESS: http://mock-localstack:4566
SNS_ADDRESS: http://mock-localstack:4566
Expand Down
11 changes: 11 additions & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
use futures::FutureExt;
use serde::{de, Deserialize, Deserializer};
use std::collections::HashMap;
use tower::ServiceBuilder;
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;
Expand Down Expand Up @@ -164,6 +165,14 @@ pub struct CloudwatchLogsSinkConfig {
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,

#[configurable(derived)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @johannesfloriangeiger! Please add rustdocs and then make generate-components-docs.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

#[serde(default)]
pub kms_key: Option<String>,

#[configurable(derived)]
#[serde(default)]
pub tags: Option<HashMap<String, String>>,
}

impl CloudwatchLogsSinkConfig {
Expand Down Expand Up @@ -248,6 +257,8 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
assume_role: Default::default(),
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: Default::default(),
tags: Default::default(),
}
}

Expand Down
135 changes: 134 additions & 1 deletion src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::collections::HashMap;
use std::convert::TryFrom;

use aws_config::Region;
use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
use aws_sdk_kms::Client as KMSClient;
use chrono::Duration;
use futures::{stream, StreamExt};
use similar_asserts::assert_eq;
use vector_lib::codecs::TextSerializerConfig;
use vector_lib::lookup;

use super::*;
use crate::aws::create_client;
use crate::aws::{create_client, ClientBuilder};
use crate::aws::{AwsAuthentication, RegionOrEndpoint};
use crate::sinks::aws_cloudwatch_logs::config::CloudwatchLogsClientBuilder;
use crate::{
Expand All @@ -29,6 +31,20 @@ fn cloudwatch_address() -> String {
std::env::var("CLOUDWATCH_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
}

fn kms_address() -> String {
std::env::var("KMS_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
}

struct KMSClientBuilder;

impl ClientBuilder for KMSClientBuilder {
type Client = aws_sdk_kms::client::Client;

fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_kms::client::Client::new(config)
}
}

#[tokio::test]
async fn cloudwatch_insert_log_event() {
trace_init();
Expand All @@ -51,6 +67,8 @@ async fn cloudwatch_insert_log_event() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -102,6 +120,8 @@ async fn cloudwatch_insert_log_events_sorted() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -178,6 +198,8 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -255,6 +277,8 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -284,6 +308,90 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
assert_eq!(output_lines.sort(), input_lines.sort());
}

#[tokio::test]
async fn cloudwatch_dynamic_group_and_stream_creation_with_kms_key_and_tags() {
trace_init();

let stream_name = gen_name();
let group_name = gen_name();

let config = CloudwatchLogsSinkConfig {
stream_name: Template::try_from(stream_name.as_str()).unwrap(),
group_name: Template::try_from(group_name.as_str()).unwrap(),
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
batch: Default::default(),
request: Default::default(),
tls: Default::default(),
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: Some(
create_kms_client_test()
.await
.create_key()
.send()
.await
.unwrap()
.key_metadata()
.unwrap()
.key_id()
.parse()
.unwrap(),
),
tags: Some(HashMap::from_iter(vec![(
"key".to_string(),
"value".to_string(),
)])),
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now();

let (mut input_lines, events) = random_lines_with_stream(100, 11, None);
run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;

let response = create_client_test()
.await
.get_log_events()
.log_stream_name(stream_name)
.log_group_name(group_name.clone())
.start_time(timestamp.timestamp_millis())
.send()
.await
.unwrap();

let events = response.events.unwrap();

let mut output_lines = events
.into_iter()
.map(|e| e.message.unwrap())
.collect::<Vec<_>>();

assert_eq!(output_lines.sort(), input_lines.sort());

let log_group = create_client_test()
.await
.describe_log_groups()
.log_group_name_pattern(group_name.clone())
.limit(1)
.send()
.await
.unwrap()
.log_groups()
.first()
.unwrap()
.clone();

let kms_key = log_group.kms_key_id().unwrap();
assert_eq!(kms_key, config.kms_key.unwrap());
}

#[tokio::test]
async fn cloudwatch_insert_log_event_batched() {
trace_init();
Expand Down Expand Up @@ -311,6 +419,8 @@ async fn cloudwatch_insert_log_event_batched() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -362,6 +472,8 @@ async fn cloudwatch_insert_log_event_partitioned() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let (sink, _) = config.build(SinkContext::default()).await.unwrap();
Expand Down Expand Up @@ -455,6 +567,8 @@ async fn cloudwatch_healthcheck() {
assume_role: None,
auth: Default::default(),
acknowledgements: Default::default(),
kms_key: None,
tags: None,
};

let client = config.create_client(&ProxyConfig::default()).await.unwrap();
Expand All @@ -480,6 +594,25 @@ async fn create_client_test() -> CloudwatchLogsClient {
.unwrap()
}

async fn create_kms_client_test() -> KMSClient {
let auth = AwsAuthentication::test_auth();
let region = Some(Region::new("us-east-1"));
let endpoint = Some(kms_address());
let proxy = ProxyConfig::default();

create_client::<KMSClientBuilder>(
&KMSClientBuilder {},
&auth,
region,
endpoint,
&proxy,
None,
None,
)
.await
.unwrap()
}

async fn ensure_group() {
let client = create_client_test().await;
_ = client
Expand Down
22 changes: 16 additions & 6 deletions src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};

use aws_sdk_cloudwatchlogs::{
operation::{
create_log_group::CreateLogGroupError,
Expand All @@ -19,6 +13,12 @@ use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkErro
use futures::{future::BoxFuture, FutureExt};
use http::{header::HeaderName, HeaderValue};
use indexmap::IndexMap;
use std::collections::HashMap;
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;

use crate::sinks::aws_cloudwatch_logs::config::Retention;
Expand All @@ -40,6 +40,8 @@ struct Client {
group_name: String,
headers: IndexMap<HeaderName, HeaderValue>,
retention_days: u32,
kms_key: Option<String>,
tags: Option<HashMap<String, String>>,
}

type ClientResult<T, E> = BoxFuture<'static, Result<T, SdkError<E, HttpResponse>>>;
Expand All @@ -63,6 +65,8 @@ impl CloudwatchFuture {
create_missing_group: bool,
create_missing_stream: bool,
retention: Retention,
kms_key: Option<String>,
tags: Option<HashMap<String, String>>,
mut events: Vec<Vec<InputLogEvent>>,
token: Option<String>,
token_tx: oneshot::Sender<Option<String>>,
Expand All @@ -74,6 +78,8 @@ impl CloudwatchFuture {
group_name,
headers,
retention_days,
kms_key,
tags,
};

let state = if let Some(token) = token {
Expand Down Expand Up @@ -288,10 +294,14 @@ impl Client {
pub fn create_log_group(&self) -> ClientResult<(), CreateLogGroupError> {
let client = self.client.clone();
let group_name = self.group_name.clone();
let kms_key = self.kms_key.clone();
let tags = self.tags.clone();
Box::pin(async move {
client
.create_log_group()
.log_group_name(group_name)
.set_kms_key_id(kms_key)
.set_tags(tags)
.send()
.await?;
Ok(())
Expand Down
Loading