Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/run context #127

Merged
merged 3 commits into from
May 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ To locally start a dockerized Zookeeper and Kafka instances, execute `make start
package main

import (
"context"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -124,16 +125,20 @@ func runProcessor() {
if err != nil {
log.Fatalf("error creating processor: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan bool)
go func() {
if err = p.Start(); err != nil {
defer close(done)
if err = p.Run(ctx); err != nil {
log.Fatalf("error running processor: %v", err)
}
}()

wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
<-wait // wait for SIGINT/SIGTERM
p.Stop() // gracefully stop processor
cancel() // gracefully stop processor
<-done
}

func main() {
Expand Down
17 changes: 4 additions & 13 deletions examples/1-simplest/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
Expand Down Expand Up @@ -60,16 +58,9 @@ func runProcessor() {
if err != nil {
log.Fatalf("error creating processor: %v", err)
}
go func() {
if err = p.Start(); err != nil {
log.Fatalf("error running processor: %v", err)
}
}()

wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
<-wait // wait for SIGINT/SIGTERM
p.Stop() // gracefully stop processor
if err = p.Run(context.Background()); err != nil {
log.Fatalf("error running processor: %v", err)
}
}

func main() {
Expand Down
5 changes: 2 additions & 3 deletions examples/2-clicks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ update it in our group table (10) by calling `ctx.SetValue(u)`.
We conclude process() with a print statement showing message’s content, the
current count of the user, and the user ID fetched with ctx.Key().

The context interface never returns errors to the callbacks. Instead, if an error is encountered while executing the context functions, the processor instance is stopped and its Start() method returns an error.
The context interface never returns errors to the callbacks. Instead, if an error is encountered while executing the context functions, the processor instance is stopped and its Run() method returns an error.


We configure the processor using `goka.DefineGroup`, which we later
Expand Down Expand Up @@ -128,8 +128,7 @@ func runView() {
if err != nil {
panic(err)
}
go view.Start()
defer view.Stop()
go view.Run(context.Background())

root := mux.NewRouter()
root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) {
Expand Down
14 changes: 5 additions & 9 deletions examples/2-clicks/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -91,12 +92,7 @@ func runProcessor() {
panic(err)
}

err = p.Start()
if err != nil {
panic(err)
} else {
fmt.Println("Processor stopped without errors")
}
p.Run(context.Background())
}

func runView() {
Expand All @@ -107,8 +103,6 @@ func runView() {
if err != nil {
panic(err)
}
go view.Start()
defer view.Stop()

root := mux.NewRouter()
root.HandleFunc("/{key}", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -117,7 +111,9 @@ func runView() {
w.Write(data)
})
fmt.Println("View opened at http://localhost:9095/")
http.ListenAndServe(":9095", root)
go http.ListenAndServe(":9095", root)

view.Run(context.Background())
}

func main() {
Expand Down
6 changes: 0 additions & 6 deletions examples/3-messaging/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ g := goka.DefineGroup(goka.Group("collector"),
goka.Input(messaging.ReceivedStream, new(messaging.MessageCodec), collect),
)
p, _ := goka.NewProcessor(brokers, g)
go p.Start()

```

### Feed endpoint
Expand Down Expand Up @@ -165,7 +163,6 @@ view, _ := goka.NewView(
collector.Table,
new(collector.MessageListCodec),
)
go view.Start()
router.HandleFunc("/{user}/feed", feed(view)).Methods("GET")
```

Expand Down Expand Up @@ -289,8 +286,6 @@ g := goka.DefineGroup(goka.Group("filter"),
)

p, _ := goka.NewProcessor(brokers, g)
_ = p.Start()

```

Nothing has to be changed in the collector processor or in the feed endpoint.
Expand Down Expand Up @@ -493,7 +488,6 @@ g := goka.DefineGroup(goka.Group("detector"),
)

p, _ := goka.NewProcessor(brokers, g)
_ = p.Start()
```

### Running the example
Expand Down
21 changes: 12 additions & 9 deletions examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blocker

import (
"context"
"encoding/json"

"github.com/lovoo/goka"
Expand Down Expand Up @@ -57,14 +58,16 @@ func block(ctx goka.Context, msg interface{}) {
ctx.SetValue(s)
}

func Run(brokers []string) {
g := goka.DefineGroup(group,
goka.Input(Stream, new(BlockEventCodec), block),
goka.Persist(new(BlockValueCodec)),
)
if p, err := goka.NewProcessor(brokers, g); err != nil {
panic(err)
} else if err = p.Start(); err != nil {
panic(err)
func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
goka.Input(Stream, new(BlockEventCodec), block),
goka.Persist(new(BlockValueCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
return err
}
return p.Run(ctx)
}
}
25 changes: 17 additions & 8 deletions examples/3-messaging/cmd/processor/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"log"
"os"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/lovoo/goka/examples/3-messaging/detector"
"github.com/lovoo/goka/examples/3-messaging/filter"
"github.com/lovoo/goka/examples/3-messaging/translator"
"golang.org/x/sync/errgroup"
)

var (
Expand All @@ -26,33 +28,40 @@ var (

func main() {
flag.Parse()
ctx, cancel := context.WithCancel(context.Background())
grp, ctx := errgroup.WithContext(ctx)

if *runCollector {
log.Println("starting collector")
go collector.Run(brokers)
grp.Go(collector.Run(ctx, brokers))
}
if *runFilter {
log.Println("starting filter")
go filter.Run(brokers)
grp.Go(filter.Run(ctx, brokers))
}
if *runBlocker {
log.Println("starting blocker")
go blocker.Run(brokers)
grp.Go(blocker.Run(ctx, brokers))
}
if *runDetector {
log.Println("starting detector")
go detector.Run(brokers)
grp.Go(detector.Run(ctx, brokers))
}
if *runTranslator {
log.Println("starting translator")
go translator.Run(brokers)
grp.Go(translator.Run(ctx, brokers))
}

// Wait for SIGINT/SIGTERM
waiter := make(chan os.Signal, 1)
signal.Notify(waiter, syscall.SIGINT, syscall.SIGTERM)

select {
case signal := <-waiter:
log.Printf("Got interrupted by %v", signal)
case <-waiter:
case <-ctx.Done():
}
cancel()
if err := grp.Wait(); err != nil {
log.Println(err)
}
log.Println("done")
}
21 changes: 12 additions & 9 deletions examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"context"
"encoding/json"

"github.com/lovoo/goka"
Expand Down Expand Up @@ -41,14 +42,16 @@ func collect(ctx goka.Context, msg interface{}) {
ctx.SetValue(ml)
}

func Run(brokers []string) {
g := goka.DefineGroup(group,
goka.Input(messaging.ReceivedStream, new(messaging.MessageCodec), collect),
goka.Persist(new(MessageListCodec)),
)
if p, err := goka.NewProcessor(brokers, g); err != nil {
panic(err)
} else if err = p.Start(); err != nil {
panic(err)
func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
goka.Input(messaging.ReceivedStream, new(messaging.MessageCodec), collect),
goka.Persist(new(MessageListCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
return err
}
return p.Run(ctx)
}
}
64 changes: 34 additions & 30 deletions examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package detector

import (
"context"
"encoding/json"

"github.com/lovoo/goka"
Expand Down Expand Up @@ -48,38 +49,41 @@ func detectSpammer(ctx goka.Context, c *Counters) bool {
return total >= minMessages && rate >= maxRate
}

func Run(brokers []string) {
g := goka.DefineGroup(group,
goka.Input(messaging.SentStream, new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) {
c := getValue(ctx)
c.Sent++
ctx.SetValue(c)
func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
goka.Input(messaging.SentStream, new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) {
c := getValue(ctx)
c.Sent++
ctx.SetValue(c)

// check if sender is a spammer
if detectSpammer(ctx, c) {
ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent))
}
// check if sender is a spammer
if detectSpammer(ctx, c) {
ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent))
}

// Loop to receiver
m := msg.(*messaging.Message)
ctx.Loopback(m.To, m)
}),
goka.Loop(new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) {
c := getValue(ctx)
c.Received++
ctx.SetValue(c)
// Loop to receiver
m := msg.(*messaging.Message)
ctx.Loopback(m.To, m)
}),
goka.Loop(new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) {
c := getValue(ctx)
c.Received++
ctx.SetValue(c)

// check if receiver is a spammer
if detectSpammer(ctx, c) {
ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent))
}
}),
goka.Output(blocker.Stream, new(blocker.BlockEventCodec)),
goka.Persist(new(CountersCodec)),
)
if p, err := goka.NewProcessor(brokers, g); err != nil {
panic(err)
} else if err = p.Start(); err != nil {
panic(err)
// check if receiver is a spammer
if detectSpammer(ctx, c) {
ctx.Emit(blocker.Stream, ctx.Key(), new(blocker.BlockEvent))
}
}),
goka.Output(blocker.Stream, new(blocker.BlockEventCodec)),
goka.Persist(new(CountersCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
return err
}

return p.Run(ctx)
}
}
Loading