Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix examples initialization after auto creation of topics is disabled #384

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/1-simplest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func main() {
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
defer tm.Close()
err = tm.EnsureStreamExists(string(topic), 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
Expand Down
1 change: 1 addition & 0 deletions examples/10-visit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func main() {
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
defer tm.Close()
err = tm.EnsureStreamExists(string(topic), 8)
if err != nil {
log.Fatalf("Error creating kafka topic %s: %v", topic, err)
Expand Down
38 changes: 31 additions & 7 deletions examples/2-clicks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"

Expand All @@ -16,6 +17,8 @@ var (
brokers = []string{"127.0.0.1:9092"}
topic goka.Stream = "user-click"
group goka.Group = "mini-group"

tmc *goka.TopicManagerConfig
)

// A user is the object that is stored in the processor's group table
Expand All @@ -28,6 +31,12 @@ type user struct {
// group table.
type userCodec struct{}

func init() {
tmc = goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
}

// Encodes a user into []byte
func (jc *userCodec) Encode(value interface{}) ([]byte, error) {
if _, isUser := value.(*user); !isUser {
Expand Down Expand Up @@ -82,14 +91,11 @@ func process(ctx goka.Context, msg interface{}) {
fmt.Printf("[proc] key: %s clicks: %d, msg: %v\n", ctx.Key(), u.Clicks, msg)
}

func runProcessor() {
func runProcessor(initialized chan struct{}) {
g := goka.DefineGroup(group,
goka.Input(topic, new(codec.String), process),
goka.Persist(new(userCodec)),
)
tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
p, err := goka.NewProcessor(brokers,
g,
goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(tmc)),
Expand All @@ -99,10 +105,14 @@ func runProcessor() {
panic(err)
}

close(initialized)

p.Run(context.Background())
}

func runView() {
func runView(initialized chan struct{}) {
<-initialized

view, err := goka.NewView(brokers,
goka.GroupTable(group),
new(userCodec),
Expand All @@ -124,7 +134,21 @@ func runView() {
}

func main() {
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
defer tm.Close()
err = tm.EnsureStreamExists(string(topic), 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
}

// When this example is run the first time, wait for creation of all internal topics (this is done
// by goka.NewProcessor)
initialized := make(chan struct{})

go runEmitter()
go runProcessor()
runView()
go runProcessor(initialized)
runView(initialized)
}
2 changes: 1 addition & 1 deletion examples/3-messaging/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func collect(ctx goka.Context, msg interface{}) {
ml = append(ml, *m)

if len(ml) > maxMessages {
ml = ml[len(ml)-maxMessages-1:]
ml = ml[len(ml)-maxMessages:]
}
ctx.SetValue(ml)
}
Expand Down
7 changes: 6 additions & 1 deletion examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package blocker
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

var (
Expand Down Expand Up @@ -58,6 +58,10 @@ func block(ctx goka.Context, msg interface{}) {
ctx.SetValue(s)
}

func PrepareTopics(brokers []string) {
topicinit.EnsureStreamExists(string(Stream), brokers)
}

func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
Expand All @@ -68,6 +72,7 @@ func Run(ctx context.Context, brokers []string) func() error {
if err != nil {
return err
}

return p.Run(ctx)
}
}
17 changes: 17 additions & 0 deletions examples/3-messaging/cmd/processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
grp, ctx := errgroup.WithContext(ctx)

// Create topics if they do not already exist
if *runCollector {
collector.PrepareTopics(brokers)
}
if *runFilter {
filter.PrepareTopics(brokers)
}
if *runBlocker {
blocker.PrepareTopics(brokers)
}
if *runDetector {
detector.PrepareTopics(brokers)
}
if *runTranslator {
translator.PrepareTopics(brokers)
}

if *runCollector {
log.Println("starting collector")
grp.Go(collector.Run(ctx, brokers))
Expand Down
6 changes: 5 additions & 1 deletion examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package collector
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

const maxMessages = 5
Expand Down Expand Up @@ -42,6 +42,10 @@ func collect(ctx goka.Context, msg interface{}) {
ctx.SetValue(ml)
}

func PrepareTopics(brokers []string) {
topicinit.EnsureStreamExists(string(messaging.ReceivedStream), brokers)
}

func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
Expand Down
6 changes: 5 additions & 1 deletion examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package detector
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

const (
Expand Down Expand Up @@ -49,6 +49,10 @@ func detectSpammer(ctx goka.Context, c *Counters) bool {
return total >= minMessages && rate >= maxRate
}

func PrepareTopics(brokers []string) {
topicinit.EnsureStreamExists(string(messaging.SentStream), brokers)
}

func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
Expand Down
15 changes: 13 additions & 2 deletions examples/3-messaging/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package filter

import (
"context"
"strings"

"github.com/lovoo/goka"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
"github.com/lovoo/goka/examples/3-messaging/translator"
"strings"
)

var (
Expand Down Expand Up @@ -41,6 +41,16 @@ func translate(ctx goka.Context, m *messaging.Message) *messaging.Message {
}
}

func PrepareTopics(brokers []string) {
topicinit.EnsureStreamExists(string(messaging.SentStream), brokers)

// We refer to these tables, ensure that they exist initially also in the
// case that the translator or blocker processors are not started
for _, topicName := range []string{string(translator.Table), string(blocker.Table)} {
topicinit.EnsureTableExists(topicName, brokers)
}
}

func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
Expand All @@ -53,6 +63,7 @@ func Run(ctx context.Context, brokers []string) func() error {
if err != nil {
return err
}

return p.Run(ctx)
}
}
35 changes: 35 additions & 0 deletions examples/3-messaging/topicinit/topicinit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package topicinit

import (
"github.com/lovoo/goka"
"log"
)

// EnsureStreamExists is a convenience wrapper for TopicManager.EnsureStreamExists
func EnsureStreamExists(topic string, brokers []string) {
tm := createTopicManager(brokers)
norbertklawikowski marked this conversation as resolved.
Show resolved Hide resolved
defer tm.Close()
err := tm.EnsureStreamExists(topic, 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
}
}

// EnsureTableExists is a convenience wrapper for TopicManager.EnsureTableExists
func EnsureTableExists(topic string, brokers []string) {
tm := createTopicManager(brokers)
defer tm.Close()
err := tm.EnsureTableExists(topic, 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
}
}

func createTopicManager(brokers []string) goka.TopicManager {
tmc := goka.NewTopicManagerConfig()
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
return tm
}
7 changes: 6 additions & 1 deletion examples/3-messaging/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package translator

import (
"context"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

var (
Expand All @@ -21,6 +21,10 @@ func translate(ctx goka.Context, msg interface{}) {
ctx.SetValue(msg.(string))
}

func PrepareTopics(brokers []string) {
topicinit.EnsureStreamExists(string(Stream), brokers)
}

func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
Expand All @@ -31,6 +35,7 @@ func Run(ctx context.Context, brokers []string) func() error {
if err != nil {
return err
}

return p.Run(ctx)
}
}
40 changes: 36 additions & 4 deletions examples/5-multiple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,36 @@ func process(ctx goka.Context, msg interface{}) {
ctx.SetValue(u)
}

func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Server) error {
func runProcessor(ctx context.Context,
monitor *monitor.Server,
query *query.Server,
groupInitialized chan struct{}) error {

tmc := goka.NewTopicManagerConfig()
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
defer tm.Close()
for _, topicName := range []string{string(inputA), string(inputB)} {
err = tm.EnsureStreamExists(topicName, 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topicName, err)
}
}

p, err := goka.NewProcessor(brokers, goka.DefineGroup(group,
goka.Input(inputA, new(codec.String), process),
goka.Input(inputB, new(codec.String), process),
goka.Persist(new(userCodec)),
),
goka.WithStorageBuilder(randomStorageBuilder("proc")),
)
if err != nil {
return err
}

close(groupInitialized)

// attach the processor to the monitor
monitor.AttachProcessor(p)
Expand All @@ -135,7 +157,14 @@ func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Ser
return err
}

func runView(ctx context.Context, errg *multierr.ErrGroup, root *mux.Router, monitor *monitor.Server) error {
func runView(ctx context.Context,
errg *multierr.ErrGroup,
root *mux.Router,
monitor *monitor.Server,
groupInitialized chan struct{}) error {

<-groupInitialized

view, err := goka.NewView(brokers,
goka.GroupTable(group),
new(userCodec),
Expand Down Expand Up @@ -220,16 +249,19 @@ func main() {
cancel()
}()

// runView uses the group table, which first has to be initialized by runProcessor
groupInitialized := make(chan struct{})

errg, ctx := multierr.NewErrGroup(ctx)
errg.Go(func() error {
defer log.Printf("emitter done")
return runEmitter(ctx)
})
errg.Go(func() error {
defer log.Printf("processor done")
return runProcessor(ctx, monitorServer, queryServer)
return runProcessor(ctx, monitorServer, queryServer, groupInitialized)
})
if err := runView(ctx, errg, root, monitorServer); err != nil {
if err := runView(ctx, errg, root, monitorServer, groupInitialized); err != nil {
log.Printf("Error running view, will shutdown: %v", err)
cancel()
}
Expand Down
Loading