Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Groupgraphhook option #175

Merged
merged 2 commits into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ func WithHasher(hasher func() hash.Hash32) ProcessorOption {
}
}

// WithGroupGraphHook allows a function to obtain the group graph when a processor is started.
func WithGroupGraphHook(hook func(gg *GroupGraph)) ProcessorOption {
return func(o *poptions, gg *GroupGraph) {
hook(gg)
}
}

// NilHandling defines how nil messages should be handled by the processor.
type NilHandling int

Expand Down Expand Up @@ -177,6 +184,7 @@ type Tester interface {
TopicManagerBuilder() kafka.TopicManagerBuilder
RegisterGroupGraph(*GroupGraph)
RegisterEmitter(Stream, Codec)
RegisterView(Table, Codec)
}

// WithTester configures all external connections of a processor, ie, storage,
Expand Down Expand Up @@ -223,7 +231,7 @@ func (opt *poptions) applyOptions(gg *GroupGraph, opts ...ProcessorOption) error
///////////////////////////////////////////////////////////////////////////////

// ViewOption defines a configuration option to be used when creating a view.
type ViewOption func(*voptions)
type ViewOption func(*voptions, Table, Codec)

type voptions struct {
log logger.Logger
Expand All @@ -244,36 +252,36 @@ type voptions struct {
// WithViewLogger sets the logger the view should use. By default, views
// use the standard library logger.
func WithViewLogger(log logger.Logger) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.log = log
}
}

// WithViewCallback defines the callback called upon recovering a message
// from the log.
func WithViewCallback(cb UpdateCallback) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.updateCallback = cb
}
}

// WithViewStorageBuilder defines a builder for the storage of each partition.
func WithViewStorageBuilder(sb storage.Builder) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.builders.storage = sb
}
}

// WithViewConsumerBuilder replaces default view consumer.
func WithViewConsumerBuilder(cb kafka.ConsumerBuilder) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.builders.consumer = cb
}
}

// WithViewTopicManagerBuilder replaces the default topic manager.
func WithViewTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.builders.topicmgr = tmb
}
}
Expand All @@ -282,21 +290,21 @@ func WithViewTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ViewOption {
// This is mostly used for testing by setting it to 0 to have synchronous behavior
// of goka.
func WithViewPartitionChannelSize(size int) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.partitionChannelSize = size
}
}

// WithViewHasher sets the hash function that assigns keys to partitions.
func WithViewHasher(hasher func() hash.Hash32) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.hasher = hasher
}
}

// WithViewClientID defines the client ID used to identify with Kafka.
func WithViewClientID(clientID string) ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.clientID = clientID
}
}
Expand All @@ -305,18 +313,30 @@ func WithViewClientID(clientID string) ViewOption {
// returns errors. If the view is restartable, the client must call Terminate()
// to release all resources, ie, close the local storage.
func WithViewRestartable() ViewOption {
return func(o *voptions) {
return func(o *voptions, table Table, codec Codec) {
o.restartable = true
}
}

func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
// WithViewTester configures all external connections of a processor, ie, storage,
// consumer and producer
func WithViewTester(t Tester) ViewOption {
return func(o *voptions, table Table, codec Codec) {
o.builders.storage = t.StorageBuilder()
o.builders.consumer = t.ConsumerBuilder()
o.builders.topicmgr = t.TopicManagerBuilder()
o.partitionChannelSize = 0
t.RegisterView(table, codec)
}
}

func (opt *voptions) applyOptions(topic Table, codec Codec, opts ...ViewOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
opt.hasher = DefaultHasher()

for _, o := range opts {
o(opt)
o(opt, topic, codec)
}

// StorageBuilder should always be set as a default option in NewView
Expand Down
6 changes: 6 additions & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph) {

}

// RegisterView registers a view to be working with the tester.
func (km *Tester) RegisterView(table goka.Table, c goka.Codec) {
km.getOrCreateQueue(string(table)).expectSimpleConsumer()
km.registerCodec(string(table), c)
}

// RegisterEmitter registers an emitter to be working with the tester.
func (km *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec) {
km.registerCodec(string(topic), codec)
Expand Down
2 changes: 1 addition & 1 deletion view.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption)

// figure out how many partitions the group has
opts := new(voptions)
err := opts.applyOptions(topic, options...)
err := opts.applyOptions(topic, codec, options...)
if err != nil {
return nil, fmt.Errorf("Error applying user-defined options: %v", err)
}
Expand Down