diff --git a/examples/1-simplest/main.go b/examples/1-simplest/main.go index c41afeac..9a326e50 100644 --- a/examples/1-simplest/main.go +++ b/examples/1-simplest/main.go @@ -112,6 +112,7 @@ func main() { if err != nil { log.Fatalf("Error creating topic manager: %v", err) } + defer tm.Close() err = tm.EnsureStreamExists(string(topic), 8) if err != nil { log.Printf("Error creating kafka topic %s: %v", topic, err) diff --git a/examples/10-visit/main.go b/examples/10-visit/main.go index b2c2050f..fb364726 100644 --- a/examples/10-visit/main.go +++ b/examples/10-visit/main.go @@ -59,6 +59,7 @@ func main() { if err != nil { log.Fatalf("Error creating topic manager: %v", err) } + defer tm.Close() err = tm.EnsureStreamExists(string(topic), 8) if err != nil { log.Fatalf("Error creating kafka topic %s: %v", topic, err) diff --git a/examples/2-clicks/main.go b/examples/2-clicks/main.go index acf2e82b..51a0e888 100644 --- a/examples/2-clicks/main.go +++ b/examples/2-clicks/main.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log" "net/http" "time" @@ -16,6 +17,8 @@ var ( brokers = []string{"127.0.0.1:9092"} topic goka.Stream = "user-click" group goka.Group = "mini-group" + + tmc *goka.TopicManagerConfig ) // A user is the object that is stored in the processor's group table @@ -28,6 +31,12 @@ type user struct { // group table. type userCodec struct{} +func init() { + tmc = goka.NewTopicManagerConfig() + tmc.Table.Replication = 1 + tmc.Stream.Replication = 1 +} + // Encodes a user into []byte func (jc *userCodec) Encode(value interface{}) ([]byte, error) { if _, isUser := value.(*user); !isUser { @@ -82,14 +91,11 @@ func process(ctx goka.Context, msg interface{}) { fmt.Printf("[proc] key: %s clicks: %d, msg: %v\n", ctx.Key(), u.Clicks, msg) } -func runProcessor() { +func runProcessor(initialized chan struct{}) { g := goka.DefineGroup(group, goka.Input(topic, new(codec.String), process), goka.Persist(new(userCodec)), ) - tmc := goka.NewTopicManagerConfig() - tmc.Table.Replication = 1 - tmc.Stream.Replication = 1 p, err := goka.NewProcessor(brokers, g, goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(tmc)), @@ -99,10 +105,14 @@ func runProcessor() { panic(err) } + close(initialized) + p.Run(context.Background()) } -func runView() { +func runView(initialized chan struct{}) { + <-initialized + view, err := goka.NewView(brokers, goka.GroupTable(group), new(userCodec), @@ -124,7 +134,21 @@ func runView() { } func main() { + tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc) + if err != nil { + log.Fatalf("Error creating topic manager: %v", err) + } + defer tm.Close() + err = tm.EnsureStreamExists(string(topic), 8) + if err != nil { + log.Printf("Error creating kafka topic %s: %v", topic, err) + } + + // When this example is run the first time, wait for creation of all internal topics (this is done + // by goka.NewProcessor) + initialized := make(chan struct{}) + go runEmitter() - go runProcessor() - runView() + go runProcessor(initialized) + runView(initialized) } diff --git a/examples/3-messaging/README.md b/examples/3-messaging/README.md index d3ed63f9..5ed7182a 100644 --- a/examples/3-messaging/README.md +++ b/examples/3-messaging/README.md @@ -96,7 +96,7 @@ func collect(ctx goka.Context, msg interface{}) { ml = append(ml, *m) if len(ml) > maxMessages { - ml = ml[len(ml)-maxMessages-1:] + ml = ml[len(ml)-maxMessages:] } ctx.SetValue(ml) } diff --git a/examples/3-messaging/blocker/blocker.go b/examples/3-messaging/blocker/blocker.go index a9ba948e..71d27d08 100644 --- a/examples/3-messaging/blocker/blocker.go +++ b/examples/3-messaging/blocker/blocker.go @@ -3,8 +3,8 @@ package blocker import ( "context" "encoding/json" - "github.com/lovoo/goka" + "github.com/lovoo/goka/examples/3-messaging/topicinit" ) var ( @@ -58,6 +58,10 @@ func block(ctx goka.Context, msg interface{}) { ctx.SetValue(s) } +func PrepareTopics(brokers []string) { + topicinit.EnsureStreamExists(string(Stream), brokers) +} + func Run(ctx context.Context, brokers []string) func() error { return func() error { g := goka.DefineGroup(group, @@ -68,6 +72,7 @@ func Run(ctx context.Context, brokers []string) func() error { if err != nil { return err } + return p.Run(ctx) } } diff --git a/examples/3-messaging/cmd/processor/main.go b/examples/3-messaging/cmd/processor/main.go index 93247dc4..44f29a79 100644 --- a/examples/3-messaging/cmd/processor/main.go +++ b/examples/3-messaging/cmd/processor/main.go @@ -31,6 +31,23 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) grp, ctx := errgroup.WithContext(ctx) + // Create topics if they do not already exist + if *runCollector { + collector.PrepareTopics(brokers) + } + if *runFilter { + filter.PrepareTopics(brokers) + } + if *runBlocker { + blocker.PrepareTopics(brokers) + } + if *runDetector { + detector.PrepareTopics(brokers) + } + if *runTranslator { + translator.PrepareTopics(brokers) + } + if *runCollector { log.Println("starting collector") grp.Go(collector.Run(ctx, brokers)) diff --git a/examples/3-messaging/collector/collector.go b/examples/3-messaging/collector/collector.go index f90cbd85..73fd86e1 100644 --- a/examples/3-messaging/collector/collector.go +++ b/examples/3-messaging/collector/collector.go @@ -3,9 +3,9 @@ package collector import ( "context" "encoding/json" - "github.com/lovoo/goka" "github.com/lovoo/goka/examples/3-messaging" + "github.com/lovoo/goka/examples/3-messaging/topicinit" ) const maxMessages = 5 @@ -42,6 +42,10 @@ func collect(ctx goka.Context, msg interface{}) { ctx.SetValue(ml) } +func PrepareTopics(brokers []string) { + topicinit.EnsureStreamExists(string(messaging.ReceivedStream), brokers) +} + func Run(ctx context.Context, brokers []string) func() error { return func() error { g := goka.DefineGroup(group, diff --git a/examples/3-messaging/detector/detector.go b/examples/3-messaging/detector/detector.go index 8e730e57..706a62c3 100644 --- a/examples/3-messaging/detector/detector.go +++ b/examples/3-messaging/detector/detector.go @@ -3,10 +3,10 @@ package detector import ( "context" "encoding/json" - "github.com/lovoo/goka" "github.com/lovoo/goka/examples/3-messaging" "github.com/lovoo/goka/examples/3-messaging/blocker" + "github.com/lovoo/goka/examples/3-messaging/topicinit" ) const ( @@ -49,6 +49,10 @@ func detectSpammer(ctx goka.Context, c *Counters) bool { return total >= minMessages && rate >= maxRate } +func PrepareTopics(brokers []string) { + topicinit.EnsureStreamExists(string(messaging.SentStream), brokers) +} + func Run(ctx context.Context, brokers []string) func() error { return func() error { g := goka.DefineGroup(group, diff --git a/examples/3-messaging/filter/filter.go b/examples/3-messaging/filter/filter.go index 666efc2a..c0433716 100644 --- a/examples/3-messaging/filter/filter.go +++ b/examples/3-messaging/filter/filter.go @@ -2,12 +2,12 @@ package filter import ( "context" - "strings" - "github.com/lovoo/goka" messaging "github.com/lovoo/goka/examples/3-messaging" "github.com/lovoo/goka/examples/3-messaging/blocker" + "github.com/lovoo/goka/examples/3-messaging/topicinit" "github.com/lovoo/goka/examples/3-messaging/translator" + "strings" ) var ( @@ -41,6 +41,16 @@ func translate(ctx goka.Context, m *messaging.Message) *messaging.Message { } } +func PrepareTopics(brokers []string) { + topicinit.EnsureStreamExists(string(messaging.SentStream), brokers) + + // We refer to these tables, ensure that they exist initially also in the + // case that the translator or blocker processors are not started + for _, topicName := range []string{string(translator.Table), string(blocker.Table)} { + topicinit.EnsureTableExists(topicName, brokers) + } +} + func Run(ctx context.Context, brokers []string) func() error { return func() error { g := goka.DefineGroup(group, @@ -53,6 +63,7 @@ func Run(ctx context.Context, brokers []string) func() error { if err != nil { return err } + return p.Run(ctx) } } diff --git a/examples/3-messaging/topicinit/topicinit.go b/examples/3-messaging/topicinit/topicinit.go new file mode 100644 index 00000000..6c5282f6 --- /dev/null +++ b/examples/3-messaging/topicinit/topicinit.go @@ -0,0 +1,35 @@ +package topicinit + +import ( + "github.com/lovoo/goka" + "log" +) + +// EnsureStreamExists is a convenience wrapper for TopicManager.EnsureStreamExists +func EnsureStreamExists(topic string, brokers []string) { + tm := createTopicManager(brokers) + defer tm.Close() + err := tm.EnsureStreamExists(topic, 8) + if err != nil { + log.Printf("Error creating kafka topic %s: %v", topic, err) + } +} + +// EnsureTableExists is a convenience wrapper for TopicManager.EnsureTableExists +func EnsureTableExists(topic string, brokers []string) { + tm := createTopicManager(brokers) + defer tm.Close() + err := tm.EnsureTableExists(topic, 8) + if err != nil { + log.Printf("Error creating kafka topic %s: %v", topic, err) + } +} + +func createTopicManager(brokers []string) goka.TopicManager { + tmc := goka.NewTopicManagerConfig() + tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc) + if err != nil { + log.Fatalf("Error creating topic manager: %v", err) + } + return tm +} diff --git a/examples/3-messaging/translator/translator.go b/examples/3-messaging/translator/translator.go index deb6b5d0..c8ccf801 100644 --- a/examples/3-messaging/translator/translator.go +++ b/examples/3-messaging/translator/translator.go @@ -2,9 +2,9 @@ package translator import ( "context" - "github.com/lovoo/goka" "github.com/lovoo/goka/codec" + "github.com/lovoo/goka/examples/3-messaging/topicinit" ) var ( @@ -21,6 +21,10 @@ func translate(ctx goka.Context, msg interface{}) { ctx.SetValue(msg.(string)) } +func PrepareTopics(brokers []string) { + topicinit.EnsureStreamExists(string(Stream), brokers) +} + func Run(ctx context.Context, brokers []string) func() error { return func() error { g := goka.DefineGroup(group, @@ -31,6 +35,7 @@ func Run(ctx context.Context, brokers []string) func() error { if err != nil { return err } + return p.Run(ctx) } } diff --git a/examples/5-multiple/main.go b/examples/5-multiple/main.go index 3a1ad1c4..2d8d2439 100644 --- a/examples/5-multiple/main.go +++ b/examples/5-multiple/main.go @@ -115,7 +115,24 @@ func process(ctx goka.Context, msg interface{}) { ctx.SetValue(u) } -func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Server) error { +func runProcessor(ctx context.Context, + monitor *monitor.Server, + query *query.Server, + groupInitialized chan struct{}) error { + + tmc := goka.NewTopicManagerConfig() + tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc) + if err != nil { + log.Fatalf("Error creating topic manager: %v", err) + } + defer tm.Close() + for _, topicName := range []string{string(inputA), string(inputB)} { + err = tm.EnsureStreamExists(topicName, 8) + if err != nil { + log.Printf("Error creating kafka topic %s: %v", topicName, err) + } + } + p, err := goka.NewProcessor(brokers, goka.DefineGroup(group, goka.Input(inputA, new(codec.String), process), goka.Input(inputB, new(codec.String), process), @@ -123,6 +140,11 @@ func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Ser ), goka.WithStorageBuilder(randomStorageBuilder("proc")), ) + if err != nil { + return err + } + + close(groupInitialized) // attach the processor to the monitor monitor.AttachProcessor(p) @@ -135,7 +157,14 @@ func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Ser return err } -func runView(ctx context.Context, errg *multierr.ErrGroup, root *mux.Router, monitor *monitor.Server) error { +func runView(ctx context.Context, + errg *multierr.ErrGroup, + root *mux.Router, + monitor *monitor.Server, + groupInitialized chan struct{}) error { + + <-groupInitialized + view, err := goka.NewView(brokers, goka.GroupTable(group), new(userCodec), @@ -220,6 +249,9 @@ func main() { cancel() }() + // runView uses the group table, which first has to be initialized by runProcessor + groupInitialized := make(chan struct{}) + errg, ctx := multierr.NewErrGroup(ctx) errg.Go(func() error { defer log.Printf("emitter done") @@ -227,9 +259,9 @@ func main() { }) errg.Go(func() error { defer log.Printf("processor done") - return runProcessor(ctx, monitorServer, queryServer) + return runProcessor(ctx, monitorServer, queryServer, groupInitialized) }) - if err := runView(ctx, errg, root, monitorServer); err != nil { + if err := runView(ctx, errg, root, monitorServer, groupInitialized); err != nil { log.Printf("Error running view, will shutdown: %v", err) cancel() } diff --git a/examples/6-reconnecting-view/main.go b/examples/6-reconnecting-view/main.go index 65e62db8..3c9da134 100644 --- a/examples/6-reconnecting-view/main.go +++ b/examples/6-reconnecting-view/main.go @@ -12,12 +12,25 @@ import ( ) func main() { + var brokers = []string{"127.0.0.1:9092"} + var topic goka.Table = "restartable-view-test-table" + + tmc := goka.NewTopicManagerConfig() + tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc) + if err != nil { + log.Fatalf("Error creating topic manager: %v", err) + } + defer tm.Close() + err = tm.EnsureStreamExists(string(topic), 8) + if err != nil { + log.Printf("Error creating kafka topic %s: %v", topic, err) + } view, err := goka.NewView( // connect to example kafka cluster []string{"localhost:9092"}, // name does not matter, table will be empty - "restartable-view-test-table", + topic, // codec doesn't matter, the table will be empty new(codec.String), // start the view autoconnecting diff --git a/examples/7-redis/consumer.go b/examples/7-redis/consumer.go index 28e52a11..d78b1b4c 100644 --- a/examples/7-redis/consumer.go +++ b/examples/7-redis/consumer.go @@ -2,6 +2,7 @@ package main import ( "context" + "log" "github.com/lovoo/goka" storage "github.com/lovoo/goka/storage/redis" @@ -19,6 +20,17 @@ type Publisher interface { func Consume(pub Publisher, brokers []string, group string, stream string, store string, namespace string) error { codec := new(Codec) + tmc := goka.NewTopicManagerConfig() + tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc) + if err != nil { + log.Fatalf("Error creating topic manager: %v", err) + } + defer tm.Close() + err = tm.EnsureStreamExists(stream, 8) + if err != nil { + log.Printf("Error creating kafka topic %s: %v", stream, err) + } + input := goka.Input(goka.Stream(stream), codec, func(ctx goka.Context, msg interface{}) { event, ok := msg.(*Event) if ok { diff --git a/examples/8-monitoring/main.go b/examples/8-monitoring/main.go index 5a2db609..06ca6366 100644 --- a/examples/8-monitoring/main.go +++ b/examples/8-monitoring/main.go @@ -122,7 +122,10 @@ func runStatelessProcessor(ctx context.Context, monitor *monitor.Server) error { return p.Run(ctx) } -func runJoinProcessor(ctx context.Context, monitor *monitor.Server) error { +func runJoinProcessor(ctx context.Context, + monitor *monitor.Server, + joinGroupInitialized chan struct{}) error { + g := goka.DefineGroup(joinGroup, goka.Input(topic, new(codec.String), @@ -145,13 +148,19 @@ func runJoinProcessor(ctx context.Context, monitor *monitor.Server) error { return err } + close(joinGroupInitialized) + // attach the processor to the monitor monitor.AttachProcessor(p) return p.Run(ctx) } -func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Server, actions *actions.Server) error { +func runProcessor(ctx context.Context, + monitor *monitor.Server, + query *query.Server, + actions *actions.Server, + joinGroupInitialized chan struct{}) error { // helper function that waits the configured number of times waitVisitor := func(ctx goka.Context, value interface{}) { @@ -175,6 +184,9 @@ func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Ser goka.Visitor("action2", waitVisitor), goka.Visitor("action3", waitVisitor), ) + + <-joinGroupInitialized + p, err := goka.NewProcessor(brokers, g) if err != nil { return err @@ -285,6 +297,7 @@ func main() { if err != nil { log.Fatalf("error creating topic manager: %v", err) } + defer tmgr.Close() tmgr.EnsureStreamExists(string(topic), 2) tmgr.EnsureTableExists(string(goka.GroupTable(group)), 2) @@ -313,9 +326,13 @@ func main() { defer log.Printf("emitter done") return runEmitter(ctx) }) + + // in runProcessor we use joinGroup, which depends on its initialization in runJoinProcessor + joinGroupInitialized := make(chan struct{}) + errg.Go(func() error { defer log.Printf("processor done") - return runProcessor(ctx, monitorServer, queryServer, actionServer) + return runProcessor(ctx, monitorServer, queryServer, actionServer, joinGroupInitialized) }) errg.Go(func() error { defer log.Printf("stateless processor done") @@ -323,7 +340,7 @@ func main() { }) errg.Go(func() error { defer log.Printf("join procdessor done") - return runJoinProcessor(ctx, monitorServer) + return runJoinProcessor(ctx, monitorServer, joinGroupInitialized) }) if err := runView(errg, ctx, root, monitorServer); err != nil { log.Printf("Error running view, will shutdown: %v", err)