Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mostly complying with gometalinter #114

Merged
merged 1 commit into from
Mar 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ type cbContext struct {
// Emit sends a message asynchronously to a topic.
func (ctx *cbContext) Emit(topic Stream, key string, value interface{}) {
if topic == "" {
ctx.Fail(errors.New("Cannot emit to empty topic"))
ctx.Fail(errors.New("cannot emit to empty topic"))
}
if loopName(ctx.graph.Group()) == string(topic) {
ctx.Fail(errors.New("Cannot emit to loop topic, use Loopback() instead."))
ctx.Fail(errors.New("cannot emit to loop topic (use Loopback instead)"))
}
if tableName(ctx.graph.Group()) == string(topic) {
ctx.Fail(errors.New("Cannot emit to table topic, use SetValue() instead."))
ctx.Fail(errors.New("cannot emit to table topic (use SetValue instead)"))
}
c := ctx.graph.codec(string(topic))
if c == nil {
Expand All @@ -115,12 +115,12 @@ func (ctx *cbContext) Emit(topic Stream, key string, value interface{}) {
func (ctx *cbContext) Loopback(key string, value interface{}) {
l := ctx.graph.LoopStream()
if l == nil {
ctx.Fail(errors.New("No loop topic configured"))
ctx.Fail(errors.New("no loop topic configured"))
}

data, err := l.Codec().Encode(value)
if err != nil {
ctx.Fail(fmt.Errorf("Error encoding message for key %s: %v", key, err))
ctx.Fail(fmt.Errorf("error encoding message for key %s: %v", key, err))
}

ctx.emit(l.Topic(), key, data)
Expand Down Expand Up @@ -158,7 +158,7 @@ func (ctx *cbContext) Value() interface{} {

// SetValue updates the value of the key in the group table.
func (ctx *cbContext) SetValue(value interface{}) {
if err := ctx.setValueForKey(string(ctx.msg.Key), value); err != nil {
if err := ctx.setValueForKey(ctx.msg.Key, value); err != nil {
ctx.Fail(err)
}
}
Expand All @@ -169,7 +169,7 @@ func (ctx *cbContext) Timestamp() time.Time {
}

func (ctx *cbContext) Key() string {
return string(ctx.msg.Key)
return ctx.msg.Key
}

func (ctx *cbContext) Topic() Stream {
Expand Down Expand Up @@ -258,7 +258,7 @@ func (ctx *cbContext) setValueForKey(key string, value interface{}) error {
}

if value == nil {
return fmt.Errorf("Cannot set nil as value.")
return fmt.Errorf("cannot set nil as value")
}

encodedValue, err := ctx.graph.GroupTable().Codec().Encode(value)
Expand Down Expand Up @@ -309,7 +309,7 @@ func (ctx *cbContext) start() {
// if some emit failed.
func (ctx *cbContext) tryCommit(err error) {
if err != nil {
ctx.errors.Collect(err)
_ = ctx.errors.Collect(err)
}

// not all calls are done yet, do not send the ack upstream.
Expand Down
21 changes: 5 additions & 16 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@ import (
"github.com/golang/mock/gomock"
)

func waitForValue(t *testing.T, value *int, targetValue int, timeout time.Duration) {
maxTries := 25
for i := 0; i < maxTries; i++ {
if *value == targetValue {
return
}

time.Sleep(time.Duration(int64(timeout) / int64(maxTries)))
}
t.Fatal("Timeout")
}

func newEmitter(err error, done func(err error)) emitter {
return func(topic string, key string, value []byte) *kafka.Promise {
p := kafka.NewPromise()
Expand Down Expand Up @@ -133,15 +121,15 @@ func TestContext_EmitError(t *testing.T) {
func TestContext_EmitToStateTopic(t *testing.T) {
ctx := &cbContext{graph: DefineGroup(group, Persist(c), Loop(c, cb))}
func() {
defer ensure.PanicDeepEqual(t, errors.New("Cannot emit to table topic, use SetValue() instead."))
defer ensure.PanicDeepEqual(t, errors.New("cannot emit to table topic (use SetValue instead)"))
ctx.Emit(Stream(tableName(group)), "key", []byte("value"))
}()
func() {
defer ensure.PanicDeepEqual(t, errors.New("Cannot emit to loop topic, use Loopback() instead."))
defer ensure.PanicDeepEqual(t, errors.New("cannot emit to loop topic (use Loopback instead)"))
ctx.Emit(Stream(loopName(group)), "key", []byte("value"))
}()
func() {
defer ensure.PanicDeepEqual(t, errors.New("Cannot emit to empty topic"))
defer ensure.PanicDeepEqual(t, errors.New("cannot emit to empty topic"))
ctx.Emit("", "key", []byte("value"))
}()
}
Expand Down Expand Up @@ -337,6 +325,7 @@ func TestContext_SetErrors(t *testing.T) {
offset int64 = 123
wg = new(sync.WaitGroup)
failed error
_ = failed // make linter happy
)

ctx := &cbContext{
Expand All @@ -350,7 +339,7 @@ func TestContext_SetErrors(t *testing.T) {

err := ctx.setValueForKey(key, nil)
ensure.NotNil(t, err)
ensure.StringContains(t, err.Error(), "Cannot set nil")
ensure.StringContains(t, err.Error(), "cannot set nil")

err = ctx.setValueForKey(key, 123) // cannot encode 123 as string
ensure.NotNil(t, err)
Expand Down
7 changes: 4 additions & 3 deletions emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/lovoo/goka/kafka"
)

// Emitter emits messages into a specific Kafka topic, first encoding the message with the given codec.
type Emitter struct {
codec Codec
producer kafka.Producer
Expand All @@ -16,7 +17,7 @@ type Emitter struct {
wg sync.WaitGroup
}

// NewEmitter creates a new emitter using passed brokers, topic, codec and possibly options
// NewEmitter creates a new emitter using passed brokers, topic, codec and possibly options.
func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error) {
options = append(
// default options comes first
Expand Down Expand Up @@ -64,7 +65,7 @@ func (e *Emitter) Emit(key string, msg interface{}) (*kafka.Promise, error) {
}), nil
}

// EmitSync sends a message to passed topic and key
// EmitSync sends a message to passed topic and key.
func (e *Emitter) EmitSync(key string, msg interface{}) error {
var (
err error
Expand All @@ -85,7 +86,7 @@ func (e *Emitter) EmitSync(key string, msg interface{}) error {
return err
}

// Finish waits until the emitter is finished producing all pending messages
// Finish waits until the emitter is finished producing all pending messages.
func (e *Emitter) Finish() {
e.wg.Wait()
}
5 changes: 2 additions & 3 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package goka

var (
errBuildConsumer = "error creating Kafka consumer: %v"

errBuildProducer = "Error creating Kafka producer: %v"
errApplyOptions = "Error applying user-defined options: %v"
errBuildProducer = "error creating Kafka producer: %v"
errApplyOptions = "error applying options: %v"
)
74 changes: 65 additions & 9 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,26 @@ var (
loopSuffix = "-loop"
)

// Stream is the name of an event stream topic in Kafka, ie, a topic with
// cleanup.policy=delete
type Stream string

// Streams is a slice of Stream names.
type Streams []Stream

// Table is the name of a table topic in Kafka, ie, a topic with
// cleanup.policy=compact
type Table string

// Group is the name of a consumer group in Kafka and represents a processor
// group in Goka. A processor group may have a group table and a group loopback
// stream. By default, the group table is named <group>-table and the loopback
// stream <group>-loop.
type Group string

// GroupGraph is the specification of a processor group. It contains all input,
// output, and any other topic from which and into which the processor group
// may consume or produce events. Each of these links to Kafka is called Edge.
type GroupGraph struct {
group string
inputTables []Edge
Expand All @@ -31,22 +46,27 @@ type GroupGraph struct {
joinCheck map[string]bool
}

// Group returns the group name.
func (gg *GroupGraph) Group() Group {
return Group(gg.group)
}

// InputStreams returns all input stream edges of the group.
func (gg *GroupGraph) InputStreams() Edges {
return gg.inputStreams
}

// JointTables retuns all joint table edges of the group.
func (gg *GroupGraph) JointTables() Edges {
return gg.inputTables
}

// LookupTables retuns all lookup table edges of the group.
func (gg *GroupGraph) LookupTables() Edges {
return gg.crossTables
}

// LoopStream returns the loopback edge of the group.
func (gg *GroupGraph) LoopStream() Edge {
// only 1 loop stream is valid
if len(gg.loopStream) > 0 {
Expand All @@ -55,6 +75,7 @@ func (gg *GroupGraph) LoopStream() Edge {
return nil
}

// GroupTable returns the group table edge of the group.
func (gg *GroupGraph) GroupTable() Edge {
// only 1 group table is valid
if len(gg.groupTable) > 0 {
Expand All @@ -63,6 +84,7 @@ func (gg *GroupGraph) GroupTable() Edge {
return nil
}

// OutputStreams returns the output stream edges of the group.
func (gg *GroupGraph) OutputStreams() Edges {
return gg.outputStreams
}
Expand All @@ -89,6 +111,8 @@ func (gg *GroupGraph) joint(topic string) bool {
return gg.joinCheck[topic]
}

// DefineGroup creates a group graph with a given group name and a list of
// edges.
func DefineGroup(group Group, edges ...Edge) *GroupGraph {
gg := GroupGraph{group: string(group),
codecs: make(map[string]Codec),
Expand Down Expand Up @@ -133,6 +157,12 @@ func DefineGroup(group Group, edges ...Edge) *GroupGraph {
return &gg
}

// Validate validates the group graph and returns an error if invalid.
// Main validation checks are:
// - at most one loopback stream edge is allowed
// - at most one group table edge is allowed
// - at least one input stream is required
// - table and loopback topics cannot be used in any other edge.
func (gg *GroupGraph) Validate() error {
if len(gg.loopStream) > 1 {
return errors.New("more than one loop stream in group graph")
Expand All @@ -155,14 +185,18 @@ func (gg *GroupGraph) Validate() error {
return nil
}

// Edge represents a topic in Kafka and the corresponding codec to encode and
// decode the messages of that topic.
type Edge interface {
String() string
Topic() string
Codec() Codec
}

// Edges is a slice of edge objects.
type Edges []Edge

// Topics returns the names of the topics of the edges.
func (e Edges) Topics() []string {
var t []string
for _, i := range e {
Expand Down Expand Up @@ -193,9 +227,11 @@ type inputStream struct {
cb ProcessCallback
}

// Stream returns a subscription for a co-partitioned topic. The processor
// subscribing for a stream topic will start reading from the newest offset of
// the partition.
// Input represents an edge of an input stream topic. The edge
// specifies the topic name, its codec and the ProcessorCallback used to
// process it. The topic has to be copartitioned with any other input stream of
// the group and with the group table.
// The group starts reading the topic from the newest offset.
func Input(topic Stream, c Codec, cb ProcessCallback) Edge {
return &inputStream{&topicDef{string(topic), c}, cb}
}
Expand Down Expand Up @@ -229,7 +265,7 @@ func (is inputStreams) Codec() Codec {
return is[0].Codec()
}

// Inputs creates Edges for multiple input streams sharing the same
// Inputs creates edges of multiple input streams sharing the same
// codec and callback.
func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge {
if len(topics) == 0 {
Expand All @@ -244,22 +280,27 @@ func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge {

type loopStream inputStream

// Loop defines a consume callback on the loop topic
// Loop represents the edge of the loopback topic of the group. The edge
// specifies the codec of the messages in the topic and ProcesCallback to
// process the messages of the topic. Context.Loopback() is used to write
// messages into this topic from any callback of the group.
func Loop(c Codec, cb ProcessCallback) Edge {
return &loopStream{&topicDef{codec: c}, cb}
}

func (s *loopStream) setGroup(group Group) {
s.topicDef.name = string(loopName(group))
s.topicDef.name = loopName(group)
}

type inputTable struct {
*topicDef
}

// Table is one or more co-partitioned, log-compacted topic. The processor
// subscribing for a table topic will start reading from the oldest offset
// of the partition.
// Join represents an edge of a copartitioned, log-compacted table topic. The
// edge specifies the topic name and the codec of the messages of the topic.
// The group starts reading the topic from the oldest offset.
// The processing of input streams is blocked until all partitions of the table
// are recovered.
func Join(topic Table, c Codec) Edge {
return &inputTable{&topicDef{string(topic), c}}
}
Expand All @@ -268,6 +309,11 @@ type crossTable struct {
*topicDef
}

// Lookup represents an edge of a non-copartitioned, log-compacted table
// topic. The edge specifies the topic name and the codec of the messages of
// the topic. The group starts reading the topic from the oldest offset.
// The processing of input streams is blocked until the table is fully
// recovered.
func Lookup(topic Table, c Codec) Edge {
return &crossTable{&topicDef{string(topic), c}}
}
Expand All @@ -276,6 +322,11 @@ type groupTable struct {
*topicDef
}

// Persist represents the edge of the group table, which is log-compacted and
// copartitioned with the input streams. This edge specifies the codec of the
// messages in the topic, ie, the codec of the values of the table.
// The processing of input streams is blocked until all partitions of the group
// table are recovered.
func Persist(c Codec) Edge {
return &groupTable{&topicDef{codec: c}}
}
Expand All @@ -288,6 +339,11 @@ type outputStream struct {
*topicDef
}

// Output represents an edge of an output stream topic. The edge
// specifies the topic name and the codec of the messages of the topic.
// Context.Emit() only emits messages into Output edges defined in the group
// graph.
// The topic does not have to be copartitioned with the input streams.
func Output(topic Stream, c Codec) Edge {
return &outputStream{&topicDef{string(topic), c}}
}
Expand Down
1 change: 1 addition & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/lovoo/goka/storage"
)

// Iterator allows one to iterate over the keys of a view.
type Iterator interface {
Next() bool
Key() string
Expand Down
Loading