Skip to content

Commit

Permalink
feat: add runners to startup the ocis' services
Browse files Browse the repository at this point in the history
  • Loading branch information
jvillafanez committed Apr 9, 2024
1 parent 80ef8ca commit c7119a3
Show file tree
Hide file tree
Showing 3 changed files with 269 additions and 0 deletions.
130 changes: 130 additions & 0 deletions ocis-pkg/runner/grouprunner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package runner

import (
"context"
)

// GroupRunner represent a group of tasks that need to run together.
// The expectation is that all the tasks will run at the same time, and when
// one of them stops, the rest will also stop.
//
// The GroupRunner is intended to be used to run multiple services, which are
// more or less independent from eachother, but at the same time it doesn't
// make sense to have any of them stopped while the rest are running.
// Basically, either all of them run, or none of them.
// For example, you can have a GRPC and HTTP servers running, each of them
// providing a piece of functionality, however, if any of them fails, the
// feature provided by them would be incomplete or broken.
//
// Note that, as services, the task aren't expected to stop by default.
// This means that, if a task finishes naturally, the rest of the task will
// asked to stop as well.
type GroupRunner struct {
runners []*Runner
}

// NewGroupRunner will create a GroupRunner
func NewGroupRunner() *GroupRunner {
return &GroupRunner{
runners: []*Runner{},
}
}

// Add will add a runner to the group.
//
// It's mandatory that each runner in the group has an unique id, otherwise
// there will be issues
func (gr *GroupRunner) Add(r *Runner) {
gr.runners = append(gr.runners, r)
}

// Run will execute all the tasks in the group at the same time.
//
// Similarly to the "regular" runner's `Run` method, the execution thread
// will be blocked here until all tasks are completed, and their results
// will be available (each result will have the runner's id so it's easy to
// find which one failed). Note that there is no guarantee about the result's
// order, so the first result in the slice might or might not be the first
// result to be obtained.
//
// When the context is marked as done, the groupRunner will call all the
// stoppers for each runner to notify each task to stop. Note that the tasks
// might still take a while to complete.
//
// If a task finishes naturally (with the context still "alive"), it will also
// cause the groupRunner to call the stoppers of the rest of the tasks. So if
// a task finishes, the rest will also finish.
// Note that it is NOT expected for the finished task's stopper to be called
// in this case.
func (gr *GroupRunner) Run(ctx context.Context) []*Result {
results := make(map[string]*Result)

ch := make(chan *Result, len(gr.runners)) // no need to block writing results
for _, runner := range gr.runners {
runner.RunNoContext(ch)
}

// wait for a result or for the context to be done
select {
case result := <-ch:
results[result.RunnerID] = result
case <-ctx.Done():
// Do nothing
}

// interrupt the rest of the runners
for _, runner := range gr.runners {
if _, ok := results[runner.ID]; !ok {
// there might still be race conditions because the result might not have
// been made available even though the runner has finished
runner.Interrupt()
}
}

// Having notified that the context has been finished, we still need to
// wait for the rest of the results
for i := len(results); i < len(gr.runners); i++ {
result := <-ch
results[result.RunnerID] = result
}

close(ch)

values := make([]*Result, 0, len(gr.runners))
for _, val := range results {
values = append(values, val)
}
return values
}

// RunNoContext will execute the tasks in the group asynchronously.
// Each tasks will run a separated goroutine (one goroutine per task), and then
// this method will finish.
// The tasks' result will be available in the provided channel when it's
// available, so you can wait for it if needed. It's up to you to decide
// to use a blocking or non-blocking channel, but the task will always finish
// before writing in the channel.
//
// This method guarantees that there will be the same number of results as the
// number of provided tasks (one result per task), but it won't guarantee
// any order
func (gr *GroupRunner) RunNoContext(ch chan<- *Result) {
for _, runner := range gr.runners {
runner.RunNoContext(ch)
}
}

// Interrupt will execute the stopper function of ALL the tasks, which should
// notify the tasks in order for them to finish.
// The stoppers will be called immediately but sequentially. This means that
// the second stopper won't be called until the first one has returned. This
// usually isn't a problem because the service `Stop`'s methods either don't
// take a long time to return, or they run asynchronously in another goroutine.
//
// As said, this will affect ALL the tasks in the group. It isn't possible to
// try to stop just one task.
func (gr *GroupRunner) Interrupt() {
for _, runner := range gr.runners {
runner.Interrupt()
}
}
104 changes: 104 additions & 0 deletions ocis-pkg/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package runner

import (
"context"
)

// Runner represents the one executing a long running task, such as a server
// or a service.
// The ID of the runner is public to make identification easier, and the
// Result that it will generated will contain the same ID, so we can
// know which runner provided which result.
type Runner struct {
ID string
fn Runable
interrupt Stopper
}

// NewRunner will create a new runner.
// The runner will be created with the provided id (the id must be unique,
// otherwise undefined behavior might occur), and will run the provided
// runable task, using the "interrupt" function to stop that task if needed.
//
// Note that it's your responsibility to provide a proper stopper for the task.
// The runner will just call that method assuming it will be enough to
// eventually stop the task at some point.
func NewRunner(id string, fn Runable, interrupt Stopper) *Runner {
return &Runner{
ID: id,
fn: fn,
interrupt: interrupt,
}
}

// Run will execute the task associated to this runner in a synchronous way.
// The task will be spawned in a new goroutine, and the current thread will
// wait until the task finishes.
//
// The task will finish "naturally". The stopper will be called in the
// following ways:
// - Manually calling this runner's `Interrupt` method
// - When the provided context is done
// As said, it's expected that calling the provided stopper will be enough to
// make the task to eventually complete.
//
// Once the task finishes, the result will be returned.
//
// Some nice things you can do:
// - Use signal.NotifyContext(...) to call the stopper and provide a clean
// shutdown procedure when an OS signal is received
// - Use context.WithDeadline(...) or context.WithTimeout(...) to run the task
// for a limited time
func (r *Runner) Run(ctx context.Context) *Result {
ch := make(chan *Result)

go func(ch chan<- *Result) {
err := r.fn()

result := &Result{
RunnerID: r.ID,
RunnerError: err,
}
ch <- result
close(ch)
}(ch)

select {
case result := <-ch:
return result
case <-ctx.Done():
r.interrupt()
}

return <-ch
}

// RunNoContext will execute the task associated to this runner asynchronously.
// The task will be spawned in a new goroutine and this method will finish.
// The task's result will be written in the provided channel when it's
// available, so you can wait for it if needed. It's up to you to decide
// to use a blocking or non-blocking channel, but the task will always finish
// before writing in the channel.
//
// To interrupt the running task, the only option is to call the `Interrupt`
// method at some point.
func (r *Runner) RunNoContext(ch chan<- *Result) {
go func(ch chan<- *Result) {
err := r.fn()

result := &Result{
RunnerID: r.ID,
RunnerError: err,
}
ch <- result
// Do not close the channel here
}(ch)
}

// Interrupt will execute the stopper function, which should notify the task
// in order for it to finish.
// The stopper will be called immediately, although it's expected the
// consequences to take a while (task might need a while to stop)
func (r *Runner) Interrupt() {
r.interrupt()
}
35 changes: 35 additions & 0 deletions ocis-pkg/runner/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package runner

// Runable represent a task that can be executed by the Runner.
// It expected to be a long running task with an indefinite execution time,
// so it's suitable for servers or services.
// The task can eventually return an error, or nil if the execution finishes
// without errors
type Runable func() error

// Stopper represent a function that will stop the Runable.
// The stopper acts as a notification to the runable to know that the task
// needs to be finished now.
//
// The stopper won't need to crash the runable or force the runable to stop,
// instead, it will let the runable to know it has to stop and let it finish.
// This means that the runable might still run for a while.
//
// It's recommended the stopper to run asynchronously. This means that the
// stopper might need to spawn a goroutine. The intention is avoid blocking
// the running thread.
//
// Usually, the stoppers are the servers's `Shutdown()` or `Close()` methods,
// that will cause the server to start its shutdown procedure. As said, there
// is no need to force the shutdown, so graceful shutdowns are preferred if
// they're available
type Stopper func()

// RunnerResult represents the result of a runner.
// The result contains the provided runner's id (for easier identification
// in case of multiple results) and the runner's error, which is the result
// of the Runable function (might be nil if no error happened)
type Result struct {
RunnerID string
RunnerError error
}

0 comments on commit c7119a3

Please sign in to comment.