From e9922a3106d577874d037a03e7de86e4e32a1a86 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Tue, 19 Sep 2017 14:23:15 +0200 Subject: [PATCH 1/2] let producerBuilder set partitioner in sarama.Config --- context_test.go | 3 +++ kafka/config.go | 42 ++++++++++++++++++++++++++++++ kafka/consumer.go | 5 ++-- kafka/group_consumer.go | 32 ----------------------- kafka/producer.go | 7 ++--- options.go | 57 ++++++++++++++++++++++++++++++++++++----- processor.go | 3 +-- processor_test.go | 8 +++--- view.go | 3 +-- view_test.go | 3 ++- 10 files changed, 107 insertions(+), 56 deletions(-) create mode 100644 kafka/config.go diff --git a/context_test.go b/context_test.go index d2b2df15..3ae5484d 100644 --- a/context_test.go +++ b/context_test.go @@ -405,6 +405,9 @@ func TestContext_Lookup(t *testing.T) { }, }, }, + opts: &voptions{ + hasher: DefaultHasher(), + }, }, }, } diff --git a/kafka/config.go b/kafka/config.go new file mode 100644 index 00000000..59ba1eb3 --- /dev/null +++ b/kafka/config.go @@ -0,0 +1,42 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + cluster "github.com/bsm/sarama-cluster" + metrics "github.com/rcrowley/go-metrics" +) + +// CreateDefaultSaramaConfig creates a (bsm) sarama configuration with default values. +func CreateDefaultSaramaConfig(clientID string, partitioner sarama.PartitionerConstructor, registry metrics.Registry) *cluster.Config { + config := cluster.NewConfig() + + config.Version = sarama.V0_10_1_0 + config.ClientID = clientID + config.ChannelBufferSize = defaultChannelBufferSize + + // consumer configuration + config.Consumer.Return.Errors = true + config.Consumer.Offsets.Initial = sarama.OffsetOldest + config.Consumer.MaxProcessingTime = defaultMaxProcessingTime + + // producer configuration + config.Producer.RequiredAcks = sarama.WaitForLocal + config.Producer.Compression = sarama.CompressionSnappy + config.Producer.Flush.Frequency = defaultFlushFrequency + config.Producer.Flush.Bytes = defaultFlushBytes + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + config.Producer.Retry.Max = defaultProducerMaxRetries + + // consumer group configuration + config.Group.Return.Notifications = true + + // register registry to get kafka metrics + config.Config.MetricRegistry = registry + + // set partitioner + if partitioner != nil { + config.Producer.Partitioner = partitioner + } + return config +} diff --git a/kafka/consumer.go b/kafka/consumer.go index 78b1473a..2dfae807 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -3,7 +3,7 @@ package kafka import ( "time" - "github.com/Shopify/sarama" + cluster "github.com/bsm/sarama-cluster" metrics "github.com/rcrowley/go-metrics" ) @@ -51,9 +51,8 @@ type saramaConsumer struct { } // NewSaramaConsumer creates a new Consumer using sarama -func NewSaramaConsumer(brokers []string, group string, registry metrics.Registry) (Consumer, error) { +func NewSaramaConsumer(brokers []string, group string, config *cluster.Config, registry metrics.Registry) (Consumer, error) { events := make(chan Event, defaultChannelBufferSize) - config := CreateDefaultKafkaConfig("whatever", sarama.OffsetOldest, registry) g, err := newGroupConsumer(brokers, group, events, config) if err != nil { diff --git a/kafka/group_consumer.go b/kafka/group_consumer.go index 8fe77df2..acb302e9 100644 --- a/kafka/group_consumer.go +++ b/kafka/group_consumer.go @@ -5,40 +5,8 @@ import ( "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" - metrics "github.com/rcrowley/go-metrics" ) -// CreateDefaultKafkaConfig creates a (bsm) sarama configuration with default values. -func CreateDefaultKafkaConfig(clientID string, initialOffset int64, registry metrics.Registry) *cluster.Config { - config := cluster.NewConfig() - - config.Version = sarama.V0_10_1_0 - config.ClientID = clientID - config.ChannelBufferSize = defaultChannelBufferSize - - // consumer configuration - config.Consumer.Return.Errors = true - config.Consumer.Offsets.Initial = initialOffset - config.Consumer.MaxProcessingTime = defaultMaxProcessingTime - - // producer configuration - config.Producer.RequiredAcks = sarama.WaitForLocal - config.Producer.Compression = sarama.CompressionSnappy - config.Producer.Flush.Frequency = defaultFlushFrequency - config.Producer.Flush.Bytes = defaultFlushBytes - config.Producer.Return.Successes = true - config.Producer.Return.Errors = true - config.Producer.Retry.Max = defaultProducerMaxRetries - - // consumer group configuration - config.Group.Return.Notifications = true - - // register registry to get kafka metrics - config.Config.MetricRegistry = registry - - return config -} - type groupConsumer struct { brokers []string config *cluster.Config diff --git a/kafka/producer.go b/kafka/producer.go index 64a37ce2..87c7b87d 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -11,8 +11,6 @@ import ( // Producer abstracts the kafka producer type Producer interface { // Emit sends a message to topic. - // TODO (franz): this method should return a promise, instead of getting one. - // Otherwise a callback is sufficient Emit(topic string, key string, value []byte) *Promise Close() error } @@ -25,9 +23,8 @@ type producer struct { } // NewProducer creates new kafka producer for passed brokers. -func NewProducer(brokers []string, registry metrics.Registry, log logger.Logger) (Producer, error) { - config := CreateDefaultKafkaConfig("whatever", sarama.OffsetOldest, registry) - aprod, err := sarama.NewAsyncProducer(brokers, &config.Config) +func NewProducer(brokers []string, config *sarama.Config, registry metrics.Registry, log logger.Logger) (Producer, error) { + aprod, err := sarama.NewAsyncProducer(brokers, config) if err != nil { return nil, fmt.Errorf("Failed to start Sarama producer: %v", err) } diff --git a/options.go b/options.go index ee3b2672..6378f12b 100644 --- a/options.go +++ b/options.go @@ -2,8 +2,11 @@ package goka import ( "fmt" + "hash" + "hash/fnv" "path/filepath" + "github.com/Shopify/sarama" "github.com/lovoo/goka/kafka" "github.com/lovoo/goka/logger" "github.com/lovoo/goka/storage" @@ -64,17 +67,28 @@ func DefaultStorageBuilder(path string) StorageBuilder { } } +// DefaultHasher returns an FNV hasher builder to assign keys to partitions. +func DefaultHasher() func() hash.Hash32 { + return func() hash.Hash32 { + return fnv.New32a() + } + +} + type consumerBuilder func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) type producerBuilder func(brokers []string, registry metrics.Registry) (kafka.Producer, error) type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error) func defaultConsumerBuilder(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) { - return kafka.NewSaramaConsumer(brokers, group, registry) + config := kafka.CreateDefaultSaramaConfig("goka", nil, registry) + return kafka.NewSaramaConsumer(brokers, group, config, registry) } -func defaultProducerBuilder(log logger.Logger) producerBuilder { +func defaultProducerBuilder(hasher func() hash.Hash32, log logger.Logger) producerBuilder { return func(brokers []string, registry metrics.Registry) (kafka.Producer, error) { - return kafka.NewProducer(brokers, registry, log) + partitioner := sarama.NewCustomHashPartitioner(hasher) + config := kafka.CreateDefaultSaramaConfig("goka", partitioner, registry) + return kafka.NewProducer(brokers, &config.Config, registry, log) } } @@ -97,6 +111,7 @@ type poptions struct { updateCallback UpdateCallback registry metrics.Registry partitionChannelSize int + hasher func() hash.Hash32 builders struct { storage StorageBuilder @@ -142,7 +157,7 @@ func WithTopicManager(tm kafka.TopicManager) ProcessorOption { } } -// WithConsumer replaces goka's default consumer. Mainly for testing. +// WithConsumer replaces goka's default consumer. func WithConsumer(c kafka.Consumer) ProcessorOption { return func(o *poptions) { o.builders.consumer = func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) { @@ -154,7 +169,7 @@ func WithConsumer(c kafka.Consumer) ProcessorOption { } } -// WithProducer replaces goka'S default producer. Mainly for testing. +// WithProducer replaces goka's default producer. func WithProducer(p kafka.Producer) ProcessorOption { return func(o *poptions) { o.builders.producer = func(brokers []string, registry metrics.Registry) (kafka.Producer, error) { @@ -201,6 +216,13 @@ func WithRegistry(r metrics.Registry) ProcessorOption { } } +// WithHasher sets the hash function that assigns keys to partitions. +func WithHasher(hasher func() hash.Hash32) ProcessorOption { + return func(o *poptions) { + o.hasher = hasher + } +} + func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error { opt.clientID = defaultClientID @@ -208,6 +230,10 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error { o(opt) } + if opt.hasher == nil { + opt.hasher = DefaultHasher() + } + // StorageBuilder should always be set as a default option in NewProcessor if opt.builders.storage == nil { return fmt.Errorf("StorageBuilder not set") @@ -216,7 +242,7 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error { opt.builders.consumer = defaultConsumerBuilder } if opt.builders.producer == nil { - opt.builders.producer = defaultProducerBuilder(opt.log) + opt.builders.producer = defaultProducerBuilder(opt.hasher, opt.log) } if opt.builders.topicmgr == nil { opt.builders.topicmgr = defaultTopicManagerBuilder @@ -246,6 +272,7 @@ type voptions struct { updateCallback UpdateCallback registry metrics.Registry partitionChannelSize int + hasher func() hash.Hash32 builders struct { storage StorageBuilder @@ -335,6 +362,10 @@ func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error { o(opt) } + if opt.hasher == nil { + opt.hasher = DefaultHasher() + } + // StorageBuilder should always be set as a default option in NewView if opt.builders.storage == nil { return fmt.Errorf("StorageBuilder not set") @@ -372,6 +403,7 @@ type eoptions struct { registry metrics.Registry codec Codec + hasher func() hash.Hash32 builders struct { topicmgr topicmgrBuilder @@ -428,6 +460,13 @@ func WithEmitterKafkaMetrics(registry metrics.Registry) EmitterOption { } } +// WithEmitterHasher sets the hash function that assigns keys to partitions. +func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption { + return func(o *eoptions) { + o.hasher = hasher + } +} + func (opt *eoptions) applyOptions(opts ...EmitterOption) error { opt.clientID = defaultClientID opt.log = logger.Default() @@ -436,9 +475,13 @@ func (opt *eoptions) applyOptions(opts ...EmitterOption) error { o(opt) } + if opt.hasher == nil { + opt.hasher = DefaultHasher() + } + // config not set, use default one if opt.builders.producer == nil { - opt.builders.producer = defaultProducerBuilder(opt.log) + opt.builders.producer = defaultProducerBuilder(opt.hasher, opt.log) } if opt.builders.topicmgr == nil { opt.builders.topicmgr = defaultTopicManagerBuilder diff --git a/processor.go b/processor.go index b565ca96..1f3a0b4c 100644 --- a/processor.go +++ b/processor.go @@ -3,7 +3,6 @@ package goka import ( "errors" "fmt" - "hash/fnv" "runtime/debug" "sync" "time" @@ -257,7 +256,7 @@ func (g *Processor) hash(key string) (int32, error) { // create a new hasher every time. Alternative would be to store the hash in // view and every time reset the hasher (ie, hasher.Reset()). But that would // also require us to protect the access of the hasher with a mutex. - hasher := fnv.New32a() + hasher := g.opts.hasher() _, err := hasher.Write([]byte(key)) if err != nil { diff --git a/processor_test.go b/processor_test.go index 69238333..a6d24593 100644 --- a/processor_test.go +++ b/processor_test.go @@ -1010,12 +1010,12 @@ func TestProcessor_HasGet(t *testing.T) { } func TestProcessor_HasGetStateless(t *testing.T) { - p := &Processor{graph: DefineGroup(group)} + p := &Processor{graph: DefineGroup(group), opts: &poptions{hasher: DefaultHasher()}} _, err := p.Get("item1") ensure.NotNil(t, err) ensure.StringContains(t, err.Error(), "stateless processor") - p = &Processor{graph: DefineGroup(group, Persist(c))} + p = &Processor{graph: DefineGroup(group, Persist(c)), opts: &poptions{hasher: DefaultHasher()}} p.partitions = map[int32]*partition{ 0: new(partition), } @@ -1024,7 +1024,7 @@ func TestProcessor_HasGetStateless(t *testing.T) { ensure.NotNil(t, err) ensure.StringContains(t, err.Error(), "0 partitions") - p = &Processor{graph: DefineGroup(group, Persist(c))} + p = &Processor{graph: DefineGroup(group, Persist(c)), opts: &poptions{hasher: DefaultHasher()}} p.partitions = map[int32]*partition{ 0: new(partition), } @@ -1037,7 +1037,7 @@ func TestProcessor_HasGetStateless(t *testing.T) { defer ctrl.Finish() st := mock.NewMockStorage(ctrl) - p = &Processor{graph: DefineGroup(group, Persist(c))} + p = &Processor{graph: DefineGroup(group, Persist(c)), opts: &poptions{hasher: DefaultHasher()}} p.partitions = map[int32]*partition{ 0: &partition{log: logger.Default(), st: &storageProxy{Storage: st, partition: 0}}, } diff --git a/view.go b/view.go index b173c573..46ab4490 100644 --- a/view.go +++ b/view.go @@ -3,7 +3,6 @@ package goka import ( "errors" "fmt" - "hash/fnv" "sync" "github.com/lovoo/goka/kafka" @@ -188,7 +187,7 @@ func (v *View) hash(key string) (int32, error) { // create a new hasher every time. Alternative would be to store the hash in // view and every time reset the hasher (ie, hasher.Reset()). But that would // also require us to protect the access of the hasher with a mutex. - hasher := fnv.New32a() + hasher := v.opts.hasher() _, err := hasher.Write([]byte(key)) if err != nil { diff --git a/view_test.go b/view_test.go index 2a0d1657..04ecb50e 100644 --- a/view_test.go +++ b/view_test.go @@ -34,6 +34,7 @@ func createTestView(t *testing.T, consumer kafka.Consumer, sb StorageBuilder, tm }, registry: metrics.DefaultRegistry, gokaRegistry: metrics.DefaultRegistry, + hasher: DefaultHasher(), } opts.builders.storage = sb opts.builders.topicmgr = func(brokers []string) (kafka.TopicManager, error) { @@ -222,7 +223,7 @@ func TestView_StartStopWithError(t *testing.T) { } func TestView_GetErrors(t *testing.T) { - v := &View{} + v := &View{opts: &voptions{hasher: DefaultHasher()}} _, err := v.Get("hey") ensure.NotNil(t, err) From bdf9fdb4401890e7036a54569a2d9c6e835d9a1c Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Tue, 19 Sep 2017 15:54:39 +0200 Subject: [PATCH 2/2] add missing view hasher option --- options.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/options.go b/options.go index 6378f12b..fc2c6a97 100644 --- a/options.go +++ b/options.go @@ -357,6 +357,13 @@ func WithViewPartitionChannelSize(size int) ViewOption { } } +// WithViewHasher sets the hash function that assigns keys to partitions. +func WithViewHasher(hasher func() hash.Hash32) ViewOption { + return func(o *voptions) { + o.hasher = hasher + } +} + func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error { for _, o := range opts { o(opt)