Kafka DLQ #123

Merged
TheByronHimes merged 35 commits into v4 from kafka_retry_v2_GSI-845
Aug 26, 2024

@TheByronHimes TheByronHimes commented Jul 29, 2024

UPDATE August 1, 2024: Services implementing the EventPublisherProtocol will have to add the headers parameter to _publish_validated. Currently, this only affects the auth-service, access-request-service, and metldata.

This PR adds a not-quite backwards-compatible way to handle (retry, dismiss) Kafka events that fail at some point while being processed by the translator. Some config options are added to manage this functionality, and in their default state they change nothing: events that fail will crash the service and be retried upon restart.

Changes introduced are summarized below:

New Config Fields:

  • kafka_dlq_topic: Name of the topic for events when they initially fail
  • kafka_retry_topic: Name of the topic for failed events that are to be requeued
  • kafka_max_retries: The number of times to retry a failed event before sending it to the DLQ topic, if enabled
  • kafka_enable_dlq: Whether or not to use the DLQ
  • kafka_retry_backoff: The number of seconds to wait between "immediate" retries. This value is doubled with each subsequent retry.
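
To make the doubling of kafka_retry_backoff concrete, here is a minimal sketch of the resulting wait schedule. It assumes the first retry waits exactly kafka_retry_backoff seconds and each later retry waits twice as long as the one before; the helper name retry_delays is hypothetical and not part of hexkit.

```python
def retry_delays(backoff: int, max_retries: int) -> list[int]:
    """Return the wait (in seconds) before each retry attempt.

    Assumption: the first retry waits `backoff` seconds and the wait
    doubles for every subsequent retry.
    """
    return [backoff * 2**attempt for attempt in range(max_retries)]


if __name__ == "__main__":
    # With kafka_retry_backoff=2 and kafka_max_retries=4:
    print(retry_delays(backoff=2, max_retries=4))  # [2, 4, 8, 16]
```

Because the wait grows exponentially, a large kafka_max_retries can block the consumer for a long time; with the values above the total wait before final failure is 30 seconds.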

New Functionality:

  • Retry events after a pause. Configuring kafka_max_retries will set the number of times to directly retry a failed event after waiting kafka_retry_backoff seconds. The default number of retries is 0, which will propagate the error just like hexkit <4. If kafka_max_retries is greater than 0, final failure will result in a RetriesExhaustedError.
  • Publish failed events to a dead letter queue automatically. Set kafka_dlq_topic, kafka_retry_topic, and kafka_enable_dlq to use the DLQ functionality and publish failed events to the configured DLQ topic. The original topic name is preserved by adding it to the event headers in a field named _original_topic. When an event is consumed from the configured kafka_retry_topic, its headers are inspected to find the original topic (so the service knows how to treat the event). Events in kafka_retry_topic that have no _original_topic header are ignored. The same goes for events in kafka_dlq_topic when the default process_dlq_event function is used.
  • Ignore (discard) events in a DLQ topic or send them to a retry topic. The KafkaDLQSubscriber class is a dedicated provider that listens to the configured DLQ topic. Each time it is run, it handles one event from that topic: the event is sent to the configured retry topic or, if ignore=True is used, simply discarded. This class does not have to be used by a particular service, and could be used by a dedicated DLQ triage service. Additionally, there is a process_dlq_event parameter that takes a callable. This callback is executed when ignore=False, and can be a simple standalone function or a method on a complex, configurable class instance. Through process_dlq_event, events can be modified or examined as needed before being republished or discarded.
  • Encapsulate the way we use ConsumerEvent details with ExtractedEventInfo (open to naming suggestions). The class will contain the type_, topic, key, payload, and dict-form decoded headers from ConsumerEvent, and can be instantiated both with kwargs and a ConsumerEvent-compatible object.
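
The _original_topic handling described above can be sketched with plain dictionaries. This is only an illustration of the header convention, not hexkit's implementation: the function names headers_for_dlq and resolve_original_topic are hypothetical, and headers are assumed to already be decoded into dict form.

```python
from typing import Optional

ORIGINAL_TOPIC_FIELD = "_original_topic"  # header name taken from the PR text


def headers_for_dlq(headers: dict[str, str], failed_topic: str) -> dict[str, str]:
    """Copy the headers and record the topic in which the event failed."""
    return {**headers, ORIGINAL_TOPIC_FIELD: failed_topic}


def resolve_original_topic(headers: dict[str, str]) -> Optional[str]:
    """Return the original topic of a retry event.

    None means the header is missing (or empty) and the event
    should be ignored.
    """
    return headers.get(ORIGINAL_TOPIC_FIELD) or None


if __name__ == "__main__":
    dlq_headers = headers_for_dlq({"type": "upload_received"}, "uploads")
    print(resolve_original_topic(dlq_headers))  # uploads
    print(resolve_original_topic({"type": "upload_received"}))  # None
```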

Note

The immediate retry and DLQ mechanisms are independent. For example, events can be published to the dead letter queue with 0 retries, or retried a few times and never sent to the DLQ.
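
The independence of the two mechanisms can be sketched as a single handler with two separate switches. This is a simplified model under stated assumptions, not the code in this PR: the function handle, its parameters, and the local RetriesExhaustedError stand-in are all hypothetical, and the real provider may order or wrap these steps differently.

```python
import asyncio
from typing import Any, Awaitable, Callable


class RetriesExhaustedError(RuntimeError):
    """Local stand-in for the error named in the PR (not hexkit's class)."""


async def handle(
    event: Any,
    consume: Callable[[Any], Awaitable[None]],
    send_to_dlq: Callable[[Any], Awaitable[None]],
    *,
    max_retries: int,
    backoff: float,
    enable_dlq: bool,
) -> str:
    """Consume an event, applying retries and the DLQ independently."""
    try:
        await consume(event)
        return "consumed"
    except Exception as first_error:
        error: Exception = first_error

    # Immediate retries: only active when max_retries > 0.
    for attempt in range(max_retries):
        await asyncio.sleep(backoff * 2**attempt)
        try:
            await consume(event)
            return "consumed"
        except Exception as retry_error:
            error = retry_error

    # DLQ: only active when enabled, regardless of the retry setting.
    if enable_dlq:
        await send_to_dlq(event)
        return "sent to DLQ"
    if max_retries > 0:
        raise RetriesExhaustedError(f"gave up after {max_retries} retries") from error
    raise error  # 0 retries and no DLQ: propagate the original error
```

With max_retries=0 and enable_dlq=True the event goes straight to the DLQ; with max_retries=3 and enable_dlq=False it is retried three times and then raises.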

Adoption

For existing services, no changes are needed beyond adjusting the configuration parameters and adding a KafkaEventPublisher to the KafkaEventSubscriber.construct() or KafkaOutboxSubscriber.construct() calls (e.g. in the injection module). For deployment, at least one DLQ topic and one retry topic will need to be established (maybe one of each, shared by all services, is enough to begin with?).

To add the ability to move events from a DLQ topic to a Retry topic, there needs to be a way to invoke a KafkaDLQSubscriber, as illustrated below. We need to discuss how best to do this: a dedicated DLQ service? CLI command on each kafka-consuming service? Where applicable, events can be manually sent to a retry queue with Kafka UI as long as _original_topic is placed in the headers.

Example Usage for DLQ Subscriber and Process Function

from typing import Optional

from hexkit.providers.akafka.provider import (
    ConsumerEvent,
    ExtractedEventInfo,
    KafkaDLQSubscriber,
    KafkaEventPublisher,
    validate_dlq_headers,
)

from ifrs.config import Config

# This is an example of a callback function that can be passed to the KafkaDLQSubscriber
# Here it's very simple, but it could also be a complex function on a config-driven class.
async def fix_dlq_event(event: ConsumerEvent) -> Optional[ExtractedEventInfo]:
    """Update old events for config changes or pass event unchanged."""
    validate_dlq_headers(event)
    event_info = ExtractedEventInfo(event)
    if event.topic == "uploads" and event_info.headers["type"] == "outdated_type":
        event_info.headers["type"] = "new_type"
    return event_info

# This is an example of an entrypoint function that could be triggered for DLQ event resolution
async def resolve_dlq_event(ignore: bool = False):
    """Requeue or discard a DLQ event."""
    config = Config()
    async with (
        KafkaEventPublisher.construct(config=config) as publisher,
        KafkaDLQSubscriber.construct(
            config=config, dlq_publisher=publisher, process_dlq_event=fix_dlq_event
        ) as dlq_subscriber,
    ):
        await dlq_subscriber.run(ignore)
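
If the CLI option is chosen, an entrypoint like the one above could be wired to a command with a small wrapper. The following is only a sketch of that idea: the command name resolve-dlq-event, the --ignore flag, and the functions build_parser and main are hypothetical, and the coroutine function is passed in as an argument so the wrapper does not depend on any service code.

```python
import argparse
import asyncio
from typing import Any, Callable, Coroutine, Optional, Sequence


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with a single flag mirroring the `ignore` argument."""
    parser = argparse.ArgumentParser(
        prog="resolve-dlq-event",  # hypothetical command name
        description="Requeue or discard the next event in the DLQ topic.",
    )
    parser.add_argument(
        "--ignore",
        action="store_true",
        help="discard the event instead of sending it to the retry topic",
    )
    return parser


def main(
    resolve: Callable[[bool], Coroutine[Any, Any, None]],
    argv: Optional[Sequence[str]] = None,
) -> None:
    """Parse the command line and run the given DLQ entrypoint once."""
    args = build_parser().parse_args(argv)
    asyncio.run(resolve(args.ignore))
```

A service would then call main(resolve_dlq_event) from its own CLI module.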

coveralls commented Jul 29, 2024

Pull Request Test Coverage Report for Build 10561778057

Details

  • 218 of 224 (97.32%) changed or added relevant lines in 9 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.4%) to 92.264%

| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
| --- | --- | --- | --- |
| src/hexkit/providers/akafka/provider/eventpub.py | 9 | 10 | 90.0% |
| src/hexkit/providers/akafka/provider/eventsub.py | 182 | 184 | 98.91% |
| src/hexkit/providers/akafka/provider/daosub.py | 3 | 6 | 50.0% |

Totals

  • Change from base Build 10194968592: 0.4%
  • Covered Lines: 1944
  • Relevant Lines: 2107

@TheByronHimes TheByronHimes requested a review from Cito July 30, 2024 11:36
@TheByronHimes TheByronHimes marked this pull request as ready for review July 30, 2024 13:04

@Cito Cito left a comment

Awesome work.

I have left some feedback below.

@TheByronHimes TheByronHimes force-pushed the kafka_retry_v2_GSI-845 branch from 791515b to beeae63 Compare July 31, 2024 13:48
@TheByronHimes TheByronHimes force-pushed the kafka_retry_v2_GSI-845 branch from beeae63 to 753af80 Compare August 1, 2024 08:42
@TheByronHimes TheByronHimes changed the base branch from main to v4 August 1, 2024 08:42
@TheByronHimes TheByronHimes requested a review from Cito August 5, 2024 12:29
@TheByronHimes TheByronHimes marked this pull request as draft August 5, 2024 16:14
@TheByronHimes TheByronHimes marked this pull request as ready for review August 6, 2024 07:25

@Cito Cito left a comment

Sorry it took so long to review.

Just some minor suggestions.

@TheByronHimes TheByronHimes requested a review from mephenor August 14, 2024 07:19

@mephenor mephenor left a comment

Very nice.
I encountered only some minor inconsistencies in the documentation, but nothing in the public facing part.

@TheByronHimes TheByronHimes requested a review from mephenor August 26, 2024 15:38
@TheByronHimes TheByronHimes merged commit 9c11826 into v4 Aug 26, 2024
8 checks passed
@TheByronHimes TheByronHimes deleted the kafka_retry_v2_GSI-845 branch August 26, 2024 16:07