Kafka DLQ #123

Merged: 35 commits, merged Aug 26, 2024
Changes from 32 commits

Commits
ccd78e0
Expand KafkaConfig for DLQ
TheByronHimes Jul 25, 2024
af158f4
Add DLQ functionality
TheByronHimes Jul 25, 2024
2e52080
Remove reference to Self
TheByronHimes Jul 25, 2024
fe4bc4c
Fix weird formatting
TheByronHimes Jul 26, 2024
152e757
Add caplog record checking util functions
TheByronHimes Jul 29, 2024
20b5942
Improve pre-consume validation
TheByronHimes Jul 29, 2024
7d25ddf
Delete unused get_header_value & its error
TheByronHimes Jul 29, 2024
9a972df
Improve logging and documentation
TheByronHimes Jul 29, 2024
a83c73e
Update tests to check logs, add more cases, and consolidate
TheByronHimes Jul 29, 2024
3426999
Rename 'publisher' attr to 'dlq_publisher'
TheByronHimes Jul 30, 2024
f32b026
Add 'title=' to new config
TheByronHimes Jul 30, 2024
f8982b5
Change lowercase dlq to DLQ
TheByronHimes Jul 30, 2024
c411124
Make assorted documentation typo fixes
TheByronHimes Jul 30, 2024
4de40a6
Missed documentation
TheByronHimes Jul 30, 2024
26f2894
Add configurable exponential backoff for direct retries
TheByronHimes Jul 31, 2024
1c46a5e
Validate retries_left param in _retry_event()
TheByronHimes Jul 31, 2024
ee00b5e
Add headers to event pub protocol/move og topic to headers
TheByronHimes Aug 1, 2024
753af80
Adapt tests for updated protocol and dlq header changes
TheByronHimes Aug 1, 2024
65f2e71
Remove unused ignores
TheByronHimes Aug 1, 2024
2ca34e1
Make mypy happy again
TheByronHimes Aug 1, 2024
92b04e8
Add headers param to kafka fixture publish_event
TheByronHimes Aug 2, 2024
d03a158
Allow callbacks in KafkaDLQSubscriber
TheByronHimes Aug 5, 2024
b12e2b4
Make ExtractedEventInfo more useful
TheByronHimes Aug 5, 2024
e4b78c6
Remove redundant header check
TheByronHimes Aug 14, 2024
53b3fae
Fix some doc strings
TheByronHimes Aug 14, 2024
4d2b5c9
Move consumer.start() outside of try block
TheByronHimes Aug 14, 2024
e6271fc
Tweak error messages
TheByronHimes Aug 14, 2024
792763c
Simplify list comprehension in validate_dlq_headers
TheByronHimes Aug 14, 2024
b5ac74e
Add Dockerfile.debian to ignored list
TheByronHimes Aug 14, 2024
e817525
Use RESERVED_HEADERS const and remove lead underscore from og topic
TheByronHimes Aug 14, 2024
27edd4a
Add RetriesLeftError
TheByronHimes Aug 14, 2024
5f98b68
Update src/hexkit/providers/akafka/provider/eventsub.py
TheByronHimes Aug 14, 2024
389e061
Clarify an error message in _validate_extracted_info()
TheByronHimes Aug 23, 2024
da1b25f
Fix some wording issues
TheByronHimes Aug 26, 2024
51612c7
Fix a test and remove debug print statements
TheByronHimes Aug 26, 2024
1 change: 1 addition & 0 deletions .template/mandatory_files.txt
@@ -23,6 +23,7 @@ lock/requirements-dev.txt
lock/requirements.txt

Dockerfile
Dockerfile.debian
config_schema.json
example_config.yaml
LICENSE
1 change: 1 addition & 0 deletions .template/mandatory_files_ignore.txt
@@ -10,6 +10,7 @@
scripts/script_utils/fastapi_app_location.py

Dockerfile
Dockerfile.debian
config_schema.json
example_config.yaml

2 changes: 1 addition & 1 deletion src/hexkit/config.py
@@ -143,7 +143,7 @@ class ModSettings(settings):
model_config = SettingsConfigDict(frozen=True, env_prefix=f"{prefix}_")

@classmethod
def settings_customise_sources( # noqa: PLR0913
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
4 changes: 2 additions & 2 deletions src/hexkit/custom_types.py
@@ -27,12 +27,12 @@


# A type indicating that a string should be ascii-compatible.
# Technically it is an alias for `str` so it only serves documention purposes.
# Technically it is an alias for `str` so it only serves documentation purposes.
Ascii = str


# A AsyncConstructable is a class with a async (class-)method `construct` that is used when
# asynchronous constuction/instantiation logic is needed (which cannot be handeled in
# asynchronous construction/instantiation logic is needed (which cannot be handled in
# a synchronous __init__ method).
# With the current typing features of Python, it seems not possible to correctly type
# a class with that signature.
45 changes: 34 additions & 11 deletions src/hexkit/protocols/eventpub.py
@@ -17,6 +17,8 @@
"""Protocol related to event publishing."""

from abc import ABC, abstractmethod
from collections.abc import Mapping
from typing import Optional

from hexkit.custom_types import Ascii, JsonObject
from hexkit.utils import check_ascii
@@ -26,31 +28,52 @@ class EventPublisherProtocol(ABC):
"""A protocol for publishing events to an event broker."""

async def publish(
self, *, payload: JsonObject, type_: Ascii, key: Ascii, topic: Ascii
self,
*,
payload: JsonObject,
type_: Ascii,
key: Ascii,
topic: Ascii,
headers: Optional[Mapping[str, str]] = None,
) -> None:
"""Publish an event.

Args:
payload (JSON): The payload to ship with the event.
type_ (str): The event type. ASCII characters only.
key (str): The event type. ASCII characters only.
topic (str): The event type. ASCII characters only.
- `payload` (JSON): The payload to ship with the event.
- `type_` (str): The event type. ASCII characters only.
- `key` (str): The event key. ASCII characters only.
- `topic` (str): The topic to publish to. ASCII characters only.
- `headers`: Additional headers to attach to the event.
"""
check_ascii(type_, key, topic)
if headers is None:
headers = {}

await self._publish_validated(
payload=payload, type_=type_, key=key, topic=topic
payload=payload,
type_=type_,
key=key,
topic=topic,
headers=headers,
)

@abstractmethod
async def _publish_validated(
self, *, payload: JsonObject, type_: Ascii, key: Ascii, topic: Ascii
self,
*,
payload: JsonObject,
type_: Ascii,
key: Ascii,
topic: Ascii,
headers: Mapping[str, str],
) -> None:
"""Publish an event with already validated topic and type.

Args:
payload (JSON): The payload to ship with the event.
type_ (str): The event type. ASCII characters only.
key (str): The event type. ASCII characters only.
topic (str): The event type. ASCII characters only.
- `payload` (JSON): The payload to ship with the event.
- `type_` (str): The event type. ASCII characters only.
- `key` (str): The event key. ASCII characters only.
- `topic` (str): The topic to publish to. ASCII characters only.
- `headers`: Additional headers to attach to the event.
"""
...
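
To illustrate the updated protocol, here is a minimal sketch of a concrete publisher and a call site using the new optional headers parameter. The InMemoryEventPublisher class and all event values are invented for illustration; only EventPublisherProtocol and the type aliases come from hexkit:

import asyncio
from collections.abc import Mapping

from hexkit.custom_types import Ascii, JsonObject
from hexkit.protocols.eventpub import EventPublisherProtocol


class InMemoryEventPublisher(EventPublisherProtocol):
    """Test double that records published events instead of sending them."""

    def __init__(self) -> None:
        self.events: list[dict] = []

    async def _publish_validated(
        self,
        *,
        payload: JsonObject,
        type_: Ascii,
        key: Ascii,
        topic: Ascii,
        headers: Mapping[str, str],
    ) -> None:
        # `headers` is always a Mapping here: `publish` substitutes an empty
        # dict when the caller passes None.
        self.events.append(
            {"payload": payload, "type_": type_, "key": key, "topic": topic,
             "headers": dict(headers)}
        )


async def main() -> None:
    publisher = InMemoryEventPublisher()
    await publisher.publish(
        payload={"file_id": "abc"},
        type_="file_registered",
        key="abc",
        topic="files",
        headers={"trace_id": "123"},  # optional; omitting it behaves as before
    )
    print(publisher.events[0]["headers"])  # {'trace_id': '123'}


asyncio.run(main())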
67 changes: 63 additions & 4 deletions src/hexkit/providers/akafka/config.py
@@ -18,20 +18,20 @@

from typing import Literal

from pydantic import Field, PositiveInt, SecretStr
from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr, model_validator
from pydantic_settings import BaseSettings


class KafkaConfig(BaseSettings):
"""Config parameters needed for connecting to Apache Kafka."""

service_name: str = Field(
...,
default=...,
examples=["my-cool-special-service"],
description="The name of the (micro-)service from which messages are published.",
)
service_instance_id: str = Field(
...,
default=...,
examples=["germany-bw-instance-001"],
description=(
"A string that uniquely identifies this instance across all instances of"
@@ -40,7 +40,7 @@ class KafkaConfig(BaseSettings):
),
)
kafka_servers: list[str] = Field(
...,
default=...,
examples=[["localhost:9092"]],
description="A list of connection strings to connect to Kafka bootstrap servers.",
)
@@ -82,3 +82,62 @@ class KafkaConfig(BaseSettings):
+ " services that have a need to send/receive larger messages should set this.",
examples=[1024 * 1024, 16 * 1024 * 1024],
)
kafka_dlq_topic: str = Field(
default="",
description="The name of the service-specific topic used for the dead letter queue.",
examples=["dcs-dlq", "ifrs-dlq", "mass-dlq"],
title="Kafka DLQ Topic",
)
kafka_retry_topic: str = Field(
default="",
description=(
"The name of the service-specific topic used to retry previously failed events."
),
title="Kafka Retry Topic",
examples=["dcs-dlq-retry", "ifrs-dlq-retry", "mass-dlq-retry"],
)
kafka_max_retries: NonNegativeInt = Field(
default=0,
description=(
"The maximum number of times to immediately retry consuming an event upon"
+ " failure. Works independently of the dead letter queue."
),
title="Kafka Max Retries",
examples=[0, 1, 2, 3, 5],
)
kafka_enable_dlq: bool = Field(
default=False,
description=(
"A flag to toggle the dead letter queue. If set to False, the service will"
+ " crash upon exhausting retries instead of publishing events to the DLQ."
+ " If set to True, the service will publish events to the DLQ topic after"
+ " exhausting all retries, and both `kafka_dlq_topic` and"
+ " `kafka_retry_topic` must be set."
),
title="Kafka Enable DLQ",
examples=[True, False],
)
kafka_retry_backoff: NonNegativeInt = Field(
default=0,
description=(
"The number of seconds to wait before retrying a failed event. The backoff"
+ " time is doubled for each retry attempt."
),
title="Kafka Retry Backoff",
examples=[0, 1, 2, 3, 5],
)

@model_validator(mode="after")
def validate_retry_topic(self):
"""Ensure that the retry topic is not the same as the DLQ topic."""
if self.kafka_retry_topic and self.kafka_retry_topic == self.kafka_dlq_topic:
raise ValueError(
"kafka_retry_topic and kafka_dlq_topic cannot be the same."
)
if self.kafka_enable_dlq and not (
self.kafka_dlq_topic and self.kafka_retry_topic
):
raise ValueError(
"Both kafka_dlq_topic and kafka_retry_topic must be set when the DLQ is enabled."
)
return self
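
For orientation, a short sketch of how the new DLQ settings compose, assuming hexkit is installed; the service and topic names here are invented:

from hexkit.providers.akafka.config import KafkaConfig

config = KafkaConfig(
    service_name="dcs",
    service_instance_id="dcs-001",
    kafka_servers=["localhost:9092"],
    kafka_enable_dlq=True,
    kafka_dlq_topic="dcs-dlq",
    kafka_retry_topic="dcs-dlq-retry",
    kafka_max_retries=2,  # retry a failing event up to twice before the DLQ kicks in
    kafka_retry_backoff=1,  # wait 1s before the first retry, 2s before the second
)

# The model validator rejects inconsistent settings, e.g. enabling the DLQ
# without naming both topics (pydantic surfaces the ValueError as a
# ValidationError):
try:
    KafkaConfig(
        service_name="dcs",
        service_instance_id="dcs-001",
        kafka_servers=["localhost:9092"],
        kafka_enable_dlq=True,  # but no kafka_dlq_topic / kafka_retry_topic given
    )
except ValueError as err:
    print(err)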
10 changes: 8 additions & 2 deletions src/hexkit/providers/akafka/provider/__init__.py
@@ -23,16 +23,22 @@
from .eventpub import KafkaEventPublisher
from .eventsub import (
ConsumerEvent,
ExtractedEventInfo,
KafkaDLQSubscriber,
KafkaEventSubscriber,
get_header_value,
headers_as_dict,
process_dlq_event,
validate_dlq_headers,
)

__all__ = [
"KafkaEventPublisher",
"ExtractedEventInfo",
"KafkaEventSubscriber",
"ConsumerEvent",
"get_header_value",
"headers_as_dict",
"KafkaOutboxSubscriber",
"KafkaDLQSubscriber",
"process_dlq_event",
"validate_dlq_headers",
]
21 changes: 18 additions & 3 deletions src/hexkit/providers/akafka/provider/daosub.py
@@ -21,6 +21,7 @@
import logging
from collections.abc import Sequence
from contextlib import asynccontextmanager
from typing import Optional

from aiokafka import AIOKafkaConsumer
from pydantic import ValidationError
@@ -31,6 +32,7 @@
DaoSubscriberProtocol,
DtoValidationError,
)
from hexkit.protocols.eventpub import EventPublisherProtocol
from hexkit.protocols.eventsub import EventSubscriberProtocol
from hexkit.providers.akafka.config import KafkaConfig
from hexkit.providers.akafka.provider.eventsub import (
@@ -85,8 +87,8 @@ async def _consume_validated(
dto = translator.dto_model.model_validate(payload)
except ValidationError as error:
message = (
f"The event of type {type_} on topic {topic} was not valid wrt. the"
+ " DTO model."
f"The event of type {type_} on topic {topic}"
+ " was not valid wrt. the DTO model."
)
logging.error(message)
raise DtoValidationError(message) from error
@@ -110,21 +112,34 @@ async def construct(
*,
config: KafkaConfig,
translators: Sequence[DaoSubscriberProtocol],
dlq_publisher: Optional[EventPublisherProtocol] = None,
kafka_consumer_cls: type[KafkaConsumerCompatible] = AIOKafkaConsumer,
):
"""Setup and teardown an instance of the provider.

Args:
config: MongoDB-specific config parameters.
- `config`: Kafka-specific config parameters.
- `translators`: A sequence of translators implementing the
`DaoSubscriberProtocol`.
- `dlq_publisher`: An instance of the publisher used to publish events to the
DLQ. Can be None if the dead letter queue is not used.
- `kafka_consumer_cls`: The Kafka consumer class to use. Defaults to
`AIOKafkaConsumer`.

Returns:
An instance of the provider.
"""
translator_converter = TranslatorConverter(translators=translators)

if config.kafka_enable_dlq and dlq_publisher is None:
error = ValueError("A publisher is required when the DLQ is enabled.")
logging.error(error)
raise error

async with KafkaEventSubscriber.construct(
config=config,
translator=translator_converter,
dlq_publisher=dlq_publisher,
kafka_consumer_cls=kafka_consumer_cls,
) as event_subscriber:
yield cls(event_subscriber=event_subscriber)
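
A rough sketch of wiring a DLQ publisher into the outbox subscriber under the new signature. The config (a KafkaConfig) and translators are assumed to be defined elsewhere, and the run() call mirroring KafkaEventSubscriber's consume loop is an assumption here:

from hexkit.providers.akafka.provider import (
    KafkaEventPublisher,
    KafkaOutboxSubscriber,
)


async def consume(config, translators) -> None:
    # construct() now raises a ValueError at setup time if
    # config.kafka_enable_dlq is True but no dlq_publisher is given.
    async with KafkaEventPublisher.construct(config=config) as dlq_publisher:
        async with KafkaOutboxSubscriber.construct(
            config=config,
            translators=translators,
            dlq_publisher=dlq_publisher,
        ) as subscriber:
            # assumed: the subscriber exposes a run() consume loop analogous
            # to KafkaEventSubscriber's
            await subscriber.run()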
41 changes: 31 additions & 10 deletions src/hexkit/providers/akafka/provider/eventpub.py
@@ -22,6 +22,7 @@
import json
import logging
import ssl
from collections.abc import Mapping
from contextlib import asynccontextmanager
from typing import Any, Callable, Optional, Protocol

@@ -41,6 +42,8 @@
generate_ssl_context,
)

RESERVED_HEADERS = ["type", "correlation_id"]


class KafkaProducerCompatible(Protocol):
"""A python duck type protocol describing an AIOKafkaProducer or equivalent."""
@@ -150,15 +153,22 @@ def __init__(
self._generate_correlation_id = generate_correlation_id

async def _publish_validated(
self, *, payload: JsonObject, type_: Ascii, key: Ascii, topic: Ascii
self,
*,
payload: JsonObject,
type_: Ascii,
key: Ascii,
topic: Ascii,
headers: Mapping[str, str],
) -> None:
"""Publish an event with already validated topic and type.

Args:
payload (JSON): The payload to ship with the event.
type_ (str): The event type. ASCII characters only.
key (str): The event type. ASCII characters only.
topic (str): The event type. ASCII characters only.
- `payload` (JSON): The payload to ship with the event.
- `type_` (str): The event type. ASCII characters only.
- `key` (str): The event key. ASCII characters only.
- `topic` (str): The topic to publish to. ASCII characters only.
- `headers`: Additional headers to attach to the event.
"""
try:
correlation_id = get_correlation_id()
Expand All @@ -171,10 +181,21 @@ async def _publish_validated(

validate_correlation_id(correlation_id)

event_headers = [
("type", type_.encode("ascii")),
("correlation_id", correlation_id.encode("ascii")),
]
# Create a shallow copy of the headers
headers_copy = dict(headers)

# Check and log warnings for reserved headers
for header in RESERVED_HEADERS:
log_msg = (
f"The '{header}' header shouldn't be supplied, but was. Overwriting."
)
if header in headers_copy:
logging.warning(log_msg, extra={header: headers_copy[header]})

headers_copy["type"] = type_
headers_copy["correlation_id"] = correlation_id
encoded_headers_list = [(k, v.encode("ascii")) for k, v in headers_copy.items()]

await self._producer.send_and_wait(
topic, key=key, value=payload, headers=event_headers
topic, key=key, value=payload, headers=encoded_headers_list
)
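
The reserved-header handling above can be summarized with a standalone sketch; merge_headers is a hypothetical helper mirroring the logic in _publish_validated, not part of hexkit:

import logging

RESERVED_HEADERS = ["type", "correlation_id"]


def merge_headers(
    headers: dict[str, str], type_: str, correlation_id: str
) -> list[tuple[str, bytes]]:
    """Overwrite reserved headers and encode everything for the Kafka producer."""
    headers_copy = dict(headers)  # shallow copy; the caller's mapping is untouched
    for header in RESERVED_HEADERS:
        if header in headers_copy:
            # Caller-supplied values for reserved headers are logged, then replaced
            logging.warning(
                "The '%s' header shouldn't be supplied, but was. Overwriting.", header
            )
    headers_copy["type"] = type_
    headers_copy["correlation_id"] = correlation_id
    # aiokafka expects headers as a list of (str, bytes) tuples
    return [(k, v.encode("ascii")) for k, v in headers_copy.items()]


print(merge_headers({"type": "bogus", "event_id": "42"}, "file_registered", "abc-123"))
# -> [('type', b'file_registered'), ('event_id', b'42'), ('correlation_id', b'abc-123')]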