Skip to content

Commit

Permalink
Add context support to executor to allow jobs being interrupted
Browse files Browse the repository at this point in the history
  • Loading branch information
adamwasila committed Jan 29, 2020
1 parent 7feda0b commit a55f6f1
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 10 deletions.
30 changes: 28 additions & 2 deletions executor.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package slowjoe

import (
"context"
"sync"
)

type executor struct {
closeHandler func()
panicHandler func(interface{})
jobs []func()
intJobs []func(context.Context)
}

// Runner is the interface that wraps basic, argumentless Run method
type Runner interface {
Run()
Run(ctx context.Context)
}

// Executor returns new instance of concurrent jobs executor
Expand All @@ -32,6 +34,12 @@ func Execute(jobs ...func()) func(*executor) {
}
}

func ExecuteWithContext(jobs ...func(context.Context)) func(*executor) {
return func(e *executor) {
e.intJobs = append(e.intJobs, jobs...)
}
}

// WhenAllFinished adds handler that is called when all jobs finishes. It will
// be called only once and only when all jobs quit no matter of the result
// or panic they raise.
Expand All @@ -50,12 +58,17 @@ func WhenPanic(panicHandler func(interface{})) func(*executor) {
}

// Run executes all jobs concurrently, call handlers - if needed and provided, then returns
func (e *executor) Run() {
func (e *executor) Run(ctx context.Context) {
var wg sync.WaitGroup
for _, job := range e.jobs {
wg.Add(1)
go wrappedJob(&wg, e.panicHandler, job)
}

for _, job := range e.intJobs {
wg.Add(1)
go wrappedInterruptableJob(ctx, &wg, e.panicHandler, job)
}
wg.Wait()
if e.closeHandler != nil {
e.closeHandler()
Expand All @@ -74,3 +87,16 @@ func wrappedJob(wg *sync.WaitGroup, onPanic func(interface{}), f func()) {
}
f()
}

func wrappedInterruptableJob(ctx context.Context, wg *sync.WaitGroup, onPanic func(interface{}), f func(context.Context)) {
defer wg.Done()
if onPanic != nil {
defer func() {
p := recover()
if p != nil {
onPanic(p)
}
}()
}
f(ctx)
}
119 changes: 112 additions & 7 deletions executor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package slowjoe

import (
"context"
"strings"
"testing"
"time"
Expand All @@ -12,7 +13,9 @@ func TestEmptyExecutor(t *testing.T) {
Convey("Given empty executor", t, func() {
e := Executor()
Convey("When empty executor runs it should not panic", func() {
So(e.Run, ShouldNotPanic)
So(func() {
e.Run(context.Background())
}, ShouldNotPanic)
})
})
}
Expand Down Expand Up @@ -44,7 +47,7 @@ func TestSimpleConcurrentExecution(t *testing.T) {
}),
)
Convey("When executor runs", func() {
e.Run()
e.Run(context.Background())
result := readChannel(response)

Convey("Then all operations should be executed concurrently in correct order determined by internal delays", func() {
Expand All @@ -66,7 +69,7 @@ func TestExecutionFinalizer(t *testing.T) {
}),
)
Convey("When executor runs", func() {
e.Run()
e.Run(context.Background())
result := readChannel(response)

Convey("Then operation should run then finalizer", func() {
Expand Down Expand Up @@ -95,7 +98,7 @@ func TestExecutionWithTwoFinalizers(t *testing.T) {
}),
)
Convey("When executor runs", func() {
e.Run()
e.Run(context.Background())
result := readChannel(response)

Convey("Then operations should run and only last defined finalizer", func() {
Expand All @@ -117,7 +120,7 @@ func TestExecutionFinalizerFirst(t *testing.T) {
}),
)
Convey("When executor runs", func() {
e.Run()
e.Run(context.Background())
result := readChannel(response)

Convey("Then all operation should still be executed before finalizer", func() {
Expand All @@ -136,7 +139,7 @@ func TestExecutionOnlyFinalizer(t *testing.T) {
}),
)
Convey("When executor runs", func() {
e.Run()
e.Run(context.Background())
result := readChannel(response)

Convey("Then all operations should be executed concurrently in correct order determined by internal delays", func() {
Expand All @@ -160,7 +163,7 @@ func TestExecutionCatchingPanic(t *testing.T) {
}),
)
Convey("When executor runs it should not panic", func() {
e.Run()
e.Run(context.Background())
result := readChannel(response)

Convey("Then panic is succesfuly recovered", func() {
Expand All @@ -170,6 +173,108 @@ func TestExecutionCatchingPanic(t *testing.T) {
})
}

func TestNilContextJobExecution(t *testing.T) {
Convey("Given executor that has single job", t, func() {
var receivedCtx context.Context = context.Background()
e := Executor(
ExecuteWithContext(func(ctx context.Context) {
receivedCtx = ctx
}),
)
Convey("When executor runs with nil context", func() {
e.Run(nil)

Convey("Then run should execute with no panic and context received in job should be nil as well", func() {
So(receivedCtx, ShouldBeNil)
})
})
})
}

func TestInterruptedJobExecution(t *testing.T) {
Convey("Given executor that executes single job and checks for interrupt inbetween", t, func() {
var response chan rune = make(chan rune, 6)
e := Executor(
ExecuteWithContext(func(ctx context.Context) {
response <- 'A'

time.Sleep(100 * time.Millisecond)

if ctx.Err() != nil {
response <- 'Q'
}
response <- 'B'

time.Sleep(200 * time.Millisecond)

if ctx.Err() != nil {
response <- 'Q'
}

time.Sleep(100 * time.Millisecond)

response <- 'C'

if ctx.Err() != nil {
response <- 'Q'
return
}
}),
)
Convey("When executor runs but is interrupted in the middle", func() {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
e.Run(ctx)
result := readChannel(response)

Convey("Then interrupt check should return true only after interrupt was called", func() {
So(result, ShouldEqual, "ABQCQ")
})
})
})
}

func TestInterruptedJobUsingChannelExecution(t *testing.T) {
Convey("Given executor that executes single job and checks for interrupt inbetween using channel", t, func() {
var response chan rune = make(chan rune, 6)
e := Executor(
ExecuteWithContext(func(ctx context.Context) {
response <- 'A'

select {
case _, ok := <-ctx.Done():
response <- 'Q'
if ok {
response <- 'O'
}
case <-time.After(100 * time.Millisecond):
response <- 'T'
}

response <- 'B'

select {
case <-ctx.Done():
response <- 'Q'
case <-time.After(500 * time.Millisecond):
response <- 'T'
}
}),
)
Convey("When executor runs but is interrupted in the middle", func() {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
e.Run(ctx)
time.Sleep(1 * time.Second)
result := readChannel(response)

Convey("Then channel should be closed only after executor job is interrupted", func() {
So(result, ShouldEqual, "ATBQ")
})
})
})
}

func readChannel(ch chan rune) string {
close(ch)
sb := strings.Builder{}
Expand Down
3 changes: 2 additions & 1 deletion proxy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package slowjoe

import (
"context"
"errors"
"io"
"math/rand"
Expand Down Expand Up @@ -254,7 +255,7 @@ func (p *Proxy) ListenAndLoop() error {
logrus.WithField("panic", p).Fatalf("Unexpected panic")
},
),
).Run()
).Run(context.TODO())
}()
}
}
Expand Down

0 comments on commit a55f6f1

Please sign in to comment.