AttributeError: 'confluent_kafka.cimpl.KafkaError' object has no attribute '__traceback__' #1897

Open
3 of 7 tasks
mohsen-eng74 opened this issue Jan 8, 2025 · 0 comments

Description

The behavior of the confluent_kafka.cimpl.KafkaError object is somewhat unusual. The documentation says it serves multiple purposes, and the class itself does not appear to be an exception. However, although anything that is raised must derive from builtins.BaseException, Python does not complain when you raise an instance of this class.

>>> from confluent_kafka import KafkaError
>>> raise KafkaError(-195) 
# Nothing is raised here, and the Python interpreter does not complain either
>>> [issubclass(KafkaError, BaseException), isinstance(KafkaError(-195), BaseException)]
[False, False]

>>> class Sentinel: pass
>>> raise Sentinel()  # Expected behaviour if the object is not an exception
TypeError: exceptions must derive from BaseException

However, as with other exceptions, you can catch it:

>>> try:
...     raise KafkaError(-195) 
... except KafkaError:
...     print("Oops!")
Oops!

But if you catch it and also want to log the traceback, you can't:

>>> import logging
>>> logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler()])
>>> logger = logging.getLogger("root")

>>> try:
...     raise KafkaError(-195)
... except KafkaError:
...     logger.exception("Oops!", exc_info=True, stack_info=True)
--- Logging error ---
Exception in thread MainThread:
...
AttributeError: 'confluent_kafka.cimpl.KafkaError' object has no attribute '__traceback__'

Fortunately, I haven't seen the client itself raise a confluent_kafka.cimpl.KafkaError object directly, which suggests it is safe in that regard; this matters because the object cannot be caught with the builtins.Exception class. However, this object is passed as an argument to several callbacks that can be registered with confluent_kafka.Consumer or confluent_kafka.Producer, such as error_cb, on_delivery, and on_commit. In these callbacks, attempting to log any exception with its traceback, even one raised by your own logic, can result in the worker thread being terminated.
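As a possible workaround, the callback can log the error's own fields instead of asking the logging module for a traceback. The sketch below assumes only the code(), name() and str() methods that KafkaError documents; the helper name describe_kafka_error is hypothetical, and the approach has not been verified against the failure described here.

```python
import logging

logger = logging.getLogger(__name__)


def describe_kafka_error(err) -> str:
    """Build a log-friendly description from a KafkaError-like object.

    Hypothetical helper: it relies only on the code(), name() and str()
    methods, so it never touches err.__traceback__.
    """
    return f"{err.name()} (code={err.code()}): {err.str()}"


def kafka_error_cb(err) -> None:
    # Plain logger.error without exc_info, so logging does not try to
    # format a traceback for the KafkaError object.
    logger.error("Kafka client error: %s", describe_kafka_error(err))
```

A callback written this way could be registered through the "error_cb" configuration key exactly as in the reproduction script.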

How to reproduce

import logging
import threading
from typing import Never

from confluent_kafka import Consumer, KafkaError

logger = logging.getLogger(__name__)


def kafka_error_cb(err: KafkaError) -> None:
    logger.exception("Oops! Something went wrong", exc_info=True, stack_info=True)


def target() -> Never:
    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",
            "group.id": "consumer-1",
            "error_cb": kafka_error_cb,
        }
    )
    consumer.subscribe(["topic-1"])

    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            logger.error(f"Error: {msg.error()}")
            continue
        logger.info(f"Message: {msg.value()}")


def main() -> Never:
    worker = threading.Thread(target=target)
    worker.start()
    worker.join()


if __name__ == "__main__":
    main()

If the Kafka broker is restarted while the consumer worker is still running, an error of type KafkaError occurs (i.e. KafkaError(-195)), and the next call to consumer.poll terminates the worker thread because the exception traceback is logged within the kafka_error_cb callback. Although I haven't tested this, logic implemented within these callbacks that catches and logs exceptions may behave similarly and cause the thread to terminate.
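To keep a failing callback from taking the worker thread down, one option is to wrap each callback so that nothing escapes into consumer.poll. This is a minimal sketch, not a confirmed fix: the decorator name guard_callback is hypothetical, and it only covers exceptions that derive from builtins.Exception, which per the description above would not include a raised KafkaError.

```python
import functools
import logging
import traceback

logger = logging.getLogger(__name__)


def guard_callback(func):
    """Hypothetical decorator: log and swallow exceptions raised by a callback."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Format the traceback of the caught exception ourselves and pass
            # it as plain text, rather than using exc_info on the logger.
            logger.error(
                "Callback %s failed:\n%s", func.__name__, traceback.format_exc()
            )
            return None

    return wrapper
```

In the reproduction script, this would mean decorating kafka_error_cb with @guard_callback before passing it as "error_cb".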

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.6.1
  • Apache Kafka broker version: 3.7
  • Client configuration: {...}
  • Operating system: linux-debian-bookworm
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue