Skip to content

Commit

Permalink
fix: ensure the task hasn't finished before interrupt it
Browse files Browse the repository at this point in the history
  • Loading branch information
jvillafanez committed Apr 10, 2024
1 parent 6a64918 commit 208c53a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
21 changes: 15 additions & 6 deletions ocis-pkg/runner/grouprunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,15 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result {
// interrupt the rest of the runners
for _, runner := range gr.runners {
if _, ok := results[runner.ID]; !ok {
// there might still be race conditions because the result might not have
// been made available even though the runner has finished. We assume
// that calling the `Interrupt` method multiple times and / or calling
// the `Interrupt` method when the task has finished is safe
runner.Interrupt()
select {
case <-runner.Finished():
// No data should be sent through the channel, so we'd be
// here only if the channel is closed. This means the task
// has finished and we don't need to interrupt. We do
// nothing in this case
default:
runner.Interrupt()
}
}
}

Expand Down Expand Up @@ -122,8 +126,13 @@ func (gr *GroupRunner) RunAsync(ch chan<- *Result) {
//
// As said, this will affect ALL the tasks in the group. It isn't possible to
// try to stop just one task.
// If a task has finished, the corresponding stopper won't be called
func (gr *GroupRunner) Interrupt() {
for _, runner := range gr.runners {
runner.Interrupt()
select {
case <-runner.Finished():
default:
runner.Interrupt()
}
}
}
13 changes: 13 additions & 0 deletions ocis-pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Runner struct {
ID string
fn Runable
interrupt Stopper
finished chan struct{}
}

// New will create a new runner.
Expand All @@ -28,6 +29,7 @@ func New(id string, fn Runable, interrupt Stopper) *Runner {
ID: id,
fn: fn,
interrupt: interrupt,
finished: make(chan struct{}),
}
}

Expand Down Expand Up @@ -85,11 +87,22 @@ func (r *Runner) Interrupt() {
r.interrupt()
}

// Finished will return a receive-only channel that can be used to know when
// the task has finished but the result hasn't been made available yet. The
// channel will be closed (without sending any message) when the task has finished.
// This can be used specially with the `RunAsync` method when multiple runners
// use the same channel: results could be waiting on your side of the channel
func (r *Runner) Finished() <-chan struct{} {
return r.finished
}

// doTask will perform this runner's task and write the result in the provided
// channel. The channel will be closed if requested.
func (r *Runner) doTask(ch chan<- *Result, closeChan bool) {
err := r.fn()

close(r.finished)

result := &Result{
RunnerID: r.ID,
RunnerError: err,
Expand Down

0 comments on commit 208c53a

Please sign in to comment.