Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: propagate config build error instead of panicking #18124

Merged
merged 3 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/codecs/src/decoding/format/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ pub struct ProtobufDeserializerConfig {

impl ProtobufDeserializerConfig {
/// Build the `ProtobufDeserializer` from this configuration.
pub fn build(&self) -> ProtobufDeserializer {
// TODO return a Result instead.
ProtobufDeserializer::try_from(self).unwrap()
pub fn build(&self) -> vector_common::Result<ProtobufDeserializer> {
ProtobufDeserializer::try_from(self)
}

/// Return the type of event build by this deserializer.
Expand Down
18 changes: 10 additions & 8 deletions lib/codecs/src/decoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,16 +277,18 @@ impl From<NativeJsonDeserializerConfig> for DeserializerConfig {

impl DeserializerConfig {
/// Build the `Deserializer` from this configuration.
pub fn build(&self) -> Deserializer {
pub fn build(&self) -> vector_common::Result<Deserializer> {
match self {
DeserializerConfig::Bytes => Deserializer::Bytes(BytesDeserializerConfig.build()),
DeserializerConfig::Json(config) => Deserializer::Json(config.build()),
DeserializerConfig::Protobuf(config) => Deserializer::Protobuf(config.build()),
DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
#[cfg(feature = "syslog")]
DeserializerConfig::Syslog(config) => Deserializer::Syslog(config.build()),
DeserializerConfig::Native => Deserializer::Native(NativeDeserializerConfig.build()),
DeserializerConfig::NativeJson(config) => Deserializer::NativeJson(config.build()),
DeserializerConfig::Gelf(config) => Deserializer::Gelf(config.build()),
DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
DeserializerConfig::Native => {
Ok(Deserializer::Native(NativeDeserializerConfig.build()))
}
DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/codecs/decoding/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ impl DecodingConfig {
}

/// Builds a `Decoder` from the provided configuration.
pub fn build(&self) -> Decoder {
pub fn build(&self) -> vector_common::Result<Decoder> {
// Build the framer.
let framer = self.framing.build();

// Build the deserializer.
let deserializer = self.decoding.build();
let deserializer = self.decoding.build()?;

Decoder::new(framer, deserializer).with_log_namespace(self.log_namespace)
Ok(Decoder::new(framer, deserializer).with_log_namespace(self.log_namespace))
}
}
16 changes: 10 additions & 6 deletions src/components/validation/resources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ impl HttpResourceConfig {
codec: ResourceCodec,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
) -> vector_common::Result<()> {
match direction {
// We'll pull data from the sink.
ResourceDirection::Pull => {
spawn_output_http_client(self, codec, output_tx, task_coordinator)
}
ResourceDirection::Pull => Ok(spawn_output_http_client(
self,
codec,
output_tx,
task_coordinator,
)),
// The sink will push data to us.
ResourceDirection::Push => {
spawn_output_http_server(self, codec, output_tx, task_coordinator)
Expand Down Expand Up @@ -227,12 +230,12 @@ fn spawn_output_http_server(
codec: ResourceCodec,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
) -> vector_common::Result<()> {
// This HTTP server will wait for events to be sent by a sink, and collect them and send them on
// via an output sender. We accept/collect events until we're told to shutdown.

// First, we'll build and spawn our HTTP server.
let decoder = codec.into_decoder();
let decoder = codec.into_decoder()?;

let (_, http_server_shutdown_tx) =
spawn_http_server(task_coordinator, &config, move |request| {
Expand Down Expand Up @@ -276,6 +279,7 @@ fn spawn_output_http_server(

debug!("HTTP server external output resource completed.");
});
Ok(())
}

/// Spawns an HTTP client that pulls events by making requests to an HTTP server driven by a sink.
Expand Down
14 changes: 8 additions & 6 deletions src/components/validation/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,24 @@ impl ResourceCodec {
///
/// The decoder is generated as an inverse to the input codec: if an encoding configuration was
/// given, we generate a decoder that satisfies that encoding configuration, and vice versa.
pub fn into_decoder(&self) -> Decoder {
pub fn into_decoder(&self) -> vector_common::Result<Decoder> {
let (framer, deserializer) = match self {
Self::Decoding(config) => return config.build(),
Self::Encoding(config) => (
encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
serializer_config_to_deserializer(config.config()),
serializer_config_to_deserializer(config.config())?,
),
Self::EncodingWithFraming(config) => {
let (maybe_framing, serializer) = config.config();
let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
(
encoder_framing_to_decoding_framer(framing),
serializer_config_to_deserializer(serializer),
serializer_config_to_deserializer(serializer)?,
)
}
};

Decoder::new(framer, deserializer)
Ok(Decoder::new(framer, deserializer))
}
}

Expand Down Expand Up @@ -178,7 +178,9 @@ fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> enco
framing_config.build()
}

fn serializer_config_to_deserializer(config: &SerializerConfig) -> decoding::Deserializer {
fn serializer_config_to_deserializer(
config: &SerializerConfig,
) -> vector_common::Result<decoding::Deserializer> {
let deserializer_config = match config {
SerializerConfig::Avro { .. } => todo!(),
SerializerConfig::Csv { .. } => todo!(),
Expand Down Expand Up @@ -311,7 +313,7 @@ impl ExternalResource {
self,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
) -> vector_common::Result<()> {
match self.definition {
ResourceDefinition::Http(http_config) => {
http_config.spawn_as_output(self.direction, self.codec, output_tx, task_coordinator)
Expand Down
18 changes: 9 additions & 9 deletions src/components/validation/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl Runner {
}
}

pub async fn run_validation(self) -> Result<Vec<RunnerResults>, String> {
pub async fn run_validation(self) -> Result<Vec<RunnerResults>, vector_common::Error> {
// Initialize our test environment.
initialize_test_environment();

Expand Down Expand Up @@ -251,7 +251,7 @@ impl Runner {
&self.configuration,
&input_task_coordinator,
&output_task_coordinator,
);
)?;
let input_tx = runner_input.into_sender(controlled_edges.input);
let output_rx = runner_output.into_receiver(controlled_edges.output);
debug!("External resource (if any) and controlled edges built and spawned.");
Expand Down Expand Up @@ -413,7 +413,7 @@ fn build_external_resource(
configuration: &ValidationConfiguration,
input_task_coordinator: &TaskCoordinator<Configuring>,
output_task_coordinator: &TaskCoordinator<Configuring>,
) -> (RunnerInput, RunnerOutput, Option<Encoder<encoding::Framer>>) {
) -> Result<(RunnerInput, RunnerOutput, Option<Encoder<encoding::Framer>>), vector_common::Error> {
let component_type = configuration.component_type();
let maybe_external_resource = configuration.external_resource();
let maybe_encoder = maybe_external_resource
Expand All @@ -430,15 +430,15 @@ fn build_external_resource(
maybe_external_resource.expect("a source must always have an external resource");
resource.spawn_as_input(rx, input_task_coordinator);

(
Ok((
RunnerInput::External(tx),
RunnerOutput::Controlled,
maybe_encoder,
)
))
}
ComponentType::Transform => {
// Transforms have no external resources.
(RunnerInput::Controlled, RunnerOutput::Controlled, None)
Ok((RunnerInput::Controlled, RunnerOutput::Controlled, None))
}
ComponentType::Sink => {
// As an external resource for a sink, we create a channel that the validation runner
Expand All @@ -448,13 +448,13 @@ fn build_external_resource(
let (tx, rx) = mpsc::channel(1024);
let resource =
maybe_external_resource.expect("a sink must always have an external resource");
resource.spawn_as_output(tx, output_task_coordinator);
resource.spawn_as_output(tx, output_task_coordinator)?;

(
Ok((
RunnerInput::Controlled,
RunnerOutput::External(rx),
maybe_encoder,
)
))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn default_offset_key() -> OptionalValuePath {
impl_generate_config_from_default!(AmqpSourceConfig);

impl AmqpSourceConfig {
fn decoder(&self, log_namespace: LogNamespace) -> Decoder {
fn decoder(&self, log_namespace: LogNamespace) -> vector_common::Result<Decoder> {
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
}
}
Expand Down Expand Up @@ -317,7 +317,8 @@ async fn receive_event(
msg: Delivery,
) -> Result<(), ()> {
let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
let mut stream = FramedRead::new(payload, config.decoder(log_namespace));
let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
let mut stream = FramedRead::new(payload, decoder);

// Extract timestamp from AMQP message
let timestamp = msg
Expand Down
3 changes: 2 additions & 1 deletion src/sources/aws_kinesis_firehose/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ impl SourceConfig for AwsKinesisFirehoseConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;

let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

Expand Down
3 changes: 2 additions & 1 deletion src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ impl AwsS3Config {
.await?;

let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;

match self.sqs {
Some(ref sqs) => {
Expand Down
3 changes: 2 additions & 1 deletion src/sources/aws_sqs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ impl SourceConfig for AwsSqsConfig {

let client = self.build_client(&cx).await?;
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

Ok(Box::pin(
Expand Down
6 changes: 4 additions & 2 deletions src/sources/aws_sqs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ mod tests {
config.decoding,
LogNamespace::Vector,
)
.build(),
.build()
.unwrap(),
"aws_sqs",
b"test",
Some(now),
Expand Down Expand Up @@ -297,7 +298,8 @@ mod tests {
config.decoding,
LogNamespace::Legacy,
)
.build(),
.build()
.unwrap(),
"aws_sqs",
b"test",
Some(now),
Expand Down
3 changes: 2 additions & 1 deletion src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ impl SourceConfig for DatadogAgentConfig {
.clone();

let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;

let tls = MaybeTlsSettings::from_config(&self.tls, true)?;
let source = DatadogAgentSource::new(
Expand Down
6 changes: 4 additions & 2 deletions src/sources/demo_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ impl SourceConfig for DemoLogsConfig {

self.format.validate()?;
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;
Ok(Box::pin(demo_logs_source(
self.interval,
self.count,
Expand Down Expand Up @@ -361,7 +362,8 @@ mod tests {
default_decoding(),
LogNamespace::Legacy,
)
.build();
.build()
.unwrap();
demo_logs_source(
config.interval,
config.count,
Expand Down
2 changes: 1 addition & 1 deletion src/sources/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl SourceConfig for ExecConfig {
.clone()
.unwrap_or_else(|| self.decoding.default_stream_framing());
let decoder =
DecodingConfig::new(framing, self.decoding.clone(), LogNamespace::Legacy).build();
DecodingConfig::new(framing, self.decoding.clone(), LogNamespace::Legacy).build()?;

match &self.mode {
Mode::Scheduled => {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/file_descriptors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub trait FileDescriptorConfig: NamedComponent {
let framing = self
.framing()
.unwrap_or_else(|| decoding.default_stream_framing());
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build();
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;

let (sender, receiver) = mpsc::channel(1024);

Expand Down
2 changes: 1 addition & 1 deletion src/sources/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl SourceConfig for PubsubConfig {
self.decoding.clone(),
log_namespace,
)
.build(),
.build()?,
acknowledgements: cx.do_acknowledgements(self.acknowledgements),
shutdown: cx.shutdown,
out: cx.out,
Expand Down
3 changes: 2 additions & 1 deletion src/sources/heroku_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ impl SourceConfig for LogplexConfig {
let log_namespace = cx.log_namespace(self.log_namespace);

let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;

let source = LogplexSource {
query_parameters: self.query_parameters.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl SourceConfig for HttpClientConfig {
let log_namespace = cx.log_namespace(self.log_namespace);

// build the decoder
let decoder = self.get_decoding_config(Some(log_namespace)).build();
let decoder = self.get_decoding_config(Some(log_namespace)).build()?;

let content_type = self.decoding.content_type(&self.framing).to_string();

Expand Down
2 changes: 1 addition & 1 deletion src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
#[typetag::serde(name = "http_server")]
impl SourceConfig for SimpleHttpConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let decoder = self.get_decoding_config()?.build();
let decoder = self.get_decoding_config()?.build()?;
let log_namespace = cx.log_namespace(self.log_namespace);

let source = SimpleHttpSource {
Expand Down
6 changes: 4 additions & 2 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ impl SourceConfig for KafkaSourceConfig {

let consumer = create_consumer(self)?;
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

Ok(Box::pin(kafka_source(
Expand Down Expand Up @@ -1158,7 +1159,8 @@ mod integration_test {
config.decoding.clone(),
log_namespace,
)
.build();
.build()
.unwrap();

tokio::spawn(kafka_source(
config,
Expand Down
Loading