Kafka DLQ #123
Pull Request Test Coverage Report for Build 10561778057 (Coveralls)
Awesome work.
I have left some feedback below.
791515b to beeae63
Rename ExtractedEvent to ExtractedEventInfo
Move OriginalTopicError into DLQ subscriber only
Improve error handling in _handle_consumption()
beeae63 to 753af80
Sorry it took so long to review.
Just some minor suggestions.
Co-authored-by: Christoph Zwerschke <[email protected]>
Very nice.
I encountered only some minor inconsistencies in the documentation, but nothing in the public facing part.
This PR adds a not-quite backwards-compatible way to handle (retry, dismiss) Kafka events that fail at some point while being processed by the translator. Some config options are added to manage this functionality, and in their default state they change nothing: events that fail will crash the service and be retried upon restart.
Changes introduced are summarized below:
New Config Fields:
- `kafka_dlq_topic`: Name of the topic for events when they initially fail
- `kafka_retry_topic`: Name of the topic for failed events that are to be requeued
- `kafka_max_retries`: The number of times to retry a failed event before sending it to the DLQ topic, if enabled
- `kafka_enable_dlq`: Whether or not to use the DLQ
- `kafka_retry_backoff`: The number of seconds to wait between "immediate" retries. This value is doubled with each subsequent retry.

New Functionality:
- `kafka_max_retries` sets the number of times to directly retry a failed event after waiting `kafka_retry_backoff` seconds. The default number of retries is 0, which propagates the error just like `hexkit <4`. If `kafka_max_retries` is greater than 0, final failure results in a `RetriesExhaustedError`.
- `kafka_dlq_topic`, `kafka_retry_topic`, and `kafka_enable_dlq` are set to use the DLQ functionality and publish failed events to the configured DLQ topic. The original topic name is preserved by adding it to the event headers in a field named `_original_topic`. When an event is consumed from the configured `kafka_retry_topic`, its headers are inspected to find the original topic (so the service knows how to treat the event). Events in `kafka_retry_topic` that have no `_original_topic` header are ignored. The same goes for `kafka_dlq_topic` if using the default `process_dlq_event` function.
- The `KafkaDLQSubscriber` class is a dedicated provider that listens to the configured DLQ topic. When run, the event is sent to the configured retry topic or, if `ignore=True` is used, the event is simply ignored. This class does not have to be used by a particular service, and could be used by a dedicated DLQ triage service. Additionally, there is a `process_dlq_event` parameter that takes a callable. This callback is executed when `ignore=False`, and can be a simple standalone function or a method on a complex, configurable class instance. Through `process_dlq_event`, events can be modified or examined as needed before being republished or discarded.
- `ExtractedEventInfo` (open to naming suggestions) holds the `ConsumerEvent` details. The class contains the type_, topic, key, payload, and dict-form decoded headers from `ConsumerEvent`, and can be instantiated both with kwargs and a `ConsumerEvent`-compatible object.

Note
The immediate retry and DLQ mechanisms are independent. Events can be published to the dead letter queue with 0 retries, retried a few times and never sent to the DLQ, etc.
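
To make the interplay of the retry and DLQ settings concrete, here is a minimal, library-independent sketch of the behaviour described above. It is not the hexkit implementation: the `Event` class, `run_with_retries()`, `consume()`, and the `publish`/`sleep` parameters are hypothetical names invented for illustration, and `RetriesExhaustedError` is a local stand-in for the error named in this PR. It also assumes the first wait equals the backoff value and that each later wait is double the previous one.

```python
import time
from dataclasses import dataclass, field


class RetriesExhaustedError(RuntimeError):
    """Local stand-in for the error raised once all retries have failed."""


@dataclass
class Event:
    """Hypothetical minimal event, not hexkit's ConsumerEvent."""

    topic: str
    payload: dict
    headers: dict = field(default_factory=dict)


def run_with_retries(event, handler, max_retries, retry_backoff, sleep=time.sleep):
    """Call handler(event), retrying up to max_retries times with doubling waits."""
    try:
        handler(event)
        return
    except Exception as err:
        if max_retries == 0:
            raise  # default behaviour: propagate the original error unchanged
        last_error = err
    for attempt in range(max_retries):
        # Assumed schedule: backoff, 2 * backoff, 4 * backoff, ...
        sleep(retry_backoff * 2**attempt)
        try:
            handler(event)
            return
        except Exception as err:
            last_error = err
    raise RetriesExhaustedError(f"gave up after {max_retries} retries") from last_error


def consume(event, handler, publish, *, max_retries=0, retry_backoff=1.0,
            enable_dlq=False, dlq_topic="dlq", sleep=time.sleep):
    """Process one event; on final failure either re-raise or publish to the DLQ."""
    try:
        run_with_retries(event, handler, max_retries, retry_backoff, sleep)
    except Exception:
        if not enable_dlq:
            raise
        # Preserve the topic the event came from so it can be routed back later.
        headers = {**event.headers, "_original_topic": event.topic}
        publish(Event(topic=dlq_topic, payload=event.payload, headers=headers))
```

Because the two mechanisms are independent in this sketch, calling `consume()` with `max_retries=0` and `enable_dlq=True` sends a failing event to the DLQ without any retry, while `max_retries=3` with `enable_dlq=False` retries three times and then raises `RetriesExhaustedError`.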
Adoption
For existing services, no changes are needed beyond adjusting the configuration parameters and adding a `KafkaEventPublisher` to the `KafkaEventSubscriber.construct()` or `KafkaOutboxSubscriber.construct()` calls (e.g. in the injection module). For deployment, at least one DLQ and one Retry topic will need to be established (maybe one each for all services is enough to begin with?)

To add the ability to move events from a DLQ topic to a Retry topic, there needs to be a way to invoke a `KafkaDLQSubscriber`, as illustrated below. We need to discuss how best to do this: a dedicated DLQ service? CLI command on each Kafka-consuming service? Where applicable, events can be manually sent to a retry queue with Kafka UI as long as `_original_topic` is placed in the headers.

Example Usage for DLQ Subscriber and Process Function
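
As a rough, library-independent sketch of the DLQ-to-retry flow described above (not the hexkit API): `EventInfo`, `default_process()`, `triage()`, and `restore_original_topic()` are hypothetical names, and the callback signature shown here, returning either an event or `None`, is an assumption made for illustration.

```python
from dataclasses import dataclass, field, replace

ORIGINAL_TOPIC = "_original_topic"


@dataclass(frozen=True)
class EventInfo:
    """Hypothetical stand-in for the event details, not hexkit's ExtractedEventInfo."""

    topic: str
    type_: str
    key: str
    payload: dict
    headers: dict = field(default_factory=dict)


def default_process(event):
    """Pass events through unchanged; drop those lacking the original-topic header."""
    return event if ORIGINAL_TOPIC in event.headers else None


def triage(event, publish, *, retry_topic, ignore=False, process=default_process):
    """Handle one DLQ event. Returns True if it was republished, False if discarded."""
    if ignore:
        return False  # the process callback is only run when ignore is False
    processed = process(event)
    if processed is None:
        return False
    publish(replace(processed, topic=retry_topic))
    return True


def restore_original_topic(event):
    """On the retry-topic consumer side, recover the topic the event first failed on."""
    original = event.headers.get(ORIGINAL_TOPIC)
    if original is None:
        return None  # no header: the event is ignored
    return replace(event, topic=original)
```

A custom `process` callable could, for example, patch a malformed payload before returning the event, or return `None` to discard events of a type that is known to be unrecoverable.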