diff --git a/.template/mandatory_files.txt b/.template/mandatory_files.txt
index 660a15ed..fcfb0f85 100644
--- a/.template/mandatory_files.txt
+++ b/.template/mandatory_files.txt
@@ -23,6 +23,7 @@ lock/requirements-dev.txt
 lock/requirements.txt
 Dockerfile
+Dockerfile.debian
 config_schema.json
 example_config.yaml
 LICENSE
diff --git a/.template/mandatory_files_ignore.txt b/.template/mandatory_files_ignore.txt
index 4ac1451a..84a2ac38 100644
--- a/.template/mandatory_files_ignore.txt
+++ b/.template/mandatory_files_ignore.txt
@@ -10,6 +10,7 @@ scripts/script_utils/fastapi_app_location.py
 Dockerfile
+Dockerfile.debian
 config_schema.json
 example_config.yaml
diff --git a/src/hexkit/config.py b/src/hexkit/config.py
index 29fe1eb6..e8f9bd2e 100644
--- a/src/hexkit/config.py
+++ b/src/hexkit/config.py
@@ -143,7 +143,7 @@ class ModSettings(settings):
             model_config = SettingsConfigDict(frozen=True, env_prefix=f"{prefix}_")
 
             @classmethod
-            def settings_customise_sources(  # noqa: PLR0913
+            def settings_customise_sources(
                 cls,
                 settings_cls: type[BaseSettings],
                 init_settings: PydanticBaseSettingsSource,
diff --git a/src/hexkit/custom_types.py b/src/hexkit/custom_types.py
index 7f6a5129..233ec922 100644
--- a/src/hexkit/custom_types.py
+++ b/src/hexkit/custom_types.py
@@ -27,12 +27,12 @@
 
 # A type indicating that a string should be ascii-compatible.
-# Technically it is an alias for `str` so it only serves documention purposes.
+# Technically it is an alias for `str` so it only serves documentation purposes.
 Ascii = str
 
 # A AsyncConstructable is a class with a async (class-)method `construct` that is used when
-# asynchronous constuction/instantiation logic is needed (which cannot be handeled in
+# asynchronous construction/instantiation logic is needed (which cannot be handled in
 # a synchronous __init__ method).
 # With the current typing features of Python, it seems not possible to correctly type
 # a class with that signature.
 
diff --git a/src/hexkit/protocols/eventpub.py b/src/hexkit/protocols/eventpub.py
index 1020b09f..7ffb541a 100644
--- a/src/hexkit/protocols/eventpub.py
+++ b/src/hexkit/protocols/eventpub.py
@@ -17,6 +17,8 @@
 """Protocol related to event publishing."""
 
 from abc import ABC, abstractmethod
+from collections.abc import Mapping
+from typing import Optional
 
 from hexkit.custom_types import Ascii, JsonObject
 from hexkit.utils import check_ascii
@@ -26,31 +28,52 @@ class EventPublisherProtocol(ABC):
     """A protocol for publishing events to an event broker."""
 
     async def publish(
-        self, *, payload: JsonObject, type_: Ascii, key: Ascii, topic: Ascii
+        self,
+        *,
+        payload: JsonObject,
+        type_: Ascii,
+        key: Ascii,
+        topic: Ascii,
+        headers: Optional[Mapping[str, str]] = None,
     ) -> None:
         """Publish an event.
 
         Args:
-            payload (JSON): The payload to ship with the event.
-            type_ (str): The event type. ASCII characters only.
-            key (str): The event type. ASCII characters only.
-            topic (str): The event type. ASCII characters only.
+            - `payload` (JSON): The payload to ship with the event.
+            - `type_` (str): The event type. ASCII characters only.
+            - `key` (str): The event key. ASCII characters only.
+            - `topic` (str): The event topic. ASCII characters only.
+            - `headers`: Additional headers to attach to the event.
""" check_ascii(type_, key, topic) + if headers is None: + headers = {} + await self._publish_validated( - payload=payload, type_=type_, key=key, topic=topic + payload=payload, + type_=type_, + key=key, + topic=topic, + headers=headers, ) @abstractmethod async def _publish_validated( - self, *, payload: JsonObject, type_: Ascii, key: Ascii, topic: Ascii + self, + *, + payload: JsonObject, + type_: Ascii, + key: Ascii, + topic: Ascii, + headers: Mapping[str, str], ) -> None: """Publish an event with already validated topic and type. Args: - payload (JSON): The payload to ship with the event. - type_ (str): The event type. ASCII characters only. - key (str): The event type. ASCII characters only. - topic (str): The event type. ASCII characters only. + - `payload` (JSON): The payload to ship with the event. + - `type_` (str): The event type. ASCII characters only. + - `key` (str): The event type. ASCII characters only. + - `topic` (str): The event type. ASCII characters only. + - `headers`: Additional headers to attach to the event. """ ... diff --git a/src/hexkit/providers/akafka/config.py b/src/hexkit/providers/akafka/config.py index 5868a796..d88353f4 100644 --- a/src/hexkit/providers/akafka/config.py +++ b/src/hexkit/providers/akafka/config.py @@ -18,7 +18,7 @@ from typing import Literal -from pydantic import Field, PositiveInt, SecretStr +from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr, model_validator from pydantic_settings import BaseSettings @@ -26,12 +26,12 @@ class KafkaConfig(BaseSettings): """Config parameters needed for connecting to Apache Kafka.""" service_name: str = Field( - ..., + default=..., examples=["my-cool-special-service"], description="The name of the (micro-)service from which messages are published.", ) service_instance_id: str = Field( - ..., + default=..., examples=["germany-bw-instance-001"], description=( "A string that uniquely identifies this instance across all instances of" @@ -40,7 +40,7 @@ class KafkaConfig(BaseSettings): ), ) kafka_servers: list[str] = Field( - ..., + default=..., examples=[["localhost:9092"]], description="A list of connection strings to connect to Kafka bootstrap servers.", ) @@ -82,3 +82,62 @@ class KafkaConfig(BaseSettings): + " services that have a need to send/receive larger messages should set this.", examples=[1024 * 1024, 16 * 1024 * 1024], ) + kafka_dlq_topic: str = Field( + default="", + description="The name of the service-specific topic used for the dead letter queue.", + examples=["dcs-dlq", "ifrs-dlq", "mass-dlq"], + title="Kafka DLQ Topic", + ) + kafka_retry_topic: str = Field( + default="", + description=( + "The name of the service-specific topic used to retry previously failed events." + ), + title="Kafka Retry Topic", + examples=["dcs-dlq-retry", "ifrs-dlq-retry", "mass-dlq-retry"], + ) + kafka_max_retries: NonNegativeInt = Field( + default=0, + description=( + "The maximum number of times to immediately retry consuming an event upon" + + " failure. Works independently of the dead letter queue." + ), + title="Kafka Max Retries", + examples=[0, 1, 2, 3, 5], + ) + kafka_enable_dlq: bool = Field( + default=False, + description=( + "A flag to toggle the dead letter queue. If set to False, the service will" + + " crash upon exhausting retries instead of publishing events to the DLQ." + + " If set to True, the service will publish events to the DLQ topic after" + + " exhausting all retries, and both `kafka_dlq_topic` and" + + " `kafka_retry_topic` must be set." 
+        ),
+        title="Kafka Enable DLQ",
+        examples=[True, False],
+    )
+    kafka_retry_backoff: NonNegativeInt = Field(
+        default=0,
+        description=(
+            "The number of seconds to wait before retrying a failed event. The backoff"
+            + " time is doubled for each retry attempt."
+        ),
+        title="Kafka Retry Backoff",
+        examples=[0, 1, 2, 3, 5],
+    )
+
+    @model_validator(mode="after")
+    def validate_retry_topic(self):
+        """Ensure that the retry topic is not the same as the DLQ topic."""
+        if self.kafka_retry_topic and self.kafka_retry_topic == self.kafka_dlq_topic:
+            raise ValueError(
+                "kafka_retry_topic and kafka_dlq_topic cannot be the same."
+            )
+        if self.kafka_enable_dlq and not (
+            self.kafka_dlq_topic and self.kafka_retry_topic
+        ):
+            raise ValueError(
+                "Both kafka_dlq_topic and kafka_retry_topic must be set when the DLQ is enabled."
+            )
+        return self
diff --git a/src/hexkit/providers/akafka/provider/__init__.py b/src/hexkit/providers/akafka/provider/__init__.py
index 2ca5829f..3794dd7f 100644
--- a/src/hexkit/providers/akafka/provider/__init__.py
+++ b/src/hexkit/providers/akafka/provider/__init__.py
@@ -23,16 +23,22 @@
 from .eventpub import KafkaEventPublisher
 from .eventsub import (
     ConsumerEvent,
+    ExtractedEventInfo,
+    KafkaDLQSubscriber,
     KafkaEventSubscriber,
-    get_header_value,
     headers_as_dict,
+    process_dlq_event,
+    validate_dlq_headers,
 )
 
 __all__ = [
     "KafkaEventPublisher",
+    "ExtractedEventInfo",
     "KafkaEventSubscriber",
     "ConsumerEvent",
-    "get_header_value",
     "headers_as_dict",
     "KafkaOutboxSubscriber",
+    "KafkaDLQSubscriber",
+    "process_dlq_event",
+    "validate_dlq_headers",
 ]
diff --git a/src/hexkit/providers/akafka/provider/daosub.py b/src/hexkit/providers/akafka/provider/daosub.py
index f46f3190..41845d63 100644
--- a/src/hexkit/providers/akafka/provider/daosub.py
+++ b/src/hexkit/providers/akafka/provider/daosub.py
@@ -21,6 +21,7 @@
 import logging
 from collections.abc import Sequence
 from contextlib import asynccontextmanager
+from typing import Optional
 
 from aiokafka import AIOKafkaConsumer
 from pydantic import ValidationError
@@ -31,6 +32,7 @@
     DaoSubscriberProtocol,
     DtoValidationError,
 )
+from hexkit.protocols.eventpub import EventPublisherProtocol
 from hexkit.protocols.eventsub import EventSubscriberProtocol
 from hexkit.providers.akafka.config import KafkaConfig
 from hexkit.providers.akafka.provider.eventsub import (
@@ -85,8 +87,8 @@ async def _consume_validated(
                     dto = translator.dto_model.model_validate(payload)
                 except ValidationError as error:
                     message = (
-                        f"The event of type {type_} on topic {topic} was not valid wrt. the"
-                        + " DTO model."
+                        f"The event of type {type_} on topic {topic}"
+                        + " was not valid wrt. the DTO model."
                     )
                     logging.error(message)
                     raise DtoValidationError(message) from error
@@ -110,21 +112,34 @@ async def construct(
         *,
         config: KafkaConfig,
         translators: Sequence[DaoSubscriberProtocol],
+        dlq_publisher: Optional[EventPublisherProtocol] = None,
         kafka_consumer_cls: type[KafkaConsumerCompatible] = AIOKafkaConsumer,
     ):
         """Setup and teardown an instance of the provider.
 
         Args:
-            config: MongoDB-specific config parameters.
+            - `config`: Kafka-specific config parameters.
+            - `translators`: A sequence of translators implementing the
+              `DaoSubscriberProtocol`.
+            - `dlq_publisher`: An instance of the publisher to use for the DLQ. Can be None
+              if not using the dead letter queue. It is used to publish events to the DLQ.
+            - `kafka_consumer_cls`: The Kafka consumer class to use. Defaults to
+              `AIOKafkaConsumer`.
 
         Returns:
             An instance of the provider.
""" translator_converter = TranslatorConverter(translators=translators) + if config.kafka_enable_dlq and dlq_publisher is None: + error = ValueError("A publisher is required when the DLQ is enabled.") + logging.error(error) + raise error + async with KafkaEventSubscriber.construct( config=config, translator=translator_converter, + dlq_publisher=dlq_publisher, kafka_consumer_cls=kafka_consumer_cls, ) as event_subscriber: yield cls(event_subscriber=event_subscriber) diff --git a/src/hexkit/providers/akafka/provider/eventpub.py b/src/hexkit/providers/akafka/provider/eventpub.py index a4e8772b..d4eae271 100644 --- a/src/hexkit/providers/akafka/provider/eventpub.py +++ b/src/hexkit/providers/akafka/provider/eventpub.py @@ -22,6 +22,7 @@ import json import logging import ssl +from collections.abc import Mapping from contextlib import asynccontextmanager from typing import Any, Callable, Optional, Protocol @@ -41,6 +42,8 @@ generate_ssl_context, ) +RESERVED_HEADERS = ["type", "correlation_id"] + class KafkaProducerCompatible(Protocol): """A python duck type protocol describing an AIOKafkaProducer or equivalent.""" @@ -150,15 +153,22 @@ def __init__( self._generate_correlation_id = generate_correlation_id async def _publish_validated( - self, *, payload: JsonObject, type_: Ascii, key: Ascii, topic: Ascii + self, + *, + payload: JsonObject, + type_: Ascii, + key: Ascii, + topic: Ascii, + headers: Mapping[str, str], ) -> None: """Publish an event with already validated topic and type. Args: - payload (JSON): The payload to ship with the event. - type_ (str): The event type. ASCII characters only. - key (str): The event type. ASCII characters only. - topic (str): The event type. ASCII characters only. + - `payload` (JSON): The payload to ship with the event. + - `type_` (str): The event type. ASCII characters only. + - `key` (str): The event key. ASCII characters only. + - `topic` (str): The event topic. ASCII characters only. + - `headers`: Additional headers to attach to the event. """ try: correlation_id = get_correlation_id() @@ -171,10 +181,21 @@ async def _publish_validated( validate_correlation_id(correlation_id) - event_headers = [ - ("type", type_.encode("ascii")), - ("correlation_id", correlation_id.encode("ascii")), - ] + # Create a shallow copy of the headers + headers_copy = dict(headers) + + # Check and log warnings for reserved headers + for header in RESERVED_HEADERS: + log_msg = ( + f"The '{header}' header shouldn't be supplied, but was. Overwriting." + ) + if header in headers_copy: + logging.warning(log_msg, extra={header: headers_copy[header]}) + + headers_copy["type"] = type_ + headers_copy["correlation_id"] = correlation_id + encoded_headers_list = [(k, v.encode("ascii")) for k, v in headers_copy.items()] + await self._producer.send_and_wait( - topic, key=key, value=payload, headers=event_headers + topic, key=key, value=payload, headers=encoded_headers_list ) diff --git a/src/hexkit/providers/akafka/provider/eventsub.py b/src/hexkit/providers/akafka/provider/eventsub.py index 787e16ab..3f312bca 100644 --- a/src/hexkit/providers/akafka/provider/eventsub.py +++ b/src/hexkit/providers/akafka/provider/eventsub.py @@ -21,17 +21,21 @@ Require dependencies of the `akafka` extra. See the `setup.cfg`. 
""" +import asyncio import json import logging import ssl +from collections.abc import Awaitable from contextlib import asynccontextmanager -from typing import Callable, Literal, Optional, Protocol, TypeVar +from dataclasses import dataclass +from typing import Callable, Literal, Optional, Protocol, TypeVar, cast from aiokafka import AIOKafkaConsumer from hexkit.base import InboundProviderBase from hexkit.correlation import set_correlation_id from hexkit.custom_types import Ascii, JsonObject +from hexkit.protocols.eventpub import EventPublisherProtocol from hexkit.protocols.eventsub import EventSubscriberProtocol from hexkit.providers.akafka.config import KafkaConfig from hexkit.providers.akafka.provider.utils import ( @@ -39,13 +43,7 @@ generate_ssl_context, ) - -class EventHeaderNotFoundError(RuntimeError): - """Thrown when a given detail was not set in the headers of an event.""" - - def __init__(self, *, header_name): - message = f"No '{header_name}' was set in event header." - super().__init__(message) +ORIGINAL_TOPIC_FIELD = "original_topic" class ConsumerEvent(Protocol): @@ -59,19 +57,47 @@ class ConsumerEvent(Protocol): offset: int +@dataclass +class ExtractedEventInfo: + """A class encapsulating the data extracted from a `ConsumerEvent`-like object. + + This data includes the topic, type, payload, and key of the event. + """ + + topic: Ascii + type_: Ascii + payload: JsonObject + key: Ascii + headers: dict[str, str] + + def __init__(self, event: Optional[ConsumerEvent] = None, **kwargs): + """Initialize an instance of ExtractedEventInfo.""" + self.topic = kwargs.get("topic", event.topic if event else "") + self.payload = kwargs.get("payload", event.value if event else "") + self.key = kwargs.get("key", event.key if event else "") + self.headers = kwargs.get("headers", headers_as_dict(event) if event else {}) + self.headers = cast(dict, self.headers) + self.type_ = kwargs.get("type_", self.headers.get("type", "")) + + @property + def encoded_headers(self) -> list[tuple[str, bytes]]: + """Return the headers as a list of 2-tuples with the values encoded as bytes.""" + return [(name, value.encode("ascii")) for name, value in self.headers.items()] + + +def get_event_label(event: ConsumerEvent) -> str: + """Make a label that identifies an event.""" + return ( + f"{event.topic} - {event.partition} - {event.key} - {event.offset}" + + " (topic-partition-key-offset)" + ) + + def headers_as_dict(event: ConsumerEvent) -> dict[str, str]: """Extract the headers from a ConsumerEvent object and return them as a dict.""" return {name: value.decode("ascii") for name, value in event.headers} -def get_header_value(header_name: str, headers: dict[str, str]) -> str: - """Extract the given value from the dict headers and raise an error if not found.""" - try: - return headers[header_name] - except KeyError as err: - raise EventHeaderNotFoundError(header_name=header_name) from err - - KCC = TypeVar("KCC") @@ -138,6 +164,23 @@ async def __anext__(self) -> ConsumerEvent: class KafkaEventSubscriber(InboundProviderBase): """Apache Kafka-specific event subscription provider.""" + class RetriesExhaustedError(RuntimeError): + """Raised when an event has been retried the maximum number of times.""" + + def __init__(self, *, event_type: str, max_retries: int): + msg = f"All retries (total of {max_retries}) exhausted for '{event_type}' event." 
+ super().__init__(msg) + + class RetriesLeftError(ValueError): + """Raised when the value for `retries_left` is invalid.""" + + def __init__(self, *, retries_left: int, max_retries: int): + msg = ( + f"Invalid value for retries_left: {retries_left} (should be between" + + f" 1 and {max_retries}, inclusive)." + ) + super().__init__(msg) + @classmethod @asynccontextmanager async def construct( @@ -146,9 +189,10 @@ async def construct( config: KafkaConfig, translator: EventSubscriberProtocol, kafka_consumer_cls: type[KafkaConsumerCompatible] = AIOKafkaConsumer, + dlq_publisher: Optional[EventPublisherProtocol] = None, ): """ - Setup and teardown KafkaEventPublisher instance with some config params. + Setup and teardown KafkaEventSubscriber instance with some config params. Args: config: @@ -157,6 +201,10 @@ async def construct( The translator that translates between the protocol (mentioned in the type annotation) and an application-specific port (according to the triple hexagonal architecture). + dlq_publisher: + A running instance of a publishing provider that implements the + EventPublisherProtocol, such as KafkaEventPublisher. Can be None if + not using the dead letter queue. It is used to publish events to the DLQ. kafka_consumer_cls: Overwrite the used Kafka consumer class. Only intended for unit testing. """ @@ -166,6 +214,13 @@ async def construct( topics = translator.topics_of_interest + if config.kafka_enable_dlq: + if dlq_publisher is None: + error = ValueError("A publisher is required when the DLQ is enabled.") + logging.error(error) + raise error + topics.append(config.kafka_retry_topic) + consumer = kafka_consumer_cls( *topics, bootstrap_servers=",".join(config.kafka_servers), @@ -181,76 +236,222 @@ async def construct( ), max_partition_fetch_bytes=config.kafka_max_message_size, ) + + await consumer.start() try: - await consumer.start() - yield cls(consumer=consumer, translator=translator) + yield cls( + consumer=consumer, + translator=translator, + dlq_publisher=dlq_publisher, + config=config, + ) finally: await consumer.stop() def __init__( - self, *, consumer: KafkaConsumerCompatible, translator: EventSubscriberProtocol + self, + *, + consumer: KafkaConsumerCompatible, + translator: EventSubscriberProtocol, + config: KafkaConfig, + dlq_publisher: Optional[EventPublisherProtocol] = None, ): """Please do not call directly! Should be called by the `construct` method. Args: consumer: - hands over a started AIOKafkaProducer. + hands over a started AIOKafkaConsumer. translator (EventSubscriberProtocol): The translator that translates between the protocol (mentioned in the type annotation) and an application-specific port (according to the triple hexagonal architecture). + dlq_publisher: + A running instance of a publishing provider that implements the + EventPublisherProtocol, such as KafkaEventPublisher. Can be None if + not using the dead letter queue. It is used to publish events to the DLQ. 
+ config: + The KafkaConfig instance """ self._consumer = consumer self._translator = translator self._types_whitelist = translator.types_of_interest - - @staticmethod - def _get_event_label(event: ConsumerEvent) -> str: - """Get a label that identifies an event.""" - return ( - f"{event.topic} - {event.partition} - {event.key} - {event.offset}" - + " (topic-partition-offset)" + self._dlq_publisher = dlq_publisher + self._dlq_topic = config.kafka_dlq_topic + self._retry_topic = config.kafka_retry_topic + self._max_retries = config.kafka_max_retries + self._enable_dlq = config.kafka_enable_dlq + self._retry_backoff = config.kafka_retry_backoff + + async def _publish_to_dlq(self, *, event: ExtractedEventInfo): + """Publish the event to the DLQ topic.""" + logging.debug("About to publish an event to DLQ topic '%s'", self._dlq_topic) + await self._dlq_publisher.publish( # type: ignore + payload=event.payload, + type_=event.type_, + topic=self._dlq_topic, + key=event.key, + headers={ORIGINAL_TOPIC_FIELD: event.topic}, ) + logging.info("Published event to DLQ topic '%s'", self._dlq_topic) + + async def _retry_event(self, *, event: ExtractedEventInfo, retries_left: int): + """Retry the event until the maximum number of retries is reached. + + If the retry fails, this method is called again with `retries_left` decremented. + + Raises: + - `RetriesExhaustedError`: If all retries are exhausted without success. + - `RetriesLeftError`: If the value for `retries_left` is invalid. + """ + # Check if retries_left is valid. + if not 0 < retries_left <= self._max_retries: + error = self.RetriesLeftError( + retries_left=retries_left, max_retries=self._max_retries + ) + logging.error(error) + raise error + + # Decrement retries_left and calculate backoff time, then wait and retry. + retries_left -= 1 + retry_number = self._max_retries - retries_left + backoff_time = self._retry_backoff * 2 ** (retry_number - 1) + try: + logging.info( + "Retry %i of %i for event of type '%s' on topic '%s' with key '%s'," + + " beginning in %i seconds.", + retry_number, + self._max_retries, + event.type_, + event.topic, + event.key, + backoff_time, + ) + await asyncio.sleep(backoff_time) + await self._translator.consume( + payload=event.payload, + type_=event.type_, + topic=event.topic, + key=event.key, + ) + except Exception as err: + if retries_left > 0: + await self._retry_event(event=event, retries_left=retries_left) + else: + raise self.RetriesExhaustedError( + event_type=event.type_, max_retries=self._max_retries + ) from err + + async def _handle_consumption(self, *, event: ExtractedEventInfo): + """Try to pass the event to the consumer. + + If the event fails: + 1. Retry until retries are exhausted, if retries are configured. + 2. Publish the event to the DLQ topic if the DLQ is enabled. Done afterward. + or + 3. Allow failure with unhandled error if DLQ is not configured. 
+        """
+        try:
+            await self._translator.consume(
+                payload=event.payload,
+                type_=event.type_,
+                topic=event.topic,
+                key=event.key,
+            )
+        except Exception as underlying_error:
+            logging.warning(
+                "Failed initial attempt to consume event of type '%s' on topic '%s' with key '%s'.",
+                event.type_,
+                event.topic,
+                event.key,
+            )
+
+            if not self._max_retries:
+                if not self._enable_dlq:
+                    raise  # re-raise Exception
+                await self._publish_to_dlq(event=event)
+                return
+
+            # Don't raise RetriesExhaustedError unless retries are actually attempted
+            try:
+                await self._retry_event(event=event, retries_left=self._max_retries)
+            except (self.RetriesExhaustedError, self.RetriesLeftError) as retry_error:
+                # If the value for retries_left was invalid, we still want to handle it
+                # the same way as if all retries were exhausted. The separate error is
+                # for better traceability.
+                logging.warning(retry_error)
+                if not self._enable_dlq:
+                    raise retry_error from underlying_error
+                await self._publish_to_dlq(event=event)
+
+    def _extract_info(self, event: ConsumerEvent) -> ExtractedEventInfo:
+        """Extract the event info, restoring the original topic if needed."""
+        event_info = ExtractedEventInfo(event)
+        if event_info.topic == self._retry_topic:
+            event_info.topic = event_info.headers.get(ORIGINAL_TOPIC_FIELD, "")
+            logging.info(
+                "Received previously failed event from topic '%s' for retry.",
+                event_info.topic,
+            )
+        return event_info
+
+    def _validate_extracted_info(self, event: ExtractedEventInfo):
+        """Validate the event info, raising a RuntimeError if any issues are found."""
+        correlation_id = event.headers.get("correlation_id", "")
+        errors = []
+        if not event.type_:
+            errors.append("event type is empty")
+        elif event.type_ not in self._types_whitelist:
+            errors.append(f"event type '{event.type_}' is not in the whitelist")
+        if not correlation_id:
+            errors.append("correlation_id is empty")
+        if event.topic in (self._retry_topic, self._dlq_topic):
+            errors.append(
+                f"original_topic header cannot be {self._retry_topic} or"
+                + f" {self._dlq_topic}. Value: '{event.topic}'"
+            )
+        elif not event.topic:
+            errors.append(
+                "topic is empty"
+            )  # only occurs if original_topic header is empty
+        if errors:
+            error = RuntimeError(", ".join(errors))
+            raise error
 
     async def _consume_event(self, event: ConsumerEvent) -> None:
         """Consume an event by passing it down to the translator via the protocol."""
-        event_label = self._get_event_label(event)
-        headers = headers_as_dict(event)
+        event_label = get_event_label(event)
+        event_info = self._extract_info(event)
 
         try:
-            type_ = get_header_value(header_name="type", headers=headers)
-            correlation_id = get_header_value(
-                header_name="correlation_id", headers=headers
+            self._validate_extracted_info(event_info)
+        except RuntimeError as err:
+            logging.info(
+                "Ignored event of type '%s': %s, errors: %s",
+                event_info.type_,
+                event_label,
+                str(err),
             )
-        except EventHeaderNotFoundError as err:
-            logging.warning("Ignored an event: %s. %s", event_label, err.args[0])
-            # acknowledge event receipt
+            # Always acknowledge event receipt for ignored events
             await self._consumer.commit()
             return
 
-        if type_ in self._types_whitelist:
-            logging.info('Consuming event of type "%s": %s', type_, event_label)
-
-            try:
-                async with set_correlation_id(correlation_id):
-                    # blocks until event processing is completed:
-                    await self._translator.consume(
-                        payload=event.value,
-                        type_=type_,
-                        topic=event.topic,
-                        key=event.key,
-                    )
-                # acknowledge successfully processed event
-                await self._consumer.commit()
-            except Exception:
-                logging.error(
-                    "A fatal error occurred while processing the event: %s",
-                    event_label,
-                )
-                raise
-
+        try:
+            logging.info(
+                "Consuming event of type '%s': %s", event_info.type_, event_label
+            )
+            correlation_id = event_info.headers["correlation_id"]
+            async with set_correlation_id(correlation_id):
+                await self._handle_consumption(event=event_info)
+        except Exception:
+            logging.critical(
+                "An error occurred while processing event of type '%s': %s. It was NOT"
+                " placed in the DLQ topic (%s)",
+                event_info.type_,
+                event_label,
+                self._dlq_topic if self._enable_dlq else "DLQ is disabled",
+            )
+            raise
         else:
-            logging.info("Ignored event of type %s: %s", type_, event_label)
-            # acknowledge event receipt
+            # Only save consumed event offsets if it was successful or sent to DLQ
             await self._consumer.commit()
 
     async def run(self, forever: bool = True) -> None:
@@ -266,3 +467,230 @@ async def run(self, forever: bool = True) -> None:
         else:
             event = await self._consumer.__anext__()
             await self._consume_event(event)
+
+
+# Define function signature for the DLQ processor.
+DLQEventProcessor = Callable[[ConsumerEvent], Awaitable[Optional[ExtractedEventInfo]]]
+
+
+class DLQValidationError(RuntimeError):
+    """Raised when an event from the DLQ fails validation."""
+
+    def __init__(self, *, event: ConsumerEvent, reason: str):
+        msg = f"DLQ Event '{get_event_label(event)}' is invalid: {reason}"
+        super().__init__(msg)
+
+
+class DLQProcessingError(RuntimeError):
+    """Raised when an error occurs while processing an event from the DLQ."""
+
+    def __init__(self, *, event: ConsumerEvent, reason: str):
+        msg = f"DLQ Event '{get_event_label(event)}' cannot be processed: {reason}"
+        super().__init__(msg)
+
+
+def validate_dlq_headers(event: ConsumerEvent) -> None:
+    """Validate the headers that should be populated on every DLQ event.
+
+    Raises:
+    - `DLQValidationError`: If any headers are determined to be invalid.
+    """
+    headers = headers_as_dict(event)
+    expected_headers = ["type", "correlation_id", ORIGINAL_TOPIC_FIELD]
+    invalid_headers = [key for key in expected_headers if not headers.get(key)]
+    if invalid_headers:
+        error_msg = f"Missing or empty headers: {', '.join(invalid_headers)}"
+        raise DLQValidationError(event=event, reason=error_msg)
+
+
+async def process_dlq_event(event: ConsumerEvent) -> Optional[ExtractedEventInfo]:
+    """
+    Simple 'processing' function for a message from a dead-letter queue that
+    adheres to the DLQEventProcessor callable definition.
+
+    Args:
+    - `event`: The event to process.
+
+    Returns:
+    - `ExtractedEventInfo`: The unaltered event info to publish to the retry topic.
+    - `None`: A signal to discard the event.
+
+    Raises:
+    - `DLQValidationError`: If the event headers are invalid.
+ """ + validate_dlq_headers(event) + return ExtractedEventInfo(event) + + +class KafkaDLQSubscriber(InboundProviderBase): + """A kafka event subscriber that subscribes to the configured DLQ topic and either + discards each event or publishes it to the retry topic as instructed. + Further processing before requeuing is provided by a callable adhering to the + DLQEventProcessor definition. + """ + + @classmethod + @asynccontextmanager + async def construct( + cls, + *, + config: KafkaConfig, + dlq_publisher: EventPublisherProtocol, + process_dlq_event: DLQEventProcessor = process_dlq_event, + kafka_consumer_cls: type[KafkaConsumerCompatible] = AIOKafkaConsumer, + ): + """ + Setup and teardown KafkaDLQSubscriber instance. + + Args: + - `config`: + Config parameters needed for connecting to Apache Kafka. + - `dlq_publisher`: + A running instance of a publishing provider that implements the + EventPublisherProtocol, such as KafkaEventPublisher. It is used to publish + events to the configured retry topic. + - `kafka_consumer_cls`: + Overwrite the used Kafka consumer class. Only intended for unit testing. + - `process_dlq_event`: + An async callable adhering to the DLQEventProcessor definition that provides + validation and processing for events from the DLQ. It should return _either_ + the event to publish to the retry topic (which may be altered) or `None` to + discard the event. The `KafkaDLQSubscriber` will log and interpret + `DLQValidationError` as a signal to discard/ignore the event, and all other + errors will be re-raised as a `DLQProcessingError`. + """ + client_id = generate_client_id( + service_name=config.service_name, instance_id=config.service_instance_id + ) + + consumer = kafka_consumer_cls( + config.kafka_dlq_topic, + bootstrap_servers=",".join(config.kafka_servers), + security_protocol=config.kafka_security_protocol, + ssl_context=generate_ssl_context(config), + client_id=client_id, + group_id=config.service_name, + auto_offset_reset="earliest", + enable_auto_commit=False, + key_deserializer=lambda event_key: event_key.decode("ascii"), + value_deserializer=lambda event_value: json.loads( + event_value.decode("ascii") + ), + max_partition_fetch_bytes=config.kafka_max_message_size, + ) + + await consumer.start() + try: + yield cls( + dlq_topic=config.kafka_dlq_topic, + retry_topic=config.kafka_retry_topic, + consumer=consumer, + dlq_publisher=dlq_publisher, + process_dlq_event=process_dlq_event, + ) + finally: + await consumer.stop() + + def __init__( + self, + *, + dlq_topic: str, + retry_topic: str, + dlq_publisher: EventPublisherProtocol, + consumer: KafkaConsumerCompatible, + process_dlq_event: DLQEventProcessor, + ): + """Please do not call directly! Should be called by the `construct` method. + + Args: + - `consumer`: + hands over a started AIOKafkaConsumer. + - `dlq_publisher`: + A running instance of a publishing provider that implements the + EventPublisherProtocol, such as KafkaEventPublisher. + - `dlq_topic`: + The name of the topic used to store failed events, to which the + KafkaDLQSubscriber subscribes. + - `retry_topic`: + The name of the topic used to requeue failed events. + - `process_dlq_event`: + An async callable adhering to the DLQEventProcessor definition that provides + validation and processing for events from the DLQ. It should return _either_ + the event to publish to the retry topic (which may be altered) or `None` to + discard the event. 
The `KafkaDLQSubscriber` will log and interpret + `DLQValidationError` as a signal to discard/ignore the event, and all other + errors will be re-raised as a `DLQProcessingError`. + """ + self._consumer = consumer + self._publisher = dlq_publisher + self._dlq_topic = dlq_topic + self._retry_topic = retry_topic + self._process_dlq_event = process_dlq_event + + async def _publish_to_retry(self, *, event: ExtractedEventInfo) -> None: + """Publish the event to the retry topic.""" + correlation_id = event.headers["correlation_id"] + original_topic = event.headers[ORIGINAL_TOPIC_FIELD] + + async with set_correlation_id(correlation_id): + await self._publisher.publish( + payload=event.payload, + type_=event.type_, + key=event.key, + topic=self._retry_topic, + headers={ORIGINAL_TOPIC_FIELD: original_topic}, + ) + logging.info( + "Published an event with type '%s' to the retry topic '%s'", + event.type_, + self._retry_topic, + ) + + async def _handle_dlq_event(self, *, event: ConsumerEvent) -> None: + """Process an event from the dead-letter queue. + + The event is processed by `_process_dlq_event`, which validates the event + and determines whether to publish it to the retry topic or discard it. + """ + try: + event_to_publish = await self._process_dlq_event(event) + except DLQValidationError as err: + logging.error("Ignoring event from DLQ due to validation failure: %s", err) + return + + if event_to_publish: + await self._publish_to_retry(event=event_to_publish) + + async def _ignore_event(self, event: ConsumerEvent) -> None: + """Ignore the event, log it, and commit offsets""" + event_label = get_event_label(event) + logging.info( + "Ignoring event from DLQ topic '%s': %s", + self._dlq_topic, + event_label, + ) + await self._consumer.commit() + + async def run(self, ignore: bool = False) -> None: + """ + Handles one event and returns. + If `ignore` is True, the event will be ignored outright. + Otherwise, `_process_dlq_event` will be used to validate and determine what to + do with the event. 
+ """ + event = await self._consumer.__anext__() + if ignore: + await self._ignore_event(event) + return + + try: + await self._handle_dlq_event(event=event) + await self._consumer.commit() + except Exception as exc: + error = DLQProcessingError(event=event, reason=str(exc)) + logging.critical( + "Failed to process event from DLQ topic '%s': '%s'", + self._dlq_topic, + exc, + ) + raise error from exc diff --git a/src/hexkit/providers/akafka/testutils.py b/src/hexkit/providers/akafka/testutils.py index b3d72001..16a63b1b 100644 --- a/src/hexkit/providers/akafka/testutils.py +++ b/src/hexkit/providers/akafka/testutils.py @@ -20,7 +20,7 @@ """ import json -from collections.abc import AsyncGenerator, Generator, Sequence +from collections.abc import AsyncGenerator, Generator, Mapping, Sequence from contextlib import asynccontextmanager from dataclasses import dataclass from functools import partial @@ -43,7 +43,6 @@ from hexkit.providers.akafka.provider import ( ConsumerEvent, KafkaEventPublisher, - get_header_value, headers_as_dict, ) from hexkit.providers.akafka.testcontainer import DEFAULT_IMAGE as KAFKA_IMAGE @@ -330,7 +329,7 @@ async def _get_events_since_start( recorded_events: list[RecordedEvent] = [] for raw_event in raw_events: headers = headers_as_dict(raw_event) - type_ = get_header_value("type", headers=headers) + type_ = headers.get("type", "") del headers["type"] recorded_event = RecordedEvent( @@ -407,10 +406,18 @@ def __init__( self.publisher = publisher async def publish_event( - self, *, payload: JsonObject, type_: Ascii, topic: Ascii, key: Ascii = "test" + self, + *, + payload: JsonObject, + type_: Ascii, + topic: Ascii, + key: Ascii = "test", + headers: Optional[Mapping[str, str]] = None, ) -> None: """A convenience method to publish a test event.""" - await self.publisher.publish(payload=payload, type_=type_, key=key, topic=topic) + await self.publisher.publish( + payload=payload, type_=type_, key=key, topic=topic, headers=headers + ) def record_events( self, *, in_topic: Ascii, capture_headers: bool = False diff --git a/src/hexkit/providers/s3/testutils/_utils.py b/src/hexkit/providers/s3/testutils/_utils.py index 63ed6a86..e8d0b961 100644 --- a/src/hexkit/providers/s3/testutils/_utils.py +++ b/src/hexkit/providers/s3/testutils/_utils.py @@ -44,7 +44,7 @@ class FileObject(BaseModel): bucket_id: str object_id: str - @computed_field # type: ignore [misc] + @computed_field # type: ignore [prop-decorator] @property def content(self) -> bytes: """Extract the content from the file at the provided path""" @@ -53,7 +53,7 @@ def content(self) -> bytes: with open(self.file_path, "rb") as file: return file.read() - @computed_field # type: ignore [misc] + @computed_field # type: ignore [prop-decorator] @property def md5(self) -> str: """Calculate the md5 hash of the content""" diff --git a/src/hexkit/providers/testing/eventpub.py b/src/hexkit/providers/testing/eventpub.py index 1b8bdd42..ad4bc8f1 100644 --- a/src/hexkit/providers/testing/eventpub.py +++ b/src/hexkit/providers/testing/eventpub.py @@ -21,6 +21,7 @@ """ from collections import defaultdict, deque +from collections.abc import Mapping from typing import NamedTuple, Optional from hexkit.custom_types import JsonObject @@ -77,7 +78,13 @@ def __init__(self, event_store: Optional[InMemEventStore] = None): self.event_store = event_store if event_store else InMemEventStore() async def _publish_validated( - self, *, payload: JsonObject, type_: str, key: str, topic: str + self, + *, + payload: JsonObject, + type_: str, + key: str, + 
topic: str, + headers: Mapping[str, str], ) -> None: """Publish an event with already validated topic and type. diff --git a/tests/fixtures/utils.py b/tests/fixtures/utils.py index a55fd084..7d89cb7d 100644 --- a/tests/fixtures/utils.py +++ b/tests/fixtures/utils.py @@ -18,6 +18,7 @@ import logging import os from pathlib import Path +from typing import Literal import pytest import yaml @@ -49,3 +50,59 @@ def root_logger_reset(): # reset level and handlers root.setLevel(original_level) root.handlers = root_handlers + + +LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"] + + +def assert_logged( + level: LogLevel, + message: str, + records: list[logging.LogRecord], + parse: bool = True, +) -> str: + """Assert that a log message was logged with the given level and message. + + Args: + - `level`: The log level to check for. + - `message`: The message to check for. + - `records`: The log records to check (usually `caplog.records`) + - `parse`: Whether to parse the message with the arguments from the log record. + + If a match is found, the parsed message is returned (regardless of the value of + `parse`) and removed from the records list. + """ + for i, record in enumerate(records): + msg_to_inspect = str(record.msg) % record.args if parse else record.msg + if record.levelname == level and msg_to_inspect == message: + del records[i] + return msg_to_inspect if parse else str(record.msg) % record.args + else: + assert False, f"Log message not found: {level} - {message}" + + +def assert_not_logged( + level: LogLevel, + message: str, + records: list[logging.LogRecord], + parse: bool = True, +): + """Assert that a log message was not logged with the given level and message. + + Args: + - `level`: The log level to check for. + - `message`: The message to check for. + - `records`: The log records to check (usually `caplog.records`) + - `parse`: Whether to parse the message with the arguments from the log record. + """ + for record in records: + msg_to_inspect = str(record.msg) % record.args if parse else record.msg + if record.levelname == level and msg_to_inspect == message: + assert False, f"Log message found: {level} - {message}" + + +@pytest.fixture(name="caplog_debug") +def caplog_debug_fixture(caplog): + """Convenience fixture to set the log level of caplog to debug for a test.""" + caplog.set_level(logging.DEBUG) + yield caplog diff --git a/tests/integration/test_akafka.py b/tests/integration/test_akafka.py index c14d1325..7abb36b5 100644 --- a/tests/integration/test_akafka.py +++ b/tests/integration/test_akafka.py @@ -174,7 +174,7 @@ async def test_consumer_commit_mode(kafka: KafkaFixture): """Verify the consumer implementation behavior matches expectations.""" type_ = "test_type" topic = "test_topic" - type_ = "test_type" + partition = TopicPartition(topic, 0) error_message = "Consumer crashed successfully." 
diff --git a/tests/integration/test_correlation.py b/tests/integration/test_correlation.py index 0535f3d8..528ecdfa 100644 --- a/tests/integration/test_correlation.py +++ b/tests/integration/test_correlation.py @@ -174,7 +174,7 @@ async def test_context_var_setter(): (VALID_CORRELATION_ID, False, None), ("invalid", True, InvalidCorrelationIdError), ("invalid", False, None), - ("", True, InvalidCorrelationIdError), + ("", True, None), ("", False, None), ], ) diff --git a/tests/unit/test_dlqsub.py b/tests/unit/test_dlqsub.py new file mode 100644 index 00000000..c86f5ec6 --- /dev/null +++ b/tests/unit/test_dlqsub.py @@ -0,0 +1,701 @@ +# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for the Dead Letter Queue (DLQ) event subscribers""" + +from collections.abc import Mapping +from contextlib import nullcontext +from copy import deepcopy +from typing import Optional + +import pytest +from pydantic import BaseModel + +from hexkit.correlation import new_correlation_id, set_correlation_id +from hexkit.custom_types import Ascii, JsonObject +from hexkit.protocols.daosub import DaoSubscriberProtocol +from hexkit.protocols.eventpub import EventPublisherProtocol +from hexkit.protocols.eventsub import EventSubscriberProtocol +from hexkit.providers.akafka import KafkaConfig +from hexkit.providers.akafka.provider.daosub import KafkaOutboxSubscriber +from hexkit.providers.akafka.provider.eventsub import ( + ORIGINAL_TOPIC_FIELD, + ConsumerEvent, + DLQProcessingError, + ExtractedEventInfo, + KafkaDLQSubscriber, + KafkaEventSubscriber, + headers_as_dict, +) +from hexkit.providers.akafka.testutils import ( # noqa: F401 + KafkaFixture, + kafka_container_fixture, + kafka_fixture, +) +from tests.fixtures.utils import ( + assert_logged, + assert_not_logged, + caplog_debug_fixture, # noqa: F401 +) + +TEST_EVENT = ExtractedEventInfo( + payload={"key": "value"}, + type_="test_type", + topic="test-topic", + key="key", +) + + +class OutboxDto(BaseModel): + """Dummy DTO model to use for outbox tests""" + + user_id: str + email: str + name: str + + +OUTBOX_EVENT_UPSERT = ExtractedEventInfo( + payload=OutboxDto( + user_id="123456", email="test@test.com", name="Test User" + ).model_dump(), + topic="users", + type_="upserted", + key="123456", +) + +OUTBOX_EVENT_DELETE = deepcopy(OUTBOX_EVENT_UPSERT) +OUTBOX_EVENT_DELETE.type_ = "deleted" + + +class FailSwitchTranslator(EventSubscriberProtocol): + """Event translator that can be set to fail or collect events.""" + + fail: bool + failures: list[ExtractedEventInfo] + successes: list[ExtractedEventInfo] + topics_of_interest: list[str] + types_of_interest: list[str] + + def __init__( + self, + *, + topics_of_interest: list[str], + types_of_interest: list[str], + fail: bool = False, + ): + self.fail = fail + self.successes = [] + self.failures = [] + self.topics_of_interest = topics_of_interest + self.types_of_interest = types_of_interest + 
+ async def _consume_validated( + self, + *, + payload: JsonObject, + type_: str, + topic: str, + key: str, + ) -> None: + """Add event to failures or successful list depending on `fail`. + + Raises RuntimeError if `fail` is True. + """ + event = ExtractedEventInfo(payload=payload, type_=type_, topic=topic, key=key) + if self.fail: + self.failures.append(event) + raise RuntimeError("Destined to fail.") + self.successes.append(event) + + +class FailSwitchOutboxTranslator(DaoSubscriberProtocol): + """Translator for the outbox event.""" + + event_topic: str = "users" + dto_model: type[BaseModel] = OutboxDto + fail: bool = False + upsertions: list[ExtractedEventInfo] + deletions: list[str] + + def __init__(self, *, fail: bool = False) -> None: + self.upsertions = [] + self.deletions = [] + self.fail = fail + + async def changed(self, resource_id: str, update: OutboxDto) -> None: + """Dummy""" + self.upsertions.append( + ExtractedEventInfo( + payload=update.model_dump(), + type_="upserted", + topic=self.event_topic, + key=resource_id, + ) + ) + if self.fail: + raise RuntimeError("Destined to fail.") + + async def deleted(self, resource_id: str) -> None: + """Dummy""" + self.deletions.append(resource_id) + if self.fail: + raise RuntimeError("Destined to fail.") + + +class DummyPublisher(EventPublisherProtocol): + """Dummy class to intercept publishing""" + + published: list[ExtractedEventInfo] + + def __init__(self) -> None: + self.published = [] + + async def _publish_validated( + self, + *, + payload: JsonObject, + type_: Ascii, + key: Ascii, + topic: Ascii, + headers: Mapping[str, str], + ) -> None: + self.published.append( + ExtractedEventInfo( + payload=payload, type_=type_, topic=topic, key=key, headers=headers + ) + ) + + +def make_config( + kafka_config: Optional[KafkaConfig] = None, + *, + retry_topic: str = "retry", + dlq_topic: str = "dlq", + max_retries: int = 0, + enable_dlq: bool = True, + retry_backoff: int = 0, +) -> KafkaConfig: + """Convenience method to merge kafka fixture config with provided DLQ values.""" + return KafkaConfig( + service_name=getattr(kafka_config, "service_name", "test"), + service_instance_id=getattr(kafka_config, "service_instance_id", "test"), + kafka_servers=getattr(kafka_config, "kafka_servers", ["localhost:9092"]), + kafka_dlq_topic=dlq_topic, + kafka_retry_topic=retry_topic, + kafka_max_retries=max_retries, + kafka_enable_dlq=enable_dlq, + kafka_retry_backoff=retry_backoff, + ) + + +@pytest.mark.parametrize( + "retry_topic, dlq_topic, max_retries, enable_dlq, error", + [ + ("retry", "dlq", 0, True, False), + ("retry", "dlq", 1, True, False), + ("retry", "dlq", -1, True, True), + ("retry", "retry", 0, True, True), + ("retry", "retry", 0, False, True), + ("", "", 0, False, False), + ("", "dlq", 0, False, False), + ("retry", "dlq", 0, False, False), + ("retry", "", 0, True, True), + ("", "dlq", 0, True, True), + ("", "", 0, True, True), + ], +) +def test_config_validation( + retry_topic: str, + dlq_topic: str, + max_retries: int, + enable_dlq: bool, + error: bool, +): + """Test for config validation. + + Errors should occur: + 1. Anytime max_retries is < 0 + 2. If retry and DLQ topics are the same (non-empty) + 3. 
If the DLQ is enabled but the topics are not set (either or both) + """ + with pytest.raises(ValueError) if error else nullcontext(): + make_config( + retry_topic=retry_topic, + dlq_topic=dlq_topic, + max_retries=max_retries, + enable_dlq=enable_dlq, + ) + + +@pytest.mark.asyncio() +async def test_original_topic_is_preserved(kafka: KafkaFixture): + """Ensure the original topic is preserved when it reaches the DLQ subscriber and + when it comes back to the subscriber. + + Consume a failing event, send to DLQ, consume from DLQ, send to Retry, consume from + Retry, and check the original topic. + """ + config = make_config(kafka.config) + + # Publish test event + await kafka.publisher.publish(**vars(TEST_EVENT)) + + # Create dummy translator and set it to auto-fail, then run the Retry subscriber + translator = FailSwitchTranslator( + topics_of_interest=["test-topic"], types_of_interest=["test_type"], fail=True + ) + assert not translator.successes + async with KafkaEventSubscriber.construct( + config=config, translator=translator, dlq_publisher=kafka.publisher + ) as event_subscriber: + assert not translator.failures + await event_subscriber.run(forever=False) + + # Run the DLQ subscriber, telling it to publish the event to the retry topic + async with KafkaDLQSubscriber.construct( + config=config, dlq_publisher=kafka.publisher + ) as dlq_sub: + await dlq_sub.run() + + # Make sure the translator has nothing in the successes list, then run again + assert not translator.successes + translator.fail = False + await event_subscriber.run(forever=False) + + # Make sure the event received by the translator is identical to the original + # This means the original topic is preserved and `original_topic` is removed + assert translator.failures == [TEST_EVENT] + assert translator.successes == [TEST_EVENT] + + +@pytest.mark.asyncio() +async def test_invalid_retries_left(kafka: KafkaFixture, caplog_debug): + """Ensure that the proper error is raised when retries_left is invalid.""" + config = make_config(kafka.config, max_retries=2) + translator = FailSwitchTranslator( + topics_of_interest=["test-topic"], types_of_interest=["test_type"] + ) + dummy_publisher = DummyPublisher() + async with KafkaEventSubscriber.construct( + config=config, translator=translator, dlq_publisher=dummy_publisher + ) as retry_sub: + with pytest.raises(KafkaEventSubscriber.RetriesLeftError): + await retry_sub._retry_event(event=TEST_EVENT, retries_left=-1) + + with pytest.raises(KafkaEventSubscriber.RetriesLeftError): + await retry_sub._retry_event(event=TEST_EVENT, retries_left=3) + + assert_logged( + "ERROR", + "Invalid value for retries_left: -1 (should be between 1 and 2, inclusive).", + caplog_debug.records, + ) + assert_logged( + "ERROR", + "Invalid value for retries_left: 3 (should be between 1 and 2, inclusive).", + caplog_debug.records, + ) + + +@pytest.mark.parametrize("max_retries", [0, 1, 2]) +@pytest.mark.parametrize("enable_dlq", [True, False]) +@pytest.mark.asyncio() +async def test_retries_exhausted( + kafka: KafkaFixture, max_retries: int, enable_dlq: bool, caplog_debug +): + """Ensure the event is sent to the DLQ topic when the retries are exhausted if + the DLQ is enabled. If the DLQ is disabled, then the underlying error should be + raised. 
+ """ + config = make_config( + kafka.config, max_retries=max_retries, enable_dlq=enable_dlq, retry_backoff=1 + ) + + # Publish test event + await kafka.publisher.publish(**vars(TEST_EVENT)) + + # Set up dummies and consume the event + dummy_publisher = DummyPublisher() + translator = FailSwitchTranslator( + topics_of_interest=["test-topic"], types_of_interest=["test_type"], fail=True + ) + async with KafkaEventSubscriber.construct( + config=config, translator=translator, dlq_publisher=dummy_publisher + ) as retry_sub: + with pytest.raises(RuntimeError) if not enable_dlq else nullcontext(): + await retry_sub.run(forever=False) + + # Verify that the event was retried "max_retries" times after initial failure (if any) + assert translator.failures == [TEST_EVENT] * (max_retries + 1) + + # Check for initial failure log + assert_logged( + "WARNING", + "Failed initial attempt to consume event of type 'test_type' on topic" + + " 'test-topic' with key 'key'.", + caplog_debug.records, + ) + + # Make sure we see the expected number of retry logs + for n in range(1, max_retries + 1): + backoff_time = config.kafka_retry_backoff * 2 ** (n - 1) + assert_logged( + "INFO", + f"Retry {n} of {max_retries} for event of type 'test_type' on topic" + + f" 'test-topic' with key 'key', beginning in {backoff_time} seconds.", + caplog_debug.records, + ) + + # Check for final retry-related log + retry_log = f"All retries (total of {max_retries}) exhausted for 'test_type' event." + if max_retries: + assert_logged("WARNING", retry_log, caplog_debug.records) + else: + assert_not_logged("WARNING", retry_log, caplog_debug.records) + + # Put together the expected event with the original topic field appended + failed_event = ExtractedEventInfo( + type_=TEST_EVENT.type_, + topic=config.kafka_dlq_topic, + key=TEST_EVENT.key, + payload=TEST_EVENT.payload, + headers={ORIGINAL_TOPIC_FIELD: "test-topic"}, + ) + + # Verify that the event was sent to the DLQ topic just once and that it has + # the original topic field appended + expected_published = [failed_event] if enable_dlq else [] + assert dummy_publisher.published == expected_published + if enable_dlq: + assert_logged( + "INFO", "Published event to DLQ topic 'dlq'", caplog_debug.records + ) + else: + parsed_log = assert_logged( + "CRITICAL", + "An error occurred while processing event of type '%s': %s. It was NOT" + + " placed in the DLQ topic (%s)", + caplog_debug.records, + parse=False, + ) + assert parsed_log.endswith("(DLQ is disabled)") + + +@pytest.mark.asyncio() +async def test_send_to_retry(kafka: KafkaFixture, caplog_debug): + """Ensure the event is sent to the retry topic when the DLQ subscriber is instructed + to do so. 
+ """ + config = make_config(kafka.config) + + event_to_put_in_dlq = ExtractedEventInfo( + payload=TEST_EVENT.payload, + type_="test_type", + topic=config.kafka_dlq_topic, + key="123456", + headers={ORIGINAL_TOPIC_FIELD: "test-topic"}, + ) + + await kafka.publisher.publish(**vars(event_to_put_in_dlq)) + + # Set up dummies and consume the event with the DLQ Subscriber + dummy_publisher = DummyPublisher() + async with KafkaDLQSubscriber.construct( + config=config, dlq_publisher=dummy_publisher + ) as dlq_sub: + assert not dummy_publisher.published + await dlq_sub.run(ignore=False) + + assert_logged( + "INFO", + "Published an event with type 'test_type' to the retry topic 'retry'", + caplog_debug.records, + ) + + # Verify that the event was sent to the RETRY topic + event_to_put_in_dlq.topic = config.kafka_retry_topic + assert dummy_publisher.published == [event_to_put_in_dlq] + + +@pytest.mark.asyncio() +async def test_consume_retry_without_og_topic(kafka: KafkaFixture, caplog_debug): + """If the original topic is missing when consuming an event from the retry queue, + the event should be ignored and the offset committed. The information should be logged. + """ + config = make_config(kafka.config) + + event = ExtractedEventInfo( + payload={"test_id": "123456"}, + type_="test_type", + topic=config.kafka_retry_topic, + key="key", + ) + + # Publish that event directly to RETRY Topic, as if it had already been requeued + # the original topic header is intentionally not included here + await kafka.publisher.publish(**vars(event)) + + # Set up dummies and consume the event with the DLQ Subscriber + translator = FailSwitchTranslator( + topics_of_interest=["test-topic"], types_of_interest=["test_type"] + ) + async with KafkaEventSubscriber.construct( + config=config, translator=translator, dlq_publisher=kafka.publisher + ) as event_subscriber: + assert not translator.failures or translator.successes + + await event_subscriber.run(forever=False) + parsed_log = assert_logged( + "INFO", + "Ignored event of type '%s': %s, errors: %s", + caplog_debug.records, + parse=False, + ) + assert parsed_log.startswith("Ignored event of type 'test_type': retry") + assert parsed_log.endswith("errors: topic is empty") + + +@pytest.mark.asyncio() +async def test_dlq_subscriber_ignore(kafka: KafkaFixture, caplog_debug): + """Test what happens when a DLQ Subscriber is instructed to ignore an event.""" + config = make_config(kafka.config) + + # make an event without the original_topic field in the header + event = ExtractedEventInfo( + payload={"test_id": "123456"}, + type_="test_type", + topic=config.kafka_dlq_topic, + key="key", + ) + + # Publish that event directly to DLQ Topic, as if it had already failed + # the original topic header is not included here + await kafka.publisher.publish(**vars(event)) + + # Set up dummies and consume the event with the DLQ Subscriber + dummy_publisher = DummyPublisher() + async with KafkaDLQSubscriber.construct( + config=config, dlq_publisher=dummy_publisher + ) as dlq_sub: + assert not dummy_publisher.published + await dlq_sub.run(ignore=True) + + parsed_log = assert_logged( + "INFO", + "Ignoring event from DLQ topic '%s': %s", + caplog_debug.records, + parse=False, + ) + assert parsed_log.startswith("Ignoring event from DLQ topic 'dlq': dlq") + + # Assert that the event was not published to the retry topic + assert not dummy_publisher.published + + +@pytest.mark.asyncio() +async def test_no_retries_no_dlq_original_error(kafka: KafkaFixture, caplog_debug): + """Test that not using 
+    propagate the underlying error to the provider.
+    """
+    config = make_config(kafka.config, enable_dlq=False)
+
+    # Publish the test event
+    await kafka.publisher.publish(**vars(TEST_EVENT))
+
+    translator = FailSwitchTranslator(
+        topics_of_interest=["test-topic"], types_of_interest=["test_type"], fail=True
+    )
+    async with KafkaEventSubscriber.construct(
+        config=config, translator=translator, dlq_publisher=kafka.publisher
+    ) as retry_sub:
+        assert not translator.successes
+        with pytest.raises(RuntimeError, match="Destined to fail."):
+            await retry_sub.run(forever=False)
+        assert not translator.successes
+        assert translator.failures == [TEST_EVENT]
+
+    parsed_log = assert_logged(
+        "CRITICAL",
+        message="An error occurred while processing event of type '%s':"
+        + " %s. It was NOT placed in the DLQ topic (%s)",
+        records=caplog_debug.records,
+        parse=False,
+    )
+    assert parsed_log.startswith(
+        "An error occurred while processing event of type 'test_type':"
+    )
+    assert parsed_log.endswith("(DLQ is disabled)")
+
+    assert_not_logged(
+        "WARNING",
+        "All retries (total of 0) exhausted for 'test_type' event.",
+        caplog_debug.records,
+    )
+
+
+@pytest.mark.parametrize("event_type", ["upserted", "deleted"])
+@pytest.mark.asyncio()
+async def test_outbox_with_dlq(kafka: KafkaFixture, event_type: str):
+    """Ensure that the DLQ lifecycle works with the KafkaOutboxSubscriber."""
+    config = make_config(kafka.config)
+
+    translator = FailSwitchOutboxTranslator(fail=True)
+    list_to_check = (
+        translator.upsertions if event_type == "upserted" else translator.deletions
+    )
+
+    event = OUTBOX_EVENT_UPSERT if event_type == "upserted" else OUTBOX_EVENT_DELETE
+
+    # Publish the test event
+    await kafka.publisher.publish(**vars(event))
+
+    # Run the outbox subscriber and expect it to fail
+    async with KafkaOutboxSubscriber.construct(
+        config=config, dlq_publisher=kafka.publisher, translators=[translator]
+    ) as outbox_sub:
+        assert not list_to_check
+        await outbox_sub.run(forever=False)
+        assert list_to_check == ([event] if event_type == "upserted" else [event.key])
+
+        # Consume event from the DLQ topic, publish to retry topic
+        async with KafkaDLQSubscriber.construct(
+            config=config, dlq_publisher=kafka.publisher
+        ) as dlq_sub:
+            await dlq_sub.run()
+
+        # Retry the event after clearing the list
+        list_to_check.clear()  # type: ignore
+        translator.fail = False
+        assert not list_to_check
+        await outbox_sub.run(forever=False)
+        assert list_to_check == ([event] if event_type == "upserted" else [event.key])
+
+
+@pytest.mark.asyncio
+async def test_kafka_event_subscriber_construction(caplog):
+    """Test construction of the KafkaEventSubscriber, ensuring an error is raised if
+    the DLQ is enabled but no publisher is provided.
+    """
+    config = make_config()
+    translator = FailSwitchTranslator(
+        topics_of_interest=["test-topic"], types_of_interest=["test_type"]
+    )
+
+    with pytest.raises(ValueError):
+        async with KafkaEventSubscriber.construct(config=config, translator=translator):
+            assert False
+
+    assert_logged(
+        "ERROR",
+        "A publisher is required when the DLQ is enabled.",
+        caplog.records,
+    )
+
+
+@pytest.mark.parametrize(
+    "validation_error", [True, False], ids=["validation_error", "no_validation_error"]
+)
+@pytest.mark.asyncio
+async def test_default_dlq_processor(
+    kafka: KafkaFixture, caplog, validation_error: bool
+):
+    """Verify that `process_dlq_event` behaves as expected.
+
+    Assert that the event is republished unchanged or ignored.
+    """
+    config = make_config(kafka.config)
+
+    dlq_test_event = ExtractedEventInfo(
+        payload=TEST_EVENT.payload,
+        type_=TEST_EVENT.type_,
+        topic=config.kafka_dlq_topic,
+        key=TEST_EVENT.key,
+        headers={ORIGINAL_TOPIC_FIELD: "test-topic" if not validation_error else ""},
+    )
+
+    # Publish test event directly to the DLQ topic with the chosen correlation ID
+    correlation_id = new_correlation_id()
+    async with set_correlation_id(correlation_id):
+        await kafka.publish_event(**vars(dlq_test_event))
+
+    dummy_publisher = DummyPublisher()
+    async with KafkaDLQSubscriber.construct(
+        config=config, dlq_publisher=dummy_publisher
+    ) as dlq_sub:
+        assert not dummy_publisher.published
+        caplog.clear()
+        await dlq_sub.run()
+        assert dummy_publisher.published == ([] if validation_error else [dlq_test_event])
+
+    if validation_error:
+        assert len(caplog.records) > 0  # could be more, but should be at least 1
+        log = caplog.records[0]
+        assert log.msg.startswith("Ignoring event from DLQ due to validation failure:")
+
+
+@pytest.mark.parametrize(
+    "processing_error", [True, False], ids=["processing_error", "no_processing_error"]
+)
+@pytest.mark.asyncio
+async def test_custom_dlq_processors(kafka: KafkaFixture, processing_error: bool):
+    """Test that a custom DLQ processor can be used with the KafkaDLQSubscriber."""
+
+    class CustomDLQProcessor:
+        hits: list[ConsumerEvent]
+        fail: bool
+
+        def __init__(self):
+            self.hits = []
+            self.fail = processing_error
+
+        async def process(self, event: ConsumerEvent) -> Optional[ExtractedEventInfo]:
+            self.hits.append(event)
+            if self.fail:
+                raise RuntimeError("Destined to fail.")
+            return ExtractedEventInfo(event)
+
+    config = make_config(kafka.config)
+
+    # Publish test event directly to DLQ with chosen correlation ID
+    correlation_id = new_correlation_id()
+    async with set_correlation_id(correlation_id):
+        await kafka.publish_event(
+            payload=TEST_EVENT.payload,
+            type_=TEST_EVENT.type_,
+            topic=config.kafka_dlq_topic,
+            key=TEST_EVENT.key,
+            headers={ORIGINAL_TOPIC_FIELD: "test-topic"},
+        )
+
+    # Create custom processor instance and consume with the KafkaDLQSubscriber
+    custom_processor = CustomDLQProcessor()
+    async with KafkaDLQSubscriber.construct(
+        config=config,
+        dlq_publisher=DummyPublisher(),
+        process_dlq_event=custom_processor.process,
+    ) as dlq_sub:
+        assert not custom_processor.hits
+        with pytest.raises(DLQProcessingError) if processing_error else nullcontext():
+            await dlq_sub.run()
+
+    # Verify that the event was received and processed by the custom processor
+    assert len(custom_processor.hits)
+    event = custom_processor.hits[0]
+    headers = headers_as_dict(event)
+    assert headers["type"] == TEST_EVENT.type_
+    assert headers["correlation_id"] == correlation_id
+    assert headers[ORIGINAL_TOPIC_FIELD] == "test-topic"
+    assert event.value == TEST_EVENT.payload
+    assert event.topic == config.kafka_dlq_topic
+    assert event.key == TEST_EVENT.key
diff --git a/tests/unit/test_eventpub.py b/tests/unit/test_eventpub.py
index 14595246..150b06a6 100644
--- a/tests/unit/test_eventpub.py
+++ b/tests/unit/test_eventpub.py
@@ -31,7 +31,7 @@ class FakePublisher(EventPublisherProtocol):
     any logic.
     """
 
-    async def _publish_validated(self, *, payload, type_, key, topic) -> None:
+    async def _publish_validated(self, *, payload, type_, key, topic, headers) -> None:
         pass
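For context on the signature change in tests/unit/test_eventpub.py above: a concrete publisher only has to implement _publish_validated, which now also receives the validated headers mapping forwarded by publish(). The following is a minimal in-memory sketch in the spirit of the test doubles used above; the class name RecordingPublisher and its published list are illustrative assumptions, not part of this changeset.

from collections.abc import Mapping

from hexkit.custom_types import Ascii, JsonObject
from hexkit.protocols.eventpub import EventPublisherProtocol


class RecordingPublisher(EventPublisherProtocol):
    """Collects validated events in memory instead of sending them to a broker."""

    def __init__(self) -> None:
        self.published: list[dict] = []

    async def _publish_validated(
        self,
        *,
        payload: JsonObject,
        type_: Ascii,
        key: Ascii,
        topic: Ascii,
        headers: Mapping[str, str],
    ) -> None:
        # Record the event, including the headers passed down from publish()
        self.published.append(
            {
                "payload": payload,
                "type_": type_,
                "key": key,
                "topic": topic,
                "headers": dict(headers),
            }
        )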