From c5a026f9817861f597ac6cd9b2702e0f0173f17b Mon Sep 17 00:00:00 2001 From: "robert.ayrapetyan" Date: Thu, 1 Sep 2022 14:37:23 -0700 Subject: [PATCH] fix kafka: wait for metadata Kafka's instance metadata could be unavailable (because it's being filled asynchronously). extract_send_partition() is based on a metadata, so it may return `None` for partition and later cause all type of warning messages (e.g. `Invalid type NoneType for attribute value. Expected one of ['bool', 'str', 'bytes', 'int', 'float'] or a sequence of those types`). The proposed fix makes sure metadata is pre-populated (based on https://github.com/dpkp/kafka-python/blob/4d598055dab7da99e41bfcceffa8462b32931cdd/kafka/producer/kafka.py#L579). I'm just not sure if we should wrap `_wait_on_metadata` into try\except, maybe just passing Exception to the caller would be a better idea... --- .../src/opentelemetry/instrumentation/kafka/utils.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py index 52344ceaff..98ec1e357a 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py @@ -2,6 +2,7 @@ from logging import getLogger from typing import Callable, Dict, List, Optional +from kafka.errors import errors as KafkaErrors from kafka.record.abc import ABCRecord from opentelemetry import context, propagate, trace @@ -146,6 +147,10 @@ def _traced_send(func, instance, args, kwargs): kwargs["headers"] = headers topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs) + try: + instance._wait_on_metadata(topic, instance.config['max_block_ms'] / 1000.0) + except KafkaErrors.BrokerResponseError as kafka_exception: + _LOG.exception(kafka_exception) bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers( instance )