From f26e25e14acfbcae2b79aa5cff4ad35669fddcf9 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Tue, 19 Sep 2017 14:23:15 +0200 Subject: [PATCH] let producerBuilder set partitioner in sarama.Config --- kafka/consumer.go | 5 ++--- kafka/group_consumer.go | 32 -------------------------------- kafka/producer.go | 7 ++----- options.go | 5 +++-- 4 files changed, 7 insertions(+), 42 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 78b1473a..2dfae807 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -3,7 +3,7 @@ package kafka import ( "time" - "github.com/Shopify/sarama" + cluster "github.com/bsm/sarama-cluster" metrics "github.com/rcrowley/go-metrics" ) @@ -51,9 +51,8 @@ type saramaConsumer struct { } // NewSaramaConsumer creates a new Consumer using sarama -func NewSaramaConsumer(brokers []string, group string, registry metrics.Registry) (Consumer, error) { +func NewSaramaConsumer(brokers []string, group string, config *cluster.Config, registry metrics.Registry) (Consumer, error) { events := make(chan Event, defaultChannelBufferSize) - config := CreateDefaultKafkaConfig("whatever", sarama.OffsetOldest, registry) g, err := newGroupConsumer(brokers, group, events, config) if err != nil { diff --git a/kafka/group_consumer.go b/kafka/group_consumer.go index 8fe77df2..acb302e9 100644 --- a/kafka/group_consumer.go +++ b/kafka/group_consumer.go @@ -5,40 +5,8 @@ import ( "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" - metrics "github.com/rcrowley/go-metrics" ) -// CreateDefaultKafkaConfig creates a (bsm) sarama configuration with default values. -func CreateDefaultKafkaConfig(clientID string, initialOffset int64, registry metrics.Registry) *cluster.Config { - config := cluster.NewConfig() - - config.Version = sarama.V0_10_1_0 - config.ClientID = clientID - config.ChannelBufferSize = defaultChannelBufferSize - - // consumer configuration - config.Consumer.Return.Errors = true - config.Consumer.Offsets.Initial = initialOffset - config.Consumer.MaxProcessingTime = defaultMaxProcessingTime - - // producer configuration - config.Producer.RequiredAcks = sarama.WaitForLocal - config.Producer.Compression = sarama.CompressionSnappy - config.Producer.Flush.Frequency = defaultFlushFrequency - config.Producer.Flush.Bytes = defaultFlushBytes - config.Producer.Return.Successes = true - config.Producer.Return.Errors = true - config.Producer.Retry.Max = defaultProducerMaxRetries - - // consumer group configuration - config.Group.Return.Notifications = true - - // register registry to get kafka metrics - config.Config.MetricRegistry = registry - - return config -} - type groupConsumer struct { brokers []string config *cluster.Config diff --git a/kafka/producer.go b/kafka/producer.go index 64a37ce2..87c7b87d 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -11,8 +11,6 @@ import ( // Producer abstracts the kafka producer type Producer interface { // Emit sends a message to topic. - // TODO (franz): this method should return a promise, instead of getting one. - // Otherwise a callback is sufficient Emit(topic string, key string, value []byte) *Promise Close() error } @@ -25,9 +23,8 @@ type producer struct { } // NewProducer creates new kafka producer for passed brokers. -func NewProducer(brokers []string, registry metrics.Registry, log logger.Logger) (Producer, error) { - config := CreateDefaultKafkaConfig("whatever", sarama.OffsetOldest, registry) - aprod, err := sarama.NewAsyncProducer(brokers, &config.Config) +func NewProducer(brokers []string, config *sarama.Config, registry metrics.Registry, log logger.Logger) (Producer, error) { + aprod, err := sarama.NewAsyncProducer(brokers, config) if err != nil { return nil, fmt.Errorf("Failed to start Sarama producer: %v", err) } diff --git a/options.go b/options.go index ee3b2672..ee08025c 100644 --- a/options.go +++ b/options.go @@ -69,6 +69,7 @@ type producerBuilder func(brokers []string, registry metrics.Registry) (kafka.Pr type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error) func defaultConsumerBuilder(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) { + config := kafka.CreateDefaultSaramaConfig("goka", registry) return kafka.NewSaramaConsumer(brokers, group, registry) } @@ -142,7 +143,7 @@ func WithTopicManager(tm kafka.TopicManager) ProcessorOption { } } -// WithConsumer replaces goka's default consumer. Mainly for testing. +// WithConsumer replaces goka's default consumer. func WithConsumer(c kafka.Consumer) ProcessorOption { return func(o *poptions) { o.builders.consumer = func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) { @@ -154,7 +155,7 @@ func WithConsumer(c kafka.Consumer) ProcessorOption { } } -// WithProducer replaces goka'S default producer. Mainly for testing. +// WithProducer replaces goka's default producer. func WithProducer(p kafka.Producer) ProcessorOption { return func(o *poptions) { o.builders.producer = func(brokers []string, registry metrics.Registry) (kafka.Producer, error) {