Kafka DLQ (#123)
* Expand KafkaConfig for DLQ

* Add DLQ functionality

* Remove reference to Self

* Fix weird formatting

* Add caplog record checking util functions

* Improve pre-consume validation

Rename ExtractedEvent to ExtractedEventInfo

* Delete unused get_header_value & its error

Move OriginalTopicError into DLQ subscriber only

* Improve logging and documentation

* Update tests to check logs, add more cases, and consolidate

* Rename 'publisher' attr to 'dlq_publisher'

* Add 'title=' to new config

* Change lowercase dlq to DLQ

* Make assorted documentation typo fixes

* Missed documentation

* Add configurable exponential backoff for direct retries

* Validate retries_left param in _retry_event()

Improve error handling in _handle_consumption()

* Add headers to event pub protocol / move original topic to headers

* Adapt tests for updated protocol and dlq header changes

* Remove unused ignores

* Make mypy happy again

* Add headers param to kafka fixture publish_event

* Allow callbacks in KafkaDLQSubscriber

* Make ExtractedEventInfo more useful

* Remove redundant header check

Co-authored-by: Christoph Zwerschke <[email protected]>

* Fix some doc strings

* Move consumer.start() outside of try block

* Tweak error messages

* Simplify list comprehension in validate_dlq_headers

* Add Dockerfile.debian to ignored list

* Use RESERVED_HEADERS const and remove leading underscore from original topic

* Add RetriesLeftError

* Update src/hexkit/providers/akafka/provider/eventsub.py

Co-authored-by: Christoph Zwerschke <[email protected]>

* Clarify an error message in _validate_extracted_info()

* Fix some wording issues

* Fix a test and remove debug print statements

---------

Co-authored-by: Christoph Zwerschke <[email protected]>
TheByronHimes and Cito authored Aug 26, 2024
1 parent fa59749 commit 9c11826
Showing 18 changed files with 1,429 additions and 103 deletions.
1 change: 1 addition & 0 deletions .template/mandatory_files.txt
@@ -23,6 +23,7 @@ lock/requirements-dev.txt
lock/requirements.txt

Dockerfile
Dockerfile.debian
config_schema.json
example_config.yaml
LICENSE
1 change: 1 addition & 0 deletions .template/mandatory_files_ignore.txt
@@ -10,6 +10,7 @@
scripts/script_utils/fastapi_app_location.py

Dockerfile
Dockerfile.debian
config_schema.json
example_config.yaml

2 changes: 1 addition & 1 deletion src/hexkit/config.py
@@ -143,7 +143,7 @@ class ModSettings(settings):
model_config = SettingsConfigDict(frozen=True, env_prefix=f"{prefix}_")

@classmethod
def settings_customise_sources( # noqa: PLR0913
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
4 changes: 2 additions & 2 deletions src/hexkit/custom_types.py
@@ -27,12 +27,12 @@


# A type indicating that a string should be ascii-compatible.
# Technically it is an alias for `str` so it only serves documention purposes.
# Technically it is an alias for `str` so it only serves documentation purposes.
Ascii = str


# A AsyncConstructable is a class with a async (class-)method `construct` that is used when
# asynchronous constuction/instantiation logic is needed (which cannot be handeled in
# asynchronous construction/instantiation logic is needed (which cannot be handled in
# a synchronous __init__ method).
# With the current typing features of Python, it seems not possible to correctly type
# a class with that signature.
45 changes: 34 additions & 11 deletions src/hexkit/protocols/eventpub.py
@@ -17,6 +17,8 @@
"""Protocol related to event publishing."""

from abc import ABC, abstractmethod
from collections.abc import Mapping
from typing import Optional

from hexkit.custom_types import Ascii, JsonObject
from hexkit.utils import check_ascii
@@ -26,31 +28,52 @@ class EventPublisherProtocol(ABC):
"""A protocol for publishing events to an event broker."""

async def publish(
self, *, payload: JsonObject, type_: Ascii, key: Ascii, topic: Ascii
self,
*,
payload: JsonObject,
type_: Ascii,
key: Ascii,
topic: Ascii,
headers: Optional[Mapping[str, str]] = None,
) -> None:
"""Publish an event.
Args:
payload (JSON): The payload to ship with the event.
type_ (str): The event type. ASCII characters only.
key (str): The event type. ASCII characters only.
topic (str): The event type. ASCII characters only.
- `payload` (JSON): The payload to ship with the event.
- `type_` (str): The event type. ASCII characters only.
- `key` (str): The event key. ASCII characters only.
- `topic` (str): The event topic. ASCII characters only.
- `headers`: Additional headers to attach to the event.
"""
check_ascii(type_, key, topic)
if headers is None:
headers = {}

await self._publish_validated(
payload=payload, type_=type_, key=key, topic=topic
payload=payload,
type_=type_,
key=key,
topic=topic,
headers=headers,
)

@abstractmethod
async def _publish_validated(
self, *, payload: JsonObject, type_: Ascii, key: Ascii, topic: Ascii
self,
*,
payload: JsonObject,
type_: Ascii,
key: Ascii,
topic: Ascii,
headers: Mapping[str, str],
) -> None:
"""Publish an event with already validated topic and type.
Args:
payload (JSON): The payload to ship with the event.
type_ (str): The event type. ASCII characters only.
key (str): The event type. ASCII characters only.
topic (str): The event type. ASCII characters only.
- `payload` (JSON): The payload to ship with the event.
- `type_` (str): The event type. ASCII characters only.
- `key` (str): The event key. ASCII characters only.
- `topic` (str): The event topic. ASCII characters only.
- `headers`: Additional headers to attach to the event.
"""
...
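
For orientation, here is a minimal sketch (not part of this commit) of a concrete publisher implementing the updated protocol. `InMemEventPublisher` is a hypothetical test double; only `EventPublisherProtocol`, `Ascii`, and `JsonObject` are real hexkit names.

```python
from collections.abc import Mapping

from hexkit.custom_types import Ascii, JsonObject
from hexkit.protocols.eventpub import EventPublisherProtocol


class InMemEventPublisher(EventPublisherProtocol):
    """Hypothetical publisher that records events in memory (e.g. for tests)."""

    def __init__(self) -> None:
        self.events: list[dict] = []

    async def _publish_validated(
        self,
        *,
        payload: JsonObject,
        type_: Ascii,
        key: Ascii,
        topic: Ascii,
        headers: Mapping[str, str],
    ) -> None:
        # publish() has already validated the ASCII fields and defaulted
        # headers to {}, so the event can be stored as-is.
        self.events.append(
            {"payload": payload, "type_": type_, "key": key, "topic": topic,
             "headers": dict(headers)}
        )
```

Note that callers can keep omitting `headers`: the base class defaults it to an empty dict before delegating to `_publish_validated`.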
67 changes: 63 additions & 4 deletions src/hexkit/providers/akafka/config.py
@@ -18,20 +18,20 @@

from typing import Literal

from pydantic import Field, PositiveInt, SecretStr
from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr, model_validator
from pydantic_settings import BaseSettings


class KafkaConfig(BaseSettings):
"""Config parameters needed for connecting to Apache Kafka."""

service_name: str = Field(
...,
default=...,
examples=["my-cool-special-service"],
description="The name of the (micro-)service from which messages are published.",
)
service_instance_id: str = Field(
...,
default=...,
examples=["germany-bw-instance-001"],
description=(
"A string that uniquely identifies this instance across all instances of"
Expand All @@ -40,7 +40,7 @@ class KafkaConfig(BaseSettings):
),
)
kafka_servers: list[str] = Field(
...,
default=...,
examples=[["localhost:9092"]],
description="A list of connection strings to connect to Kafka bootstrap servers.",
)
@@ -82,3 +82,62 @@ class KafkaConfig(BaseSettings):
+ " services that have a need to send/receive larger messages should set this.",
examples=[1024 * 1024, 16 * 1024 * 1024],
)
kafka_dlq_topic: str = Field(
default="",
description="The name of the service-specific topic used for the dead letter queue.",
examples=["dcs-dlq", "ifrs-dlq", "mass-dlq"],
title="Kafka DLQ Topic",
)
kafka_retry_topic: str = Field(
default="",
description=(
"The name of the service-specific topic used to retry previously failed events."
),
title="Kafka Retry Topic",
examples=["dcs-dlq-retry", "ifrs-dlq-retry", "mass-dlq-retry"],
)
kafka_max_retries: NonNegativeInt = Field(
default=0,
description=(
"The maximum number of times to immediately retry consuming an event upon"
+ " failure. Works independently of the dead letter queue."
),
title="Kafka Max Retries",
examples=[0, 1, 2, 3, 5],
)
kafka_enable_dlq: bool = Field(
default=False,
description=(
"A flag to toggle the dead letter queue. If set to False, the service will"
+ " crash upon exhausting retries instead of publishing events to the DLQ."
+ " If set to True, the service will publish events to the DLQ topic after"
+ " exhausting all retries, and both `kafka_dlq_topic` and"
+ " `kafka_retry_topic` must be set."
),
title="Kafka Enable DLQ",
examples=[True, False],
)
kafka_retry_backoff: NonNegativeInt = Field(
default=0,
description=(
"The number of seconds to wait before retrying a failed event. The backoff"
+ " time is doubled for each retry attempt."
),
title="Kafka Retry Backoff",
examples=[0, 1, 2, 3, 5],
)

@model_validator(mode="after")
def validate_retry_topic(self):
"""Ensure that the retry topic is not the same as the DLQ topic."""
if self.kafka_retry_topic and self.kafka_retry_topic == self.kafka_dlq_topic:
raise ValueError(
"kafka_retry_topic and kafka_dlq_topic cannot be the same."
)
if self.kafka_enable_dlq and not (
self.kafka_dlq_topic and self.kafka_retry_topic
):
raise ValueError(
"Both kafka_dlq_topic and kafka_retry_topic must be set when the DLQ is enabled."
)
return self
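
To make the new settings concrete, here is a hedged usage sketch: the field names come from the diff above, while the service names and values are invented. Pydantic surfaces the cross-field checks from `validate_retry_topic` as a `ValidationError` (a `ValueError` subclass).

```python
from hexkit.providers.akafka.config import KafkaConfig

# A consistent DLQ setup: both topics set, two direct retries with
# exponential backoff (1 s before the first retry, then 2 s).
config = KafkaConfig(
    service_name="dcs",
    service_instance_id="dcs-instance-001",
    kafka_servers=["localhost:9092"],
    kafka_enable_dlq=True,
    kafka_dlq_topic="dcs-dlq",
    kafka_retry_topic="dcs-dlq-retry",
    kafka_max_retries=2,
    kafka_retry_backoff=1,
)

# Enabling the DLQ without naming both topics is rejected by the validator:
try:
    KafkaConfig(
        service_name="dcs",
        service_instance_id="dcs-instance-001",
        kafka_servers=["localhost:9092"],
        kafka_enable_dlq=True,
    )
except ValueError as error:
    print(error)  # mentions that both DLQ and retry topics must be set
```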
10 changes: 8 additions & 2 deletions src/hexkit/providers/akafka/provider/__init__.py
@@ -23,16 +23,22 @@
from .eventpub import KafkaEventPublisher
from .eventsub import (
ConsumerEvent,
ExtractedEventInfo,
KafkaDLQSubscriber,
KafkaEventSubscriber,
get_header_value,
headers_as_dict,
process_dlq_event,
validate_dlq_headers,
)

__all__ = [
"KafkaEventPublisher",
"ExtractedEventInfo",
"KafkaEventSubscriber",
"ConsumerEvent",
"get_header_value",
"headers_as_dict",
"KafkaOutboxSubscriber",
"KafkaDLQSubscriber",
"process_dlq_event",
"validate_dlq_headers",
]
21 changes: 18 additions & 3 deletions src/hexkit/providers/akafka/provider/daosub.py
@@ -21,6 +21,7 @@
import logging
from collections.abc import Sequence
from contextlib import asynccontextmanager
from typing import Optional

from aiokafka import AIOKafkaConsumer
from pydantic import ValidationError
@@ -31,6 +32,7 @@
DaoSubscriberProtocol,
DtoValidationError,
)
from hexkit.protocols.eventpub import EventPublisherProtocol
from hexkit.protocols.eventsub import EventSubscriberProtocol
from hexkit.providers.akafka.config import KafkaConfig
from hexkit.providers.akafka.provider.eventsub import (
@@ -85,8 +87,8 @@ async def _consume_validated(
dto = translator.dto_model.model_validate(payload)
except ValidationError as error:
message = (
f"The event of type {type_} on topic {topic} was not valid wrt. the"
+ " DTO model."
f"The event of type {type_} on topic {topic}"
+ " was not valid wrt. the DTO model."
)
logging.error(message)
raise DtoValidationError(message) from error
@@ -110,21 +112,34 @@ async def construct(
*,
config: KafkaConfig,
translators: Sequence[DaoSubscriberProtocol],
dlq_publisher: Optional[EventPublisherProtocol] = None,
kafka_consumer_cls: type[KafkaConsumerCompatible] = AIOKafkaConsumer,
):
"""Setup and teardown an instance of the provider.
Args:
config: MongoDB-specific config parameters.
- `config`: Kafka-specific config parameters.
- `translators`: A sequence of translators implementing the
`DaoSubscriberProtocol`.
- `dlq_publisher`: The publisher used to send failed events to the DLQ. Can be
None when the dead letter queue is not used.
- `kafka_consumer_cls`: The Kafka consumer class to use. Defaults to
`AIOKafkaConsumer`.
Returns:
An instance of the provider.
"""
translator_converter = TranslatorConverter(translators=translators)

if config.kafka_enable_dlq and dlq_publisher is None:
error = ValueError("A publisher is required when the DLQ is enabled.")
logging.error(error)
raise error

async with KafkaEventSubscriber.construct(
config=config,
translator=translator_converter,
dlq_publisher=dlq_publisher,
kafka_consumer_cls=kafka_consumer_cls,
) as event_subscriber:
yield cls(event_subscriber=event_subscriber)
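
For context, a sketch of how the new `dlq_publisher` argument might be wired up. Assumptions beyond the diff: `KafkaEventPublisher.construct` is used as the publisher, `translator` is some object implementing `DaoSubscriberProtocol`, and the subscriber exposes a `run()` consume loop as `KafkaEventSubscriber` does.

```python
from hexkit.providers.akafka.provider import (
    KafkaEventPublisher,
    KafkaOutboxSubscriber,
)


async def consume_with_dlq(config, translator) -> None:
    # The DLQ needs a publisher; the regular Kafka publisher is reused here.
    async with KafkaEventPublisher.construct(config=config) as dlq_publisher:
        async with KafkaOutboxSubscriber.construct(
            config=config,
            translators=[translator],
            dlq_publisher=dlq_publisher,  # required when kafka_enable_dlq=True
        ) as subscriber:
            await subscriber.run()  # assumed consume loop, as on KafkaEventSubscriber
```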
41 changes: 31 additions & 10 deletions src/hexkit/providers/akafka/provider/eventpub.py
@@ -22,6 +22,7 @@
import json
import logging
import ssl
from collections.abc import Mapping
from contextlib import asynccontextmanager
from typing import Any, Callable, Optional, Protocol

@@ -41,6 +42,8 @@
generate_ssl_context,
)

RESERVED_HEADERS = ["type", "correlation_id"]


class KafkaProducerCompatible(Protocol):
"""A python duck type protocol describing an AIOKafkaProducer or equivalent."""
@@ -150,15 +153,22 @@ def __init__(
self._generate_correlation_id = generate_correlation_id

async def _publish_validated(
self, *, payload: JsonObject, type_: Ascii, key: Ascii, topic: Ascii
self,
*,
payload: JsonObject,
type_: Ascii,
key: Ascii,
topic: Ascii,
headers: Mapping[str, str],
) -> None:
"""Publish an event with already validated topic and type.
Args:
payload (JSON): The payload to ship with the event.
type_ (str): The event type. ASCII characters only.
key (str): The event type. ASCII characters only.
topic (str): The event type. ASCII characters only.
- `payload` (JSON): The payload to ship with the event.
- `type_` (str): The event type. ASCII characters only.
- `key` (str): The event key. ASCII characters only.
- `topic` (str): The event topic. ASCII characters only.
- `headers`: Additional headers to attach to the event.
"""
try:
correlation_id = get_correlation_id()
@@ -171,10 +181,21 @@ async def _publish_validated(

validate_correlation_id(correlation_id)

event_headers = [
("type", type_.encode("ascii")),
("correlation_id", correlation_id.encode("ascii")),
]
# Create a shallow copy of the headers
headers_copy = dict(headers)

# Check and log warnings for reserved headers
for header in RESERVED_HEADERS:
log_msg = (
f"The '{header}' header shouldn't be supplied, but was. Overwriting."
)
if header in headers_copy:
logging.warning(log_msg, extra={header: headers_copy[header]})

headers_copy["type"] = type_
headers_copy["correlation_id"] = correlation_id
encoded_headers_list = [(k, v.encode("ascii")) for k, v in headers_copy.items()]

await self._producer.send_and_wait(
topic, key=key, value=payload, headers=event_headers
topic, key=key, value=payload, headers=encoded_headers_list
)
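
To isolate the header-merging rule implemented above, here is a standalone sketch that can be run without Kafka. `RESERVED_HEADERS` mirrors the constant from the diff; the function name and sample values are illustrative only.

```python
import logging

RESERVED_HEADERS = ["type", "correlation_id"]


def merge_headers(
    user_headers: dict[str, str], type_: str, correlation_id: str
) -> list[tuple[str, bytes]]:
    """Overwrite reserved headers and encode everything as (str, bytes) pairs."""
    headers = dict(user_headers)  # shallow copy; the caller's dict is untouched
    for header in RESERVED_HEADERS:
        if header in headers:
            logging.warning(
                "The '%s' header shouldn't be supplied, but was. Overwriting.",
                header,
            )
    headers["type"] = type_
    headers["correlation_id"] = correlation_id
    # aiokafka expects headers as a list of (str, bytes) tuples
    return [(key, value.encode("ascii")) for key, value in headers.items()]


assert merge_headers({"trace": "abc", "type": "spoofed"}, "user_upserted", "cid-1") == [
    ("trace", b"abc"),
    ("type", b"user_upserted"),
    ("correlation_id", b"cid-1"),
]
```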