Skip to content

Commit

Permalink
fix: propagate config build error instead of panicking (#18124)
Browse files Browse the repository at this point in the history
* fix: propagate config build error instead of panicking

* update usages in integration tests

* propagate the actual error type instead of string
  • Loading branch information
pront authored Aug 2, 2023
1 parent 39ac765 commit f15b422
Show file tree
Hide file tree
Showing 23 changed files with 82 additions and 60 deletions.
5 changes: 2 additions & 3 deletions lib/codecs/src/decoding/format/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ pub struct ProtobufDeserializerConfig {

impl ProtobufDeserializerConfig {
/// Build the `ProtobufDeserializer` from this configuration.
pub fn build(&self) -> ProtobufDeserializer {
// TODO return a Result instead.
ProtobufDeserializer::try_from(self).unwrap()
pub fn build(&self) -> vector_common::Result<ProtobufDeserializer> {
ProtobufDeserializer::try_from(self)
}

/// Return the type of event build by this deserializer.
Expand Down
18 changes: 10 additions & 8 deletions lib/codecs/src/decoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,16 +277,18 @@ impl From<NativeJsonDeserializerConfig> for DeserializerConfig {

impl DeserializerConfig {
/// Build the `Deserializer` from this configuration.
pub fn build(&self) -> Deserializer {
pub fn build(&self) -> vector_common::Result<Deserializer> {
match self {
DeserializerConfig::Bytes => Deserializer::Bytes(BytesDeserializerConfig.build()),
DeserializerConfig::Json(config) => Deserializer::Json(config.build()),
DeserializerConfig::Protobuf(config) => Deserializer::Protobuf(config.build()),
DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
#[cfg(feature = "syslog")]
DeserializerConfig::Syslog(config) => Deserializer::Syslog(config.build()),
DeserializerConfig::Native => Deserializer::Native(NativeDeserializerConfig.build()),
DeserializerConfig::NativeJson(config) => Deserializer::NativeJson(config.build()),
DeserializerConfig::Gelf(config) => Deserializer::Gelf(config.build()),
DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
DeserializerConfig::Native => {
Ok(Deserializer::Native(NativeDeserializerConfig.build()))
}
DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/codecs/decoding/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ impl DecodingConfig {
}

/// Builds a `Decoder` from the provided configuration.
pub fn build(&self) -> Decoder {
pub fn build(&self) -> vector_common::Result<Decoder> {
// Build the framer.
let framer = self.framing.build();

// Build the deserializer.
let deserializer = self.decoding.build();
let deserializer = self.decoding.build()?;

Decoder::new(framer, deserializer).with_log_namespace(self.log_namespace)
Ok(Decoder::new(framer, deserializer).with_log_namespace(self.log_namespace))
}
}
16 changes: 10 additions & 6 deletions src/components/validation/resources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ impl HttpResourceConfig {
codec: ResourceCodec,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
) -> vector_common::Result<()> {
match direction {
// We'll pull data from the sink.
ResourceDirection::Pull => {
spawn_output_http_client(self, codec, output_tx, task_coordinator)
}
ResourceDirection::Pull => Ok(spawn_output_http_client(
self,
codec,
output_tx,
task_coordinator,
)),
// The sink will push data to us.
ResourceDirection::Push => {
spawn_output_http_server(self, codec, output_tx, task_coordinator)
Expand Down Expand Up @@ -227,12 +230,12 @@ fn spawn_output_http_server(
codec: ResourceCodec,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
) -> vector_common::Result<()> {
// This HTTP server will wait for events to be sent by a sink, and collect them and send them on
// via an output sender. We accept/collect events until we're told to shutdown.

// First, we'll build and spawn our HTTP server.
let decoder = codec.into_decoder();
let decoder = codec.into_decoder()?;

let (_, http_server_shutdown_tx) =
spawn_http_server(task_coordinator, &config, move |request| {
Expand Down Expand Up @@ -276,6 +279,7 @@ fn spawn_output_http_server(

debug!("HTTP server external output resource completed.");
});
Ok(())
}

/// Spawns an HTTP client that pulls events by making requests to an HTTP server driven by a sink.
Expand Down
14 changes: 8 additions & 6 deletions src/components/validation/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,24 @@ impl ResourceCodec {
///
/// The decoder is generated as an inverse to the input codec: if an encoding configuration was
/// given, we generate a decoder that satisfies that encoding configuration, and vice versa.
pub fn into_decoder(&self) -> Decoder {
pub fn into_decoder(&self) -> vector_common::Result<Decoder> {
let (framer, deserializer) = match self {
Self::Decoding(config) => return config.build(),
Self::Encoding(config) => (
encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
serializer_config_to_deserializer(config.config()),
serializer_config_to_deserializer(config.config())?,
),
Self::EncodingWithFraming(config) => {
let (maybe_framing, serializer) = config.config();
let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
(
encoder_framing_to_decoding_framer(framing),
serializer_config_to_deserializer(serializer),
serializer_config_to_deserializer(serializer)?,
)
}
};

Decoder::new(framer, deserializer)
Ok(Decoder::new(framer, deserializer))
}
}

Expand Down Expand Up @@ -178,7 +178,9 @@ fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> enco
framing_config.build()
}

fn serializer_config_to_deserializer(config: &SerializerConfig) -> decoding::Deserializer {
fn serializer_config_to_deserializer(
config: &SerializerConfig,
) -> vector_common::Result<decoding::Deserializer> {
let deserializer_config = match config {
SerializerConfig::Avro { .. } => todo!(),
SerializerConfig::Csv { .. } => todo!(),
Expand Down Expand Up @@ -311,7 +313,7 @@ impl ExternalResource {
self,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
) -> vector_common::Result<()> {
match self.definition {
ResourceDefinition::Http(http_config) => {
http_config.spawn_as_output(self.direction, self.codec, output_tx, task_coordinator)
Expand Down
18 changes: 9 additions & 9 deletions src/components/validation/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl Runner {
}
}

pub async fn run_validation(self) -> Result<Vec<RunnerResults>, String> {
pub async fn run_validation(self) -> Result<Vec<RunnerResults>, vector_common::Error> {
// Initialize our test environment.
initialize_test_environment();

Expand Down Expand Up @@ -251,7 +251,7 @@ impl Runner {
&self.configuration,
&input_task_coordinator,
&output_task_coordinator,
);
)?;
let input_tx = runner_input.into_sender(controlled_edges.input);
let output_rx = runner_output.into_receiver(controlled_edges.output);
debug!("External resource (if any) and controlled edges built and spawned.");
Expand Down Expand Up @@ -413,7 +413,7 @@ fn build_external_resource(
configuration: &ValidationConfiguration,
input_task_coordinator: &TaskCoordinator<Configuring>,
output_task_coordinator: &TaskCoordinator<Configuring>,
) -> (RunnerInput, RunnerOutput, Option<Encoder<encoding::Framer>>) {
) -> Result<(RunnerInput, RunnerOutput, Option<Encoder<encoding::Framer>>), vector_common::Error> {
let component_type = configuration.component_type();
let maybe_external_resource = configuration.external_resource();
let maybe_encoder = maybe_external_resource
Expand All @@ -430,15 +430,15 @@ fn build_external_resource(
maybe_external_resource.expect("a source must always have an external resource");
resource.spawn_as_input(rx, input_task_coordinator);

(
Ok((
RunnerInput::External(tx),
RunnerOutput::Controlled,
maybe_encoder,
)
))
}
ComponentType::Transform => {
// Transforms have no external resources.
(RunnerInput::Controlled, RunnerOutput::Controlled, None)
Ok((RunnerInput::Controlled, RunnerOutput::Controlled, None))
}
ComponentType::Sink => {
// As an external resource for a sink, we create a channel that the validation runner
Expand All @@ -448,13 +448,13 @@ fn build_external_resource(
let (tx, rx) = mpsc::channel(1024);
let resource =
maybe_external_resource.expect("a sink must always have an external resource");
resource.spawn_as_output(tx, output_task_coordinator);
resource.spawn_as_output(tx, output_task_coordinator)?;

(
Ok((
RunnerInput::Controlled,
RunnerOutput::External(rx),
maybe_encoder,
)
))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn default_offset_key() -> OptionalValuePath {
impl_generate_config_from_default!(AmqpSourceConfig);

impl AmqpSourceConfig {
fn decoder(&self, log_namespace: LogNamespace) -> Decoder {
fn decoder(&self, log_namespace: LogNamespace) -> vector_common::Result<Decoder> {
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
}
}
Expand Down Expand Up @@ -317,7 +317,8 @@ async fn receive_event(
msg: Delivery,
) -> Result<(), ()> {
let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
let mut stream = FramedRead::new(payload, config.decoder(log_namespace));
let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
let mut stream = FramedRead::new(payload, decoder);

// Extract timestamp from AMQP message
let timestamp = msg
Expand Down
3 changes: 2 additions & 1 deletion src/sources/aws_kinesis_firehose/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ impl SourceConfig for AwsKinesisFirehoseConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;

let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

Expand Down
3 changes: 2 additions & 1 deletion src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ impl AwsS3Config {
.await?;

let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;

match self.sqs {
Some(ref sqs) => {
Expand Down
3 changes: 2 additions & 1 deletion src/sources/aws_sqs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ impl SourceConfig for AwsSqsConfig {

let client = self.build_client(&cx).await?;
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

Ok(Box::pin(
Expand Down
6 changes: 4 additions & 2 deletions src/sources/aws_sqs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ mod tests {
config.decoding,
LogNamespace::Vector,
)
.build(),
.build()
.unwrap(),
"aws_sqs",
b"test",
Some(now),
Expand Down Expand Up @@ -297,7 +298,8 @@ mod tests {
config.decoding,
LogNamespace::Legacy,
)
.build(),
.build()
.unwrap(),
"aws_sqs",
b"test",
Some(now),
Expand Down
3 changes: 2 additions & 1 deletion src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ impl SourceConfig for DatadogAgentConfig {
.clone();

let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;

let tls = MaybeTlsSettings::from_config(&self.tls, true)?;
let source = DatadogAgentSource::new(
Expand Down
6 changes: 4 additions & 2 deletions src/sources/demo_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ impl SourceConfig for DemoLogsConfig {

self.format.validate()?;
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;
Ok(Box::pin(demo_logs_source(
self.interval,
self.count,
Expand Down Expand Up @@ -361,7 +362,8 @@ mod tests {
default_decoding(),
LogNamespace::Legacy,
)
.build();
.build()
.unwrap();
demo_logs_source(
config.interval,
config.count,
Expand Down
2 changes: 1 addition & 1 deletion src/sources/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl SourceConfig for ExecConfig {
.clone()
.unwrap_or_else(|| self.decoding.default_stream_framing());
let decoder =
DecodingConfig::new(framing, self.decoding.clone(), LogNamespace::Legacy).build();
DecodingConfig::new(framing, self.decoding.clone(), LogNamespace::Legacy).build()?;

match &self.mode {
Mode::Scheduled => {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/file_descriptors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub trait FileDescriptorConfig: NamedComponent {
let framing = self
.framing()
.unwrap_or_else(|| decoding.default_stream_framing());
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build();
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;

let (sender, receiver) = mpsc::channel(1024);

Expand Down
2 changes: 1 addition & 1 deletion src/sources/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl SourceConfig for PubsubConfig {
self.decoding.clone(),
log_namespace,
)
.build(),
.build()?,
acknowledgements: cx.do_acknowledgements(self.acknowledgements),
shutdown: cx.shutdown,
out: cx.out,
Expand Down
3 changes: 2 additions & 1 deletion src/sources/heroku_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ impl SourceConfig for LogplexConfig {
let log_namespace = cx.log_namespace(self.log_namespace);

let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;

let source = LogplexSource {
query_parameters: self.query_parameters.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl SourceConfig for HttpClientConfig {
let log_namespace = cx.log_namespace(self.log_namespace);

// build the decoder
let decoder = self.get_decoding_config(Some(log_namespace)).build();
let decoder = self.get_decoding_config(Some(log_namespace)).build()?;

let content_type = self.decoding.content_type(&self.framing).to_string();

Expand Down
2 changes: 1 addition & 1 deletion src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
#[typetag::serde(name = "http_server")]
impl SourceConfig for SimpleHttpConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let decoder = self.get_decoding_config()?.build();
let decoder = self.get_decoding_config()?.build()?;
let log_namespace = cx.log_namespace(self.log_namespace);

let source = SimpleHttpSource {
Expand Down
6 changes: 4 additions & 2 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ impl SourceConfig for KafkaSourceConfig {

let consumer = create_consumer(self)?;
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build();
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

Ok(Box::pin(kafka_source(
Expand Down Expand Up @@ -1158,7 +1159,8 @@ mod integration_test {
config.decoding.clone(),
log_namespace,
)
.build();
.build()
.unwrap();

tokio::spawn(kafka_source(
config,
Expand Down
Loading

0 comments on commit f15b422

Please sign in to comment.