Skip to content

Commit

Permalink
enable all event types
Browse files Browse the repository at this point in the history
  • Loading branch information
fuchsnj committed Apr 7, 2023
1 parent 7dc5910 commit 43f5bc3
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 18 deletions.
7 changes: 2 additions & 5 deletions src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use super::sink::S3RequestOptions;
use crate::{
aws::{AwsAuthentication, RegionOrEndpoint},
codecs::{Encoder, EncodingConfigWithFraming, SinkType},
config::{
AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
SinkContext,
},
config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
sinks::{
s3_common::{
self,
Expand Down Expand Up @@ -177,7 +174,7 @@ impl SinkConfig for S3SinkConfig {
}

fn input(&self) -> Input {
Input::new(self.encoding.config().1.input_type() & DataType::Log)
Input::new(self.encoding.config().1.input_type())
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
Expand Down
7 changes: 5 additions & 2 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use snafu::Snafu;
use tokio_util::io::StreamReader;
use value::{kind::Collection, Kind};
use vector_config::configurable_component;
use vector_core::config::{DataType, LegacyKey, LogNamespace};
use vector_core::config::{LegacyKey, LogNamespace};

use super::util::MultilineConfig;
use crate::codecs::DecodingConfig;
Expand Down Expand Up @@ -210,7 +210,10 @@ impl SourceConfig for AwsS3Config {
schema_definition = schema_definition.unknown_fields(Kind::bytes());
}

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_logs(
self.decoding.output_type(),
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
21 changes: 10 additions & 11 deletions src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use crate::{
};
use lookup::{metadata_path, path, PathPrefix};
use vector_core::config::{log_schema, LegacyKey, LogNamespace};
use vector_core::event::MaybeAsLogMut;

static SUPPORTED_S3_EVENT_VERSION: Lazy<semver::VersionReq> =
Lazy::new(|| semver::VersionReq::parse("~2").unwrap());
Expand Down Expand Up @@ -554,22 +555,20 @@ impl IngestorProcess {

let events = events
.into_iter()
.flat_map(|mut event: Event| {
// only Log events are currently supported
event.try_into_log().map(|mut log| {
handle_single_event(
&mut log,
.map(|mut event: Event| {
if let Some(log_event) = event.maybe_as_log_mut() {
handle_single_log(
log_event,
log_namespace,
&s3_event,
&metadata,
timestamp,
);
events_received
.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
log
})
}
events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
event
})
.collect::<Vec<_>>();
.collect::<Vec<Event>>();
futures::stream::iter(events)
});

Expand Down Expand Up @@ -645,7 +644,7 @@ impl IngestorProcess {
}
}

fn handle_single_event(
fn handle_single_log(
log: &mut LogEvent,
log_namespace: LogNamespace,
s3_event: &S3EventRecord,
Expand Down

0 comments on commit 43f5bc3

Please sign in to comment.