Skip to content

Commit

Permalink
Merge pull request #175 from lovoo/groupgraphhook-option
Browse files Browse the repository at this point in the history
Groupgraphhook option
  • Loading branch information
frairon authored Mar 8, 2019
2 parents 0519bbb + 3dfb23f commit 3b534da
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 13 deletions.
44 changes: 32 additions & 12 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ func WithHasher(hasher func() hash.Hash32) ProcessorOption {
}
}

// WithGroupGraphHook allows a function to obtain the group graph when a processor is started.
func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption {
return func(o *poptions, gg *GroupGraph) {
hook(gg)
}
}

// NilHandling defines how nil messages should be handled by the processor.
type NilHandling int

Expand Down Expand Up @@ -177,6 +184,7 @@ type Tester interface {
TopicManagerBuilder() kafka.TopicManagerBuilder
RegisterGroupGraph(*GroupGraph)
RegisterEmitter(Stream, Codec)
RegisterView(Table, Codec)
}

// WithTester configures all external connections of a processor, ie, storage,
Expand Down Expand Up @@ -223,7 +231,7 @@ func (opt *poptions) applyOptions(gg *GroupGraph, opts ...ProcessorOption) error
///////////////////////////////////////////////////////////////////////////////

// ViewOption defines a configuration option to be used when creating a view.
type ViewOption func(*voptions)
type ViewOption func(*voptions, Table, Codec)

type voptions struct {
log logger.Logger
Expand All @@ -244,36 +252,36 @@ type voptions struct {
// WithViewLogger sets the logger the view should use. By default, views
// use the standard library logger.
func WithViewLogger(log logger.Logger) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.log = log
}
}

// WithViewCallback defines the callback called upon recovering a message
// from the log.
func WithViewCallback(cb UpdateCallback) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.updateCallback = cb
}
}

// WithViewStorageBuilder defines a builder for the storage of each partition.
func WithViewStorageBuilder(sb storage.Builder) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.builders.storage = sb
}
}

// WithViewConsumerBuilder replaces default view consumer.
func WithViewConsumerBuilder(cb kafka.ConsumerBuilder) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.builders.consumer = cb
}
}

// WithViewTopicManagerBuilder replaces the default topic manager.
func WithViewTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.builders.topicmgr = tmb
}
}
Expand All @@ -282,21 +290,21 @@ func WithViewTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ViewOption {
// This is mostly used for testing by setting it to 0 to have synchronous behavior
// of goka.
func WithViewPartitionChannelSize(size int) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.partitionChannelSize = size
}
}

// WithViewHasher sets the hash function that assigns keys to partitions.
func WithViewHasher(hasher func() hash.Hash32) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.hasher = hasher
}
}

// WithViewClientID defines the client ID used to identify with Kafka.
func WithViewClientID(clientID string) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.clientID = clientID
}
}
Expand All @@ -305,18 +313,30 @@ func WithViewClientID(clientID string) ViewOption {
// returns errors. If the view is restartable, the client must call Terminate()
// to release all resources, ie, close the local storage.
func WithViewRestartable() ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.restartable = true
}
}

func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
// WithViewTester configures all external connections of a processor, ie, storage,
// consumer and producer
func WithViewTester(t Tester) ViewOption {
return func(o *voptions, table Table, codec Codec) {
o.builders.storage = t.StorageBuilder()
o.builders.consumer = t.ConsumerBuilder()
o.builders.topicmgr = t.TopicManagerBuilder()
o.partitionChannelSize = 0
t.RegisterView(table, codec)
}
}

func (opt *voptions) applyOptions(topic Table, codec Codec, opts ...ViewOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
opt.hasher = DefaultHasher()

for _, o := range opts {
o(opt)
o(opt, topic, codec)
}

// StorageBuilder should always be set as a default option in NewView
Expand Down
6 changes: 6 additions & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph) {

}

// RegisterView registers a view to be working with the tester.
func (km *Tester) RegisterView(table goka.Table, c goka.Codec) {
km.getOrCreateQueue(string(table)).expectSimpleConsumer()
km.registerCodec(string(table), c)
}

// RegisterEmitter registers an emitter to be working with the tester.
func (km *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec) {
km.registerCodec(string(topic), codec)
Expand Down
2 changes: 1 addition & 1 deletion view.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption)

// figure out how many partitions the group has
opts := new(voptions)
err := opts.applyOptions(topic, options...)
err := opts.applyOptions(topic, codec, options...)
if err != nil {
return nil, fmt.Errorf("Error applying user-defined options: %v", err)
}
Expand Down

0 comments on commit 3b534da

Please sign in to comment.