Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for cascading retries #35

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package events

import (
"errors"

"github.com/Shopify/sarama"
"github.com/kelseyhightower/envconfig"
)

const DeadLetterSuffix = "-dead-letters"

// CloudEventsConfig describes the configuration for a consumer of cloud events from a Kafka topic
type CloudEventsConfig struct {
EventSource string `envconfig:"CLOUD_EVENTS_SOURCE" required:"true"`
KafkaBrokers []string `envconfig:"KAFKA_BROKERS" required:"true"`
Expand All @@ -19,6 +21,9 @@ type CloudEventsConfig struct {
KafkaVersion string `envconfig:"KAFKA_VERSION" required:"true"`
KafkaUsername string `envconfig:"KAFKA_USERNAME" required:"false"`
KafkaPassword string `envconfig:"KAFKA_PASSWORD" required:"false"`
KafkaDelay int `envconfig:"KAFKA_DELAY" default:"0"`
CascadeDelays []int `envconfig:"KAFKA_CASCADE_DELAYS" default:""`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think envconfig supports parsing time.Duration out of the box, so it should be possible to use it directly, instead of int.

Should we provide sensible defaults?

CascadePattern string `envconfig:"KAFKA_CASCADE_PATTERN" default:"%s-delay-%d"`
SaramaConfig *sarama.Config
}

Expand Down
82 changes: 80 additions & 2 deletions events/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package events

import (
"context"
"fmt"
"log"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"log"
"sync"
)

type SaramaConsumer struct {
Expand All @@ -19,6 +22,62 @@ type SaramaConsumer struct {
deadLetterProducer *KafkaCloudEventsProducer
}

// CascadingEventConsumer is an EventConsumer that chains several consumers
// together: a failure at one stage is dead-lettered to the next stage's
// topic, so the event is retried there after that stage's delay.
type CascadingEventConsumer struct {
	// Consumers holds one consumer per cascade stage, in retry order.
	Consumers []EventConsumer
}

// NewCascadingCloudEventsConsumer creates a chain of consumers implementing
// cascading retries. The first consumer reads the configured topic with the
// base KafkaDelay and dead-letters failures to a delay topic named via
// CascadePattern; each subsequent consumer reads the previous stage's delay
// topic using the corresponding entry of CascadeDelays. The terminal
// consumer keeps the original KafkaDeadLettersTopic as its dead-letter
// destination. Returns an error if any stage's consumer cannot be created.
func NewCascadingCloudEventsConsumer(config *CloudEventsConfig) (EventConsumer, error) {
	topic := config.KafkaTopic
	delay := config.KafkaDelay
	consumers := make([]EventConsumer, 0, len(config.CascadeDelays)+1)
	// Range over the configured delay VALUES. A bare `range` over the slice
	// yields indices (0, 1, 2, …), which silently produced wrong delays and
	// wrong cascade topic names.
	for _, nextDelay := range config.CascadeDelays {
		newconfig := *config
		newconfig.KafkaDelay = delay
		newconfig.KafkaTopic = topic
		newconfig.KafkaDeadLettersTopic = fmt.Sprintf(config.CascadePattern, config.KafkaTopic, nextDelay)
		consumer, err := NewSaramaCloudEventsConsumer(&newconfig)
		if err != nil {
			return nil, err
		}
		consumers = append(consumers, consumer)
		// The next stage consumes this stage's dead-letter topic with the
		// delay that topic was named for.
		topic = newconfig.KafkaDeadLettersTopic
		delay = nextDelay
	}
	// Terminal stage: consumes the last delay topic (or the original topic
	// when no cascade delays are configured) and dead-letters to the
	// configured KafkaDeadLettersTopic.
	newconfig := *config
	newconfig.KafkaDelay = delay
	newconfig.KafkaTopic = topic
	consumer, err := NewSaramaCloudEventsConsumer(&newconfig)
	if err != nil {
		return nil, err
	}
	consumers = append(consumers, consumer)

	return &CascadingEventConsumer{
		Consumers: consumers,
	}, nil
}

// RegisterHandler attaches handler to every stage of the cascade, so an
// event is processed the same way regardless of which retry topic it
// currently sits on.
func (c *CascadingEventConsumer) RegisterHandler(handler EventHandler) {
	for i := range c.Consumers {
		c.Consumers[i].RegisterHandler(handler)
	}
}

// Start starts every consumer in the cascade; each underlying Start spawns
// its own asynchronous consume loop. The first failure aborts the startup
// sequence and is returned to the caller.
func (c *CascadingEventConsumer) Start(ctx context.Context) error {
	for i := range c.Consumers {
		if err := c.Consumers[i].Start(ctx); err != nil {
			return err
		}
	}
	return nil
}

//NewSaramaCloudEventsConsumer creates a new cloud events consumer
func NewSaramaCloudEventsConsumer(config *CloudEventsConfig) (EventConsumer, error) {
if err := validateConsumerConfig(config); err != nil {
return nil, err
Expand All @@ -32,19 +91,36 @@ func NewSaramaCloudEventsConsumer(config *CloudEventsConfig) (EventConsumer, err
}, nil
}

// Setup is run by sarama at the beginning of a new consumer-group session,
// before ConsumeClaim. It signals readiness by closing the ready channel.
// The close is guarded because sarama runs Setup again for each new session
// (e.g. after a group rebalance), and closing an already-closed channel
// would panic.
func (s *SaramaConsumer) Setup(session sarama.ConsumerGroupSession) error {
	select {
	case <-s.ready:
		// already marked ready by a previous session; nothing to do
	default:
		close(s.ready)
	}
	return nil
}

// Cleanup is run by sarama at the end of a session, once all ConsumeClaim
// goroutines have exited. No per-session resources are held here, so it is
// a no-op.
func (s *SaramaConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
	return nil
}

//ConsumeClaim synchronously consumes all the messages in a claim, optionally delaying consumption a fixed amount of time
func (s *SaramaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
delay := time.Second * time.Duration(s.config.KafkaDelay)
for message := range claim.Messages() {
m := kafka_sarama.NewMessageFromConsumerMessage(message)
timestamp := message.Timestamp
scheduledTime := timestamp.Add(delay)
remaining := time.Now().Sub(scheduledTime)
if delay == 0 || remaining > 0 {
timer := time.NewTimer(remaining)
select {
case <-timer.C:
break
case <-session.Context().Done():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the context terminated when a rebalance happens?

timer.Stop()
return nil
}
}
// just ignore non-cloud event messages
if rs, rserr := binding.ToEvent(context.Background(), m); rserr == nil {
s.handleCloudEvent(*rs)
Expand Down Expand Up @@ -76,10 +152,12 @@ func (s *SaramaConsumer) sendToDeadLetterTopic(ce cloudevents.Event) {
}
}

// RegisterHandler appends a handler to be invoked for each cloud event this
// consumer decodes; register all handlers before calling Start.
func (s *SaramaConsumer) RegisterHandler(handler EventHandler) {
	s.handlers = append(s.handlers, handler)
}

//Start starts an async consumer goproc (that may sleep)
func (s *SaramaConsumer) Start(ctx context.Context) error {
if err := s.initialize(); err != nil {
return err
Expand Down