Skip to content

Commit

Permalink
Merge pull request #36 from lovoo/sarama-config
Browse files Browse the repository at this point in the history
let producerBuilder set partitioner in sarama.Config
  • Loading branch information
db7 authored Sep 20, 2017
2 parents 78edea6 + bdf9fdb commit 7f8de28
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 56 deletions.
3 changes: 3 additions & 0 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ func TestContext_Lookup(t *testing.T) {
},
},
},
opts: &voptions{
hasher: DefaultHasher(),
},
},
},
}
Expand Down
42 changes: 42 additions & 0 deletions kafka/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package kafka

import (
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
metrics "github.com/rcrowley/go-metrics"
)

// CreateDefaultSaramaConfig creates a (bsm) sarama configuration with default values.
func CreateDefaultSaramaConfig(clientID string, partitioner sarama.PartitionerConstructor, registry metrics.Registry) *cluster.Config {
config := cluster.NewConfig()

config.Version = sarama.V0_10_1_0
config.ClientID = clientID
config.ChannelBufferSize = defaultChannelBufferSize

// consumer configuration
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.MaxProcessingTime = defaultMaxProcessingTime

// producer configuration
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Frequency = defaultFlushFrequency
config.Producer.Flush.Bytes = defaultFlushBytes
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.Retry.Max = defaultProducerMaxRetries

// consumer group configuration
config.Group.Return.Notifications = true

// register registry to get kafka metrics
config.Config.MetricRegistry = registry

// set partitioner
if partitioner != nil {
config.Producer.Partitioner = partitioner
}
return config
}
5 changes: 2 additions & 3 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package kafka
import (
"time"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
metrics "github.com/rcrowley/go-metrics"
)

Expand Down Expand Up @@ -51,9 +51,8 @@ type saramaConsumer struct {
}

// NewSaramaConsumer creates a new Consumer using sarama
func NewSaramaConsumer(brokers []string, group string, registry metrics.Registry) (Consumer, error) {
func NewSaramaConsumer(brokers []string, group string, config *cluster.Config, registry metrics.Registry) (Consumer, error) {
events := make(chan Event, defaultChannelBufferSize)
config := CreateDefaultKafkaConfig("whatever", sarama.OffsetOldest, registry)

g, err := newGroupConsumer(brokers, group, events, config)
if err != nil {
Expand Down
32 changes: 0 additions & 32 deletions kafka/group_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,8 @@ import (

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
metrics "github.com/rcrowley/go-metrics"
)

// CreateDefaultKafkaConfig creates a (bsm) sarama configuration with default values.
func CreateDefaultKafkaConfig(clientID string, initialOffset int64, registry metrics.Registry) *cluster.Config {
config := cluster.NewConfig()

config.Version = sarama.V0_10_1_0
config.ClientID = clientID
config.ChannelBufferSize = defaultChannelBufferSize

// consumer configuration
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = initialOffset
config.Consumer.MaxProcessingTime = defaultMaxProcessingTime

// producer configuration
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Frequency = defaultFlushFrequency
config.Producer.Flush.Bytes = defaultFlushBytes
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.Retry.Max = defaultProducerMaxRetries

// consumer group configuration
config.Group.Return.Notifications = true

// register registry to get kafka metrics
config.Config.MetricRegistry = registry

return config
}

type groupConsumer struct {
brokers []string
config *cluster.Config
Expand Down
7 changes: 2 additions & 5 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
// Producer abstracts the kafka producer
type Producer interface {
// Emit sends a message to topic.
// TODO (franz): this method should return a promise, instead of getting one.
// Otherwise a callback is sufficient
Emit(topic string, key string, value []byte) *Promise
Close() error
}
Expand All @@ -25,9 +23,8 @@ type producer struct {
}

// NewProducer creates new kafka producer for passed brokers.
func NewProducer(brokers []string, registry metrics.Registry, log logger.Logger) (Producer, error) {
config := CreateDefaultKafkaConfig("whatever", sarama.OffsetOldest, registry)
aprod, err := sarama.NewAsyncProducer(brokers, &config.Config)
func NewProducer(brokers []string, config *sarama.Config, registry metrics.Registry, log logger.Logger) (Producer, error) {
aprod, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
return nil, fmt.Errorf("Failed to start Sarama producer: %v", err)
}
Expand Down
64 changes: 57 additions & 7 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package goka

import (
"fmt"
"hash"
"hash/fnv"
"path/filepath"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/kafka"
"github.com/lovoo/goka/logger"
"github.com/lovoo/goka/storage"
Expand Down Expand Up @@ -64,17 +67,28 @@ func DefaultStorageBuilder(path string) StorageBuilder {
}
}

// DefaultHasher returns an FNV hasher builder to assign keys to partitions.
func DefaultHasher() func() hash.Hash32 {
return func() hash.Hash32 {
return fnv.New32a()
}

}

type consumerBuilder func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error)
type producerBuilder func(brokers []string, registry metrics.Registry) (kafka.Producer, error)
type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error)

func defaultConsumerBuilder(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) {
return kafka.NewSaramaConsumer(brokers, group, registry)
config := kafka.CreateDefaultSaramaConfig("goka", nil, registry)
return kafka.NewSaramaConsumer(brokers, group, config, registry)
}

func defaultProducerBuilder(log logger.Logger) producerBuilder {
func defaultProducerBuilder(hasher func() hash.Hash32, log logger.Logger) producerBuilder {
return func(brokers []string, registry metrics.Registry) (kafka.Producer, error) {
return kafka.NewProducer(brokers, registry, log)
partitioner := sarama.NewCustomHashPartitioner(hasher)
config := kafka.CreateDefaultSaramaConfig("goka", partitioner, registry)
return kafka.NewProducer(brokers, &config.Config, registry, log)
}
}

Expand All @@ -97,6 +111,7 @@ type poptions struct {
updateCallback UpdateCallback
registry metrics.Registry
partitionChannelSize int
hasher func() hash.Hash32

builders struct {
storage StorageBuilder
Expand Down Expand Up @@ -142,7 +157,7 @@ func WithTopicManager(tm kafka.TopicManager) ProcessorOption {
}
}

// WithConsumer replaces goka's default consumer. Mainly for testing.
// WithConsumer replaces goka's default consumer.
func WithConsumer(c kafka.Consumer) ProcessorOption {
return func(o *poptions) {
o.builders.consumer = func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) {
Expand All @@ -154,7 +169,7 @@ func WithConsumer(c kafka.Consumer) ProcessorOption {
}
}

// WithProducer replaces goka'S default producer. Mainly for testing.
// WithProducer replaces goka's default producer.
func WithProducer(p kafka.Producer) ProcessorOption {
return func(o *poptions) {
o.builders.producer = func(brokers []string, registry metrics.Registry) (kafka.Producer, error) {
Expand Down Expand Up @@ -201,13 +216,24 @@ func WithRegistry(r metrics.Registry) ProcessorOption {
}
}

// WithHasher sets the hash function that assigns keys to partitions.
func WithHasher(hasher func() hash.Hash32) ProcessorOption {
return func(o *poptions) {
o.hasher = hasher
}
}

func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
opt.clientID = defaultClientID

for _, o := range opts {
o(opt)
}

if opt.hasher == nil {
opt.hasher = DefaultHasher()
}

// StorageBuilder should always be set as a default option in NewProcessor
if opt.builders.storage == nil {
return fmt.Errorf("StorageBuilder not set")
Expand All @@ -216,7 +242,7 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
opt.builders.consumer = defaultConsumerBuilder
}
if opt.builders.producer == nil {
opt.builders.producer = defaultProducerBuilder(opt.log)
opt.builders.producer = defaultProducerBuilder(opt.hasher, opt.log)
}
if opt.builders.topicmgr == nil {
opt.builders.topicmgr = defaultTopicManagerBuilder
Expand Down Expand Up @@ -246,6 +272,7 @@ type voptions struct {
updateCallback UpdateCallback
registry metrics.Registry
partitionChannelSize int
hasher func() hash.Hash32

builders struct {
storage StorageBuilder
Expand Down Expand Up @@ -330,11 +357,22 @@ func WithViewPartitionChannelSize(size int) ViewOption {
}
}

// WithViewHasher sets the hash function that assigns keys to partitions.
func WithViewHasher(hasher func() hash.Hash32) ViewOption {
return func(o *voptions) {
o.hasher = hasher
}
}

func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
for _, o := range opts {
o(opt)
}

if opt.hasher == nil {
opt.hasher = DefaultHasher()
}

// StorageBuilder should always be set as a default option in NewView
if opt.builders.storage == nil {
return fmt.Errorf("StorageBuilder not set")
Expand Down Expand Up @@ -372,6 +410,7 @@ type eoptions struct {

registry metrics.Registry
codec Codec
hasher func() hash.Hash32

builders struct {
topicmgr topicmgrBuilder
Expand Down Expand Up @@ -428,6 +467,13 @@ func WithEmitterKafkaMetrics(registry metrics.Registry) EmitterOption {
}
}

// WithEmitterHasher sets the hash function that assigns keys to partitions.
func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption {
return func(o *eoptions) {
o.hasher = hasher
}
}

func (opt *eoptions) applyOptions(opts ...EmitterOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
Expand All @@ -436,9 +482,13 @@ func (opt *eoptions) applyOptions(opts ...EmitterOption) error {
o(opt)
}

if opt.hasher == nil {
opt.hasher = DefaultHasher()
}

// config not set, use default one
if opt.builders.producer == nil {
opt.builders.producer = defaultProducerBuilder(opt.log)
opt.builders.producer = defaultProducerBuilder(opt.hasher, opt.log)
}
if opt.builders.topicmgr == nil {
opt.builders.topicmgr = defaultTopicManagerBuilder
Expand Down
3 changes: 1 addition & 2 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package goka
import (
"errors"
"fmt"
"hash/fnv"
"runtime/debug"
"sync"
"time"
Expand Down Expand Up @@ -257,7 +256,7 @@ func (g *Processor) hash(key string) (int32, error) {
// create a new hasher every time. Alternative would be to store the hash in
// view and every time reset the hasher (ie, hasher.Reset()). But that would
// also require us to protect the access of the hasher with a mutex.
hasher := fnv.New32a()
hasher := g.opts.hasher()

_, err := hasher.Write([]byte(key))
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,12 +1010,12 @@ func TestProcessor_HasGet(t *testing.T) {
}

func TestProcessor_HasGetStateless(t *testing.T) {
p := &Processor{graph: DefineGroup(group)}
p := &Processor{graph: DefineGroup(group), opts: &poptions{hasher: DefaultHasher()}}
_, err := p.Get("item1")
ensure.NotNil(t, err)
ensure.StringContains(t, err.Error(), "stateless processor")

p = &Processor{graph: DefineGroup(group, Persist(c))}
p = &Processor{graph: DefineGroup(group, Persist(c)), opts: &poptions{hasher: DefaultHasher()}}
p.partitions = map[int32]*partition{
0: new(partition),
}
Expand All @@ -1024,7 +1024,7 @@ func TestProcessor_HasGetStateless(t *testing.T) {
ensure.NotNil(t, err)
ensure.StringContains(t, err.Error(), "0 partitions")

p = &Processor{graph: DefineGroup(group, Persist(c))}
p = &Processor{graph: DefineGroup(group, Persist(c)), opts: &poptions{hasher: DefaultHasher()}}
p.partitions = map[int32]*partition{
0: new(partition),
}
Expand All @@ -1037,7 +1037,7 @@ func TestProcessor_HasGetStateless(t *testing.T) {
defer ctrl.Finish()

st := mock.NewMockStorage(ctrl)
p = &Processor{graph: DefineGroup(group, Persist(c))}
p = &Processor{graph: DefineGroup(group, Persist(c)), opts: &poptions{hasher: DefaultHasher()}}
p.partitions = map[int32]*partition{
0: &partition{log: logger.Default(), st: &storageProxy{Storage: st, partition: 0}},
}
Expand Down
3 changes: 1 addition & 2 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package goka
import (
"errors"
"fmt"
"hash/fnv"
"sync"

"github.com/lovoo/goka/kafka"
Expand Down Expand Up @@ -188,7 +187,7 @@ func (v *View) hash(key string) (int32, error) {
// create a new hasher every time. Alternative would be to store the hash in
// view and every time reset the hasher (ie, hasher.Reset()). But that would
// also require us to protect the access of the hasher with a mutex.
hasher := fnv.New32a()
hasher := v.opts.hasher()

_, err := hasher.Write([]byte(key))
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func createTestView(t *testing.T, consumer kafka.Consumer, sb StorageBuilder, tm
},
registry: metrics.DefaultRegistry,
gokaRegistry: metrics.DefaultRegistry,
hasher: DefaultHasher(),
}
opts.builders.storage = sb
opts.builders.topicmgr = func(brokers []string) (kafka.TopicManager, error) {
Expand Down Expand Up @@ -222,7 +223,7 @@ func TestView_StartStopWithError(t *testing.T) {
}

func TestView_GetErrors(t *testing.T) {
v := &View{}
v := &View{opts: &voptions{hasher: DefaultHasher()}}
_, err := v.Get("hey")
ensure.NotNil(t, err)

Expand Down

0 comments on commit 7f8de28

Please sign in to comment.