Skip to content

Commit

Permalink
feat(kafka): add consumer_lag metrics to Kafka (#15106)
Browse files Browse the repository at this point in the history
* fix: fix a typo

- fix a typo in Kafka error code name

Tested:
- No

* feat(kafka): add consumer_lag metric

- add consumer_lag metric to Kafka

Tested:
- Local run with Kafka

* feat(kafka): add metric option

* feat: add topic lag configuration

* docs: update docs

* docs: update new config option description

* docs: one more fix
  • Loading branch information
zamazan4ik authored Feb 22, 2023
1 parent 32c8452 commit 547257b
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 6 deletions.
13 changes: 11 additions & 2 deletions src/internal_events/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl InternalEvent for KafkaReadError {
/// Internal event emitted for each statistics callback delivered by the
/// rdkafka client; carries the raw librdkafka statistics snapshot that is
/// turned into Vector-internal metrics.
#[derive(Debug)]
pub struct KafkaStatisticsReceived<'a> {
// Borrowed statistics payload handed to us by rdkafka's stats callback.
pub statistics: &'a rdkafka::Statistics,
// When true, a `kafka_consumer_lag` gauge is additionally emitted for
// every topic/partition found in the statistics snapshot.
pub expose_lag_metrics: bool,
}

impl InternalEvent for KafkaStatisticsReceived<'_> {
Expand Down Expand Up @@ -149,6 +150,14 @@ impl InternalEvent for KafkaStatisticsReceived<'_> {
"kafka_consumed_messages_bytes_total",
self.statistics.rxmsg_bytes as u64
);

if self.expose_lag_metrics {
for (topic_id, topic) in &self.statistics.topics {
for (partition_id, partition) in &topic.partitions {
gauge!("kafka_consumer_lag", partition.consumer_lag as f64, "topic_id" => topic_id.clone(), "partition_id" => partition_id.to_string());
}
}
}
}
}

Expand All @@ -160,15 +169,15 @@ impl InternalEvent for KafkaHeaderExtractionError<'_> {
fn emit(self) {
error!(
message = "Failed to extract header. Value should be a map of String -> Bytes.",
error_code = "extracing_header",
error_code = "extracting_header",
error_type = error_type::PARSER_FAILED,
stage = error_stage::RECEIVING,
header_field = self.header_field,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total", 1,
"error_code" => "extracing_header",
"error_code" => "extracting_header",
"error_type" => error_type::PARSER_FAILED,
"stage" => error_stage::RECEIVING,
);
Expand Down
7 changes: 5 additions & 2 deletions src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,15 @@ fn pathbuf_to_string(path: &Path) -> crate::Result<&str> {
}

#[derive(Default)]
pub(crate) struct KafkaStatisticsContext;
/// Client context attached to rdkafka producers/consumers so that the
/// periodic statistics callback can be forwarded as a
/// `KafkaStatisticsReceived` internal event.
pub(crate) struct KafkaStatisticsContext {
// Passed through to `KafkaStatisticsReceived` on every stats callback;
// controls whether per-partition consumer lag gauges are emitted.
pub(crate) expose_lag_metrics: bool,
}

impl ClientContext for KafkaStatisticsContext {
fn stats(&self, statistics: Statistics) {
emit!(KafkaStatisticsReceived {
statistics: &statistics
statistics: &statistics,
expose_lag_metrics: self.expose_lag_metrics,
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/kafka/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(crate) fn create_producer(
client_config: ClientConfig,
) -> crate::Result<FutureProducer<KafkaStatisticsContext>> {
let producer = client_config
.create_with_context(KafkaStatisticsContext)
.create_with_context(KafkaStatisticsContext::default())
.context(KafkaCreateFailedSnafu)?;
Ok(producer)
}
Expand Down
25 changes: 24 additions & 1 deletion src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ enum BuildError {
KafkaSubscribeError { source: rdkafka::error::KafkaError },
}

/// Metrics configuration.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct Metrics {
/// Expose topic lag metrics for all topics and partitions. Metric names are `kafka_consumer_lag`.
// NOTE(review): this doc comment feeds the generated config schema
// (website/cue/.../base/kafka.cue); keep the two in sync if edited.
// Default is `false` via `#[derive(Default)]`, so lag gauges are opt-in.
pub topic_lag_metric: bool,
}

/// Configuration for the `kafka` source.
#[serde_as]
#[configurable_component(source("kafka"))]
Expand Down Expand Up @@ -201,6 +209,10 @@ pub struct KafkaSourceConfig {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

#[configurable(derived)]
#[serde(default)]
metrics: Metrics,
}

impl KafkaSourceConfig {
Expand Down Expand Up @@ -697,7 +709,9 @@ fn create_consumer(config: &KafkaSourceConfig) -> crate::Result<StreamConsumer<C
}

let consumer = client_config
.create_with_context::<_, StreamConsumer<_>>(CustomContext::default())
.create_with_context::<_, StreamConsumer<_>>(CustomContext::new(
config.metrics.topic_lag_metric,
))
.context(KafkaCreateSnafu)?;
let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
consumer.subscribe(&topics).context(KafkaSubscribeSnafu)?;
Expand All @@ -711,6 +725,15 @@ struct CustomContext {
finalizer: OnceCell<Arc<OrderedFinalizer<FinalizerEntry>>>,
}

impl CustomContext {
// Builds a consumer context whose statistics handler will (optionally)
// emit per-partition consumer lag gauges. All remaining fields of
// `CustomContext` take their `Default` values via struct-update syntax,
// which is why this constructor exists instead of `derive(Default)` alone.
fn new(expose_lag_metrics: bool) -> Self {
Self {
stats: kafka::KafkaStatisticsContext { expose_lag_metrics },
..Default::default()
}
}
}

impl ClientContext for CustomContext {
fn stats(&self, statistics: Statistics) {
self.stats.stats(statistics)
Expand Down
9 changes: 9 additions & 0 deletions website/cue/reference/components/sources/base/kafka.cue
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ base: components: sources: kafka: configuration: {
}
}
}
metrics: {
description: "Metrics configuration."
required: false
type: object: options: topic_lag_metric: {
description: "Expose topic lag metrics for all topics and partitions. Metric names are `kafka_consumer_lag`."
required: false
type: bool: default: false
}
}
offset_key: {
description: """
Overrides the name of the log field used to add the offset to each event.
Expand Down
15 changes: 15 additions & 0 deletions website/cue/reference/components/sources/internal_metrics.cue
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,21 @@ components: sources: internal_metrics: {
default_namespace: "vector"
tags: _component_tags
}
kafka_consumer_lag: {
description: "The Kafka consumer lag."
type: "gauge"
default_namespace: "vector"
tags: _component_tags & {
topic_id: {
description: "The Kafka topic id."
required: true
}
partition_id: {
description: "The Kafka partition id."
required: true
}
}
}
file_delete_errors_total: {
description: "The total number of failures to delete a file. This metric is deprecated in favor of `component_errors_total`."
type: "counter"
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sources/kafka.cue
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ components: sources: kafka: {
kafka_produced_messages_bytes_total: components.sources.internal_metrics.output.metrics.kafka_produced_messages_bytes_total
kafka_consumed_messages_total: components.sources.internal_metrics.output.metrics.kafka_consumed_messages_total
kafka_consumed_messages_bytes_total: components.sources.internal_metrics.output.metrics.kafka_consumed_messages_bytes_total
kafka_consumer_lag: components.sources.internal_metrics.output.metrics.kafka_consumer_lag
processed_bytes_total: components.sources.internal_metrics.output.metrics.processed_bytes_total
processed_events_total: components.sources.internal_metrics.output.metrics.processed_events_total
component_discarded_events_total: components.sources.internal_metrics.output.metrics.component_discarded_events_total
Expand Down

0 comments on commit 547257b

Please sign in to comment.