Proposal: replace Start() and Stop() with Run(context.Context) #120
Comments
Hey guys, what do you think of this change in the interface? Would it make the library easier to use? @SamiHiltunen @j0hnsmith @frairon @burdiyan @andrewmunro I'd like to either create a PR for this or close the ticket. (cc: @edganiukov we are now using |
I support this. It would simplify the lifecycle management. |
I like the idea of using a single context to stop multiple processors. I actually have some time this week, I'll take a look maybe tomorrow if there's a general 👍 |
I like it. Looks a lot cleaner than I wonder if you could even abstract this further for beginners using the framework, maybe that patterns module you were talking about elsewhere... 🤔 |
I actually used to wrap Start and Stop methods exactly like this using context.Context. But I found out that when you want to control the order in which multiple processors should stop, you'll have a problem. So if you have processors A and B, and you need to stop B before A, using a single context doesn't give you that control. I guess that's why the standard library has the io.Closer interface that many things like http.Server implement providing It's also handy that So, I personally would not implement Run with context, unless what I described above doesn't make sense for anybody :) |
@burdiyan if I am not mistaken, you can have such logic with context as well - using either different contexts for each processor or child-parent context pattern (child context will be canceled after parent). |
@burdiyan that is an interesting point! AFAIK, we don't have that issue, but I can imagine having it at some point. So either we require the user to apply child contexts or we provide a Thanks for the quick feedback from everybody! That's awesome. |
PR #127 replaces My feeling is that for simple examples, things get more complicated because one has to create a context and a cancel function. But I like the result in Next step to close this issue would be to come up with some simple wrapper to still provide the |
PR #128 introduces Processors and views are "runnables" because they implement
Here is a complete example of how to use runsets (other variants are possible):
// create processors and views (they are runnables because they have a Run(ctx) method)
p, _ := goka.NewProcessor(brokers, goka.DefineGroup(group, goka.Input(topic, codec, cb)))
v, _ := goka.NewView(brokers, topic2, codec)
// start the runnables, creating a runset
rs := goka.Start(p, v)
// wait for bad things to happen
wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
select {
case <-rs.Done(): // one of the runnables returned
case <-wait: // a signal (SIGHUP/SIGINT/SIGTERM) arrived
	rs.Stop() // gracefully stop runnables
}
// wait for all runnables to return and collect error messages
if err := rs.Wait(); err != nil {
	log.Fatalln(err)
}
If the order of stopping the runnables is relevant, one can create multiple runsets (e.g., one for each processor/view) and stop them accordingly. @burdiyan do you think that this would be helpful and sufficient for your use cases? Do you have suggestions on how to improve it? |
I normally do signal trapping as part of a top-level errgroup, like this:
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
	done := make(chan os.Signal, 1)
	signal.Notify(done, syscall.SIGTERM, syscall.SIGINT)
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-done:
		signal.Stop(done)
		return errors.New("signal received")
	}
})
g.Go(func() error {
	// Start monitoring HTTP server.
	return nil // placeholder: return the server's error here
})
g.Go(func() error {
	// Start goka processor.
	return nil // placeholder: return the processor's error here
})
g.Go(func() error {
	// Start another goka processor.
	return nil // placeholder: return the processor's error here
})
// Separate goroutine for shutdown logic.
g.Go(func() error {
	<-ctx.Done()
	// Stop processor 1.
	// Stop processor 2.
	// Shutdown the HTTP server.
	// Do other cleanups.
	return nil
})
g.Wait() // plus error handling for errgroup.
This way the program ends cleanly if one of the "actors" in the errgroup returns an error or a signal is trapped. I'm concerned about hiding context in I'd be fine with just having The ordering problem for shutdown could be handled with derived contexts as discussed elsewhere. |
The change has been implemented and merged. Thanks for the feedback from everybody. |
Currently the Start() method of processors and views uses errgroup to create goroutines for each partition and passes a context.Context to them. The Stop() method simply cancels the context to stop all goroutines.
I think it would be nice to use the same mechanism to control multiple processors and views running in the same program. For that I'd propose to replace the Start() and Stop() methods with a Run(context.Context) method. The usage would be something like this:
Perhaps we could still support Start() and Stop() via some wrapper... something like this:
Any opinions?
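For illustration, here is one way a Start()/Stop() wrapper around a Run(context.Context) method might look. This is only a sketch under assumptions: the `Runner` interface, `fakeProcessor`, `StartStopper`, and `Wrap` are hypothetical names invented for this example, not goka's API, and the wrapper is not safe for concurrent use.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Runner is a hypothetical interface for anything with the proposed Run method.
type Runner interface {
	Run(ctx context.Context) error
}

// fakeProcessor stands in for a processor or view; it is not goka code.
type fakeProcessor struct{}

// Run blocks until ctx is canceled and treats cancellation as a clean shutdown.
func (fakeProcessor) Run(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// StartStopper is a hypothetical wrapper offering Start/Stop on top of Run.
type StartStopper struct {
	r      Runner
	cancel context.CancelFunc
	done   chan error
}

// Wrap returns a StartStopper for the given Runner.
func Wrap(r Runner) *StartStopper { return &StartStopper{r: r} }

// Start launches Run in a goroutine with a fresh cancelable context.
func (s *StartStopper) Start() error {
	if s.cancel != nil {
		return errors.New("already started")
	}
	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	s.done = make(chan error, 1)
	go func() { s.done <- s.r.Run(ctx) }()
	return nil
}

// Stop cancels the context, waits for Run to return, and reports its error.
func (s *StartStopper) Stop() error {
	if s.cancel == nil {
		return errors.New("not started")
	}
	s.cancel()
	err := <-s.done
	s.cancel = nil
	return err
}

func main() {
	s := Wrap(fakeProcessor{})
	if err := s.Start(); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	fmt.Println("stop error:", s.Stop())
}
```

Callers who want to control several runners at once could skip the wrapper and pass one shared context to each Run call directly.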