Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(codecs): Add full codec support to AWS S3 source/sink #17098

Merged
merged 11 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/framing/newline_delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct NewlineDelimitedDecoderOptions {
/// consider setting the maximum length to a reasonably large value as a safety net. This
/// ensures that processing is not actually unbounded.
#[serde(skip_serializing_if = "vector_core::serde::skip_serializing_if_default")]
max_length: Option<usize>,
pub max_length: Option<usize>,
}

impl NewlineDelimitedDecoderOptions {
Expand Down
22 changes: 14 additions & 8 deletions src/codecs/decoding/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ use crate::{
/// messages.
#[derive(Clone)]
pub struct Decoder {
framer: Framer,
deserializer: Deserializer,
log_namespace: LogNamespace,
/// The framer being used
pub framer: Framer,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes generally look good, but I'm missing what is special about the aws_s3 source that required these to be public where they weren't before. What necessitated this change (same with newline_delimited.rs above)?

Copy link
Member Author

@fuchsnj fuchsnj Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Decoder, the way S3 utilizes the decoder is a bit unique since it also allows a "multiline" config. The framing and decoding are done separately with the "multiline" code happening in the middle. So the code needed access to the individual parts.

For newline_delimited.rs, this is the first source that uses this as the default. This default was kept for backwards compatibility since it was previously hard-coded.

I could have added methods to access / modify these values instead of making them public, but since these types are simple data containers, it doesn't matter too much.

/// The deserializer being used
pub deserializer: Deserializer,
/// The `log_namespace` being used
fuchsnj marked this conversation as resolved.
Show resolved Hide resolved
pub log_namespace: LogNamespace,
}

impl Default for Decoder {
Expand Down Expand Up @@ -61,16 +64,19 @@ impl Decoder {
Error::FramingError(error)
})?;

let frame = match frame {
Some(frame) => frame,
_ => return Ok(None),
};
frame
.map(|frame| self.deserializer_parse(frame))
.transpose()
}

/// Parses a frame using the included deserializer, and handles any errors by logging.
pub fn deserializer_parse(&self, frame: Bytes) -> Result<(SmallVec<[Event; 1]>, usize), Error> {
let byte_size = frame.len();

// Parse structured events from the byte frame.
self.deserializer
.parse(frame, self.log_namespace)
.map(|events| Some((events, byte_size)))
.map(|events| (events, byte_size))
.map_err(|error| {
emit!(DecoderDeserializeError { error: &error });
Error::ParsingError(error)
Expand Down
7 changes: 2 additions & 5 deletions src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use super::sink::S3RequestOptions;
use crate::{
aws::{AwsAuthentication, RegionOrEndpoint},
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
config::{
AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
SinkContext,
},
config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
sinks::{
s3_common::{
self,
Expand Down Expand Up @@ -177,7 +174,7 @@ impl SinkConfig for S3SinkConfig {
}

fn input(&self) -> Input {
Input::new(self.encoding.config().1.input_type() & DataType::Log)
Input::new(self.encoding.config().1.input_type())
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
Expand Down
34 changes: 30 additions & 4 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@ use std::{convert::TryInto, io::ErrorKind};

use async_compression::tokio::bufread;
use aws_sdk_s3::types::ByteStream;
use codecs::decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions};
use codecs::BytesDeserializerConfig;
use futures::{stream, stream::StreamExt, TryStreamExt};
use lookup::owned_value_path;
use snafu::Snafu;
use tokio_util::io::StreamReader;
use value::{kind::Collection, Kind};
use vector_config::configurable_component;
use vector_core::config::{DataType, LegacyKey, LogNamespace};
use vector_core::config::{LegacyKey, LogNamespace};

use super::util::MultilineConfig;
use crate::codecs::DecodingConfig;
use crate::config::DataType;
use crate::{
aws::{auth::AwsAuthentication, create_client, RegionOrEndpoint},
common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
config::{Output, ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext},
line_agg,
serde::bool_or_struct,
serde::{bool_or_struct, default_decoding},
tls::TlsConfig,
};

Expand Down Expand Up @@ -69,7 +72,8 @@ enum Strategy {
//
// Maybe showing defaults at all, when there are required properties, doesn't actually make sense? :thinkies:
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
#[serde(flatten)]
Expand Down Expand Up @@ -113,6 +117,23 @@ pub struct AwsS3Config {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

#[configurable(derived)]
#[serde(default = "default_framing")]
#[derivative(Default(value = "default_framing()"))]
neuronull marked this conversation as resolved.
Show resolved Hide resolved
pub framing: FramingConfig,

#[configurable(derived)]
#[serde(default = "default_decoding")]
#[derivative(Default(value = "default_decoding()"))]
pub decoding: DeserializerConfig,
}

const fn default_framing() -> FramingConfig {
// This is used for backwards compatibility. It used to be the only (hardcoded) option.
FramingConfig::NewlineDelimited {
newline_delimited: NewlineDelimitedDecoderOptions { max_length: None },
}
}

impl_generate_config_from_default!(AwsS3Config);
Expand All @@ -131,7 +152,7 @@ impl SourceConfig for AwsS3Config {

match self.strategy {
Strategy::Sqs => Ok(Box::pin(
self.create_sqs_ingestor(multiline_config, &cx.proxy)
self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
.await?
.run(cx, self.acknowledgements, log_namespace),
)),
Expand Down Expand Up @@ -198,6 +219,7 @@ impl AwsS3Config {
&self,
multiline: Option<line_agg::Config>,
proxy: &ProxyConfig,
log_namespace: LogNamespace,
) -> crate::Result<sqs::Ingestor> {
let region = self
.region
Expand All @@ -219,6 +241,9 @@ impl AwsS3Config {
)
.await?;

let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();

match self.sqs {
Some(ref sqs) => {
let sqs_client = create_client::<SqsClientBuilder>(
Expand All @@ -238,6 +263,7 @@ impl AwsS3Config {
sqs.clone(),
self.compression,
multiline,
decoder,
)
.await?;

Expand Down
Loading