diff --git a/ocis-pkg/runner/grouprunner_test.go b/ocis-pkg/runner/grouprunner_test.go index 110960a5bfc..3d20ad78506 100644 --- a/ocis-pkg/runner/grouprunner_test.go +++ b/ocis-pkg/runner/grouprunner_test.go @@ -19,14 +19,14 @@ var _ = Describe("GroupRunner", func() { task1Ch := make(chan error) task1 := TimedTask(task1Ch, 30*time.Second) - gr.Add(runner.New("task1", task1, func() { + gr.Add(runner.New("task1", 30*time.Second, task1, func() { task1Ch <- nil close(task1Ch) })) task2Ch := make(chan error) task2 := TimedTask(task2Ch, 20*time.Second) - gr.Add(runner.New("task2", task2, func() { + gr.Add(runner.New("task2", 30*time.Second, task2, func() { task2Ch <- nil close(task2Ch) })) @@ -35,7 +35,7 @@ var _ = Describe("GroupRunner", func() { Describe("Add", func() { It("Duplicated runner id panics", func() { Expect(func() { - gr.Add(runner.New("task1", func() error { + gr.Add(runner.New("task1", 30*time.Second, func() error { time.Sleep(6 * time.Second) return nil }, func() { @@ -64,7 +64,7 @@ var _ = Describe("GroupRunner", func() { task3Ch := make(chan error) task3 := TimedTask(task3Ch, 15*time.Second) Expect(func() { - gr.Add(runner.New("task3", task3, func() { + gr.Add(runner.New("task3", 30*time.Second, task3, func() { task3Ch <- nil close(task3Ch) })) @@ -78,7 +78,7 @@ var _ = Describe("GroupRunner", func() { Expect(func() { task3Ch := make(chan error) task3 := TimedTask(task3Ch, 15*time.Second) - gr.Add(runner.New("task3", task3, func() { + gr.Add(runner.New("task3", 30*time.Second, task3, func() { task3Ch <- nil close(task3Ch) })) @@ -90,7 +90,7 @@ var _ = Describe("GroupRunner", func() { It("Context is done", func(ctx SpecContext) { task3Ch := make(chan error) task3 := TimedTask(task3Ch, 15*time.Second) - gr.Add(runner.New("task3", task3, func() { + gr.Add(runner.New("task3", 30*time.Second, task3, func() { task3Ch <- nil close(task3Ch) })) @@ -117,7 +117,7 @@ var _ = Describe("GroupRunner", func() { It("One task finishes early", func(ctx SpecContext) { task3Ch := make(chan error) task3 := TimedTask(task3Ch, 1*time.Second) - gr.Add(runner.New("task3", task3, func() { + gr.Add(runner.New("task3", 30*time.Second, task3, func() { task3Ch <- nil close(task3Ch) })) @@ -157,7 +157,7 @@ var _ = Describe("GroupRunner", func() { It("Wait in channel", func(ctx SpecContext) { task3Ch := make(chan error) task3 := TimedTask(task3Ch, 1*time.Second) - gr.Add(runner.New("task3", task3, func() { + gr.Add(runner.New("task3", 30*time.Second, task3, func() { task3Ch <- nil close(task3Ch) })) @@ -182,7 +182,7 @@ var _ = Describe("GroupRunner", func() { It("Interrupt async", func(ctx SpecContext) { task3Ch := make(chan error) task3 := TimedTask(task3Ch, 15*time.Second) - gr.Add(runner.New("task3", task3, func() { + gr.Add(runner.New("task3", 30*time.Second, task3, func() { task3Ch <- nil close(task3Ch) })) diff --git a/ocis-pkg/runner/helper.go b/ocis-pkg/runner/helper.go deleted file mode 100644 index fca604977d7..00000000000 --- a/ocis-pkg/runner/helper.go +++ /dev/null @@ -1,55 +0,0 @@ -package runner - -import ( - "fmt" - "time" -) - -// InterruptedTimeoutRunner will create a new runner (R2) based an original -// runner (R1). -// The new runner (R2) will monitor the original (R1). Once the `Interrupt` -// method is called in the new (R2), the interruption will be delivered to -// the original (R1), but a timeout will start. If we reach the timeout -// before the original runner (R1) is finished, the new runner (R2) will -// return an error. -// -// Any valid duration can be provided for the timeout, but you should give -// enough time for the task to finish in order to get the error from the -// original task (R1) and not the timeout one from the new (R2). -// Depending on the task, 5s, 10s or 30s might be reasonable timeout values. -// -// The timeout will start once the new (R2) runner is interrupted, either -// manually or via context -// -// Note that R2 can't stop R1 in any way. Even if R2 returns a "timeout" error, -// R1 might still be running and consuming resources. -// This method is intended to provide a way to ensure that the main thread -// won't be blocked forever. -func InterruptedTimeoutRunner(r *Runner, d time.Duration) *Runner { - timeoutCh := make(chan time.Time) - return New(r.ID, func() error { - ch := make(chan *Result) - r.RunAsync(ch) - - select { - case result := <-ch: - return result.RunnerError // forward the runner error - case t := <-timeoutCh: - // timeout reached. We can't stop the task, but we'll return - // an error instead to prevent blocking the thread. - return fmt.Errorf("Timeout reached at %s after waiting for %s after being interrupted", t.String(), d.String()) - } - }, func() { - go func() { - select { - case <-r.Finished(): - // Task finished -> runner should be delivering the result - case t := <-time.After(d): - // timeout reached -> send it through the channel so our runner - // can abort - timeoutCh <- t - } - }() - r.Interrupt() - }) -} diff --git a/ocis-pkg/runner/helper_test.go b/ocis-pkg/runner/helper_test.go deleted file mode 100644 index a64ea9c25bc..00000000000 --- a/ocis-pkg/runner/helper_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package runner_test - -import ( - "context" - "time" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/owncloud/ocis/v2/ocis-pkg/runner" -) - -var _ = Describe("Helper", func() { - Describe("InterruptedTimeoutRunner", func() { - It("Context done, no timeout", func(ctx SpecContext) { - r1 := runner.New("task", func() error { - time.Sleep(10 * time.Millisecond) - return nil - }, func() { - }) - - r2 := runner.InterruptedTimeoutRunner(r1, 2*time.Second) - - // context will be done in 1 second (task will finishes before) - myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - - // spawn a new goroutine and return the result in the channel - ch2 := make(chan *runner.Result) - go func(ch2 chan *runner.Result) { - ch2 <- r2.Run(myCtx) - close(ch2) - }(ch2) - - expectedResult := &runner.Result{ - RunnerID: "task", - RunnerError: nil, - } - - Eventually(ctx, ch2).Should(Receive(Equal(expectedResult))) - }, SpecTimeout(5*time.Second)) - - It("Context done, timeout reached", func(ctx SpecContext) { - r1 := runner.New("task", func() error { - time.Sleep(10 * time.Second) - return nil - }, func() { - }) - - r2 := runner.InterruptedTimeoutRunner(r1, 2*time.Second) - - // context will be done in 1 second (task will finishes before) - myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - - // spawn a new goroutine and return the result in the channel - ch2 := make(chan *runner.Result) - go func(ch2 chan *runner.Result) { - ch2 <- r2.Run(myCtx) - close(ch2) - }(ch2) - - var expectedResult *runner.Result - Eventually(ctx, ch2).Should(Receive(&expectedResult)) - Expect(expectedResult.RunnerID).To(Equal("task")) - Expect(expectedResult.RunnerError.Error()).To(ContainSubstring("Timeout reached")) - }, SpecTimeout(5*time.Second)) - - It("Interrupted, timeout reached", func(ctx SpecContext) { - r1 := runner.New("task", func() error { - time.Sleep(10 * time.Second) - return nil - }, func() { - }) - - r2 := runner.InterruptedTimeoutRunner(r1, 2*time.Second) - - ch2 := make(chan *runner.Result) - r2.RunAsync(ch2) - r2.Interrupt() - - var expectedResult *runner.Result - Eventually(ctx, ch2).Should(Receive(&expectedResult)) - Expect(expectedResult.RunnerID).To(Equal("task")) - Expect(expectedResult.RunnerError.Error()).To(ContainSubstring("Timeout reached")) - }, SpecTimeout(5*time.Second)) - }) -}) diff --git a/ocis-pkg/runner/runner.go b/ocis-pkg/runner/runner.go index e80ad194db9..d720a45b8e0 100644 --- a/ocis-pkg/runner/runner.go +++ b/ocis-pkg/runner/runner.go @@ -2,7 +2,9 @@ package runner import ( "context" + "fmt" "sync/atomic" + "time" ) // Runner represents the one executing a long running task, such as a server @@ -10,13 +12,18 @@ import ( // The ID of the runner is public to make identification easier, and the // Result that it will generated will contain the same ID, so we can // know which runner provided which result. +// +// Runners are intended to be used only once. Reusing them isn't possible. +// You'd need to create a new runner if you want to rerun the same task. type Runner struct { - ID string - fn Runable - interrupt Stopper - running atomic.Bool - interrupted atomic.Bool - finished chan struct{} + ID string + interruptDur time.Duration + fn Runable + interrupt Stopper + running atomic.Bool + interrupted atomic.Bool + interruptedCh chan time.Duration + finished chan struct{} } // New will create a new runner. @@ -24,15 +31,24 @@ type Runner struct { // otherwise undefined behavior might occur), and will run the provided // runable task, using the "interrupt" function to stop that task if needed. // +// The interrupt duration will be used to ensure the runner doesn't block +// forever. The interrupt duration will be used to start a timeout when the +// runner gets interrupted (either the context of the `Run` method is done +// or this runner's `Interrupt` method is called). If the timeout is reached, +// a timeout result will be returned instead of whatever result the task should +// be returning. +// // Note that it's your responsibility to provide a proper stopper for the task. // The runner will just call that method assuming it will be enough to // eventually stop the task at some point. -func New(id string, fn Runable, interrupt Stopper) *Runner { +func New(id string, interruptDur time.Duration, fn Runable, interrupt Stopper) *Runner { return &Runner{ - ID: id, - fn: fn, - interrupt: interrupt, - finished: make(chan struct{}), + ID: id, + interruptDur: interruptDur, + fn: fn, + interrupt: interrupt, + interruptedCh: make(chan time.Duration), + finished: make(chan struct{}), } } @@ -48,6 +64,11 @@ func New(id string, fn Runable, interrupt Stopper) *Runner { // make the task to eventually complete. // // Once the task finishes, the result will be returned. +// When the context is done, or if the runner is interrupted, a timeout will +// start using the provided "interrupt duration". If this timeout is reached, +// a timeout result will be returned instead of the one from the task. This is +// intended to prevent blocking the main thread indefinitely. A suitable +// duration should be used depending on the task, usually 5, 10 or 30 secs // // Some nice things you can do: // - Use signal.NotifyContext(...) to call the stopper and provide a clean @@ -97,9 +118,24 @@ func (r *Runner) RunAsync(ch chan<- *Result) { // in order for it to finish. // The stopper will be called immediately, although it's expected the // consequences to take a while (task might need a while to stop) +// A timeout will start using the provided "interrupt duration". Once that +// timeout is reached, the task must provide a result with a timeout error. +// Note that, even after returning the timeout result, the task could still +// be being executed and consuming resource. // This method will be called only once. Further calls won't do anything func (r *Runner) Interrupt() { if r.interrupted.CompareAndSwap(false, true) { + go func() { + select { + case <-r.Finished(): + // Task finished -> runner should be delivering the result + case <-time.After(r.interruptDur): + // timeout reached -> send it through the channel so our runner + // can abort + r.interruptedCh <- r.interruptDur + close(r.interruptedCh) + } + }() r.interrupt() } } @@ -115,17 +151,41 @@ func (r *Runner) Finished() <-chan struct{} { // doTask will perform this runner's task and write the result in the provided // channel. The channel will be closed if requested. +// A result will be provided when either the task finishes naturally or we +// reach the timeout after being interrupted func (r *Runner) doTask(ch chan<- *Result, closeChan bool) { - err := r.fn() + tmpCh := make(chan *Result) + + // spawn the task and return the result in a temporary channel + go func(tmpCh chan *Result) { + err := r.fn() + + close(r.finished) - close(r.finished) + result := &Result{ + RunnerID: r.ID, + RunnerError: err, + } + tmpCh <- result - result := &Result{ - RunnerID: r.ID, - RunnerError: err, + close(tmpCh) + }(tmpCh) + + // wait for the result in the temporary channel or until we get the + // interrupted signal + var result *Result + select { + case d := <-r.interruptedCh: + result = &Result{ + RunnerID: r.ID, + RunnerError: fmt.Errorf("runner %s timed out after waiting for %s", r.ID, d.String()), + } + case result = <-tmpCh: + // Just assign the received value, nothing else to do } - ch <- result + // send the result + ch <- result if closeChan { close(ch) } diff --git a/ocis-pkg/runner/runner_suite_test.go b/ocis-pkg/runner/runner_suite_test.go index 0d45944ed7a..800c47fc80d 100644 --- a/ocis-pkg/runner/runner_suite_test.go +++ b/ocis-pkg/runner/runner_suite_test.go @@ -7,7 +7,7 @@ import ( . "github.com/onsi/gomega" ) -func TestGraph(t *testing.T) { +func TestRunner(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Runner Suite") } diff --git a/ocis-pkg/runner/runner_test.go b/ocis-pkg/runner/runner_test.go index bf44fdf798b..8ee2a4f899c 100644 --- a/ocis-pkg/runner/runner_test.go +++ b/ocis-pkg/runner/runner_test.go @@ -44,7 +44,7 @@ var _ = Describe("Runner", func() { // channel, so the task can finish // Worst case, the task will finish after 15 secs ch := make(chan error) - r := runner.New("run001", TimedTask(ch, 15*time.Second), func() { + r := runner.New("run001", 30*time.Second, TimedTask(ch, 15*time.Second), func() { ch <- nil close(ch) }) @@ -76,7 +76,7 @@ var _ = Describe("Runner", func() { // channel, so the task can finish // Worst case, the task will finish after 15 secs ch := make(chan error) - r := runner.New("run001", TimedTask(ch, 15*time.Second), func() { + r := runner.New("run001", 30*time.Second, TimedTask(ch, 15*time.Second), func() { ch <- nil close(ch) }) @@ -106,7 +106,7 @@ var _ = Describe("Runner", func() { It("Task finishes naturally", func(ctx SpecContext) { e := errors.New("overslept!") - r := runner.New("run002", func() error { + r := runner.New("run002", 30*time.Second, func() error { time.Sleep(50 * time.Millisecond) return e }, func() { @@ -134,7 +134,7 @@ var _ = Describe("Runner", func() { }, SpecTimeout(5*time.Second)) It("Task doesn't finish", func(ctx SpecContext) { - r := runner.New("run003", func() error { + r := runner.New("run003", 30*time.Second, func() error { time.Sleep(20 * time.Second) return nil }, func() { @@ -156,9 +156,37 @@ var _ = Describe("Runner", func() { Consistently(ctx, ch2).WithTimeout(4500 * time.Millisecond).ShouldNot(Receive()) }, SpecTimeout(5*time.Second)) + It("Task doesn't finish and times out", func(ctx SpecContext) { + r := runner.New("run003", 3*time.Second, func() error { + time.Sleep(20 * time.Second) + return nil + }, func() { + }) + + // context will be done in 1 second + myCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + ch2 := make(chan *runner.Result) + go func(ch2 chan *runner.Result) { + ch2 <- r.Run(myCtx) + close(ch2) + }(ch2) + + var expectedResult *runner.Result + // Task will finish naturally in 60 secs + // Task's context will finish in 1 sec, but task won't receive + // the notification and it will keep going + // Task will time out in 3 seconds after being interrupted (when + // context is done), so test should finish in 4 seconds + Eventually(ctx, ch2).Should(Receive(&expectedResult)) + Expect(expectedResult.RunnerID).To(Equal("run003")) + Expect(expectedResult.RunnerError.Error()).To(ContainSubstring("timed out")) + }, SpecTimeout(5*time.Second)) + It("Run mutiple times panics", func(ctx SpecContext) { e := errors.New("overslept!") - r := runner.New("run002", func() error { + r := runner.New("run002", 30*time.Second, func() error { time.Sleep(50 * time.Millisecond) return e }, func() { @@ -180,7 +208,7 @@ var _ = Describe("Runner", func() { ch := make(chan *runner.Result) e := errors.New("Task has finished") - r := runner.New("run004", func() error { + r := runner.New("run004", 30*time.Second, func() error { time.Sleep(50 * time.Millisecond) return e }, func() { @@ -199,7 +227,7 @@ var _ = Describe("Runner", func() { ch := make(chan *runner.Result) e := errors.New("Task has finished") - r := runner.New("run004", func() error { + r := runner.New("run004", 30*time.Second, func() error { time.Sleep(50 * time.Millisecond) return e }, func() { @@ -217,7 +245,7 @@ var _ = Describe("Runner", func() { e := errors.New("Task interrupted") taskCh := make(chan error) - r := runner.New("run005", TimedTask(taskCh, 20*time.Second), func() { + r := runner.New("run005", 30*time.Second, TimedTask(taskCh, 20*time.Second), func() { taskCh <- e close(taskCh) }) @@ -233,12 +261,33 @@ var _ = Describe("Runner", func() { Eventually(ctx, ch).Should(Receive(Equal(expectedResult))) }, SpecTimeout(5*time.Second)) + It("Interrupt async times out", func(ctx SpecContext) { + ch := make(chan *runner.Result) + e := errors.New("Task interrupted") + + r := runner.New("run005", 3*time.Second, func() error { + time.Sleep(30 * time.Second) + return e + }, func() { + }) + + r.RunAsync(ch) + r.Interrupt() + + var expectedResult *runner.Result + + // Task will timeout after 3 second of receiving the interruption + Eventually(ctx, ch).Should(Receive(&expectedResult)) + Expect(expectedResult.RunnerID).To(Equal("run005")) + Expect(expectedResult.RunnerError.Error()).To(ContainSubstring("timed out")) + }, SpecTimeout(5*time.Second)) + It("Interrupt async multiple times", func(ctx SpecContext) { ch := make(chan *runner.Result) e := errors.New("Task interrupted") taskCh := make(chan error) - r := runner.New("run005", TimedTask(taskCh, 20*time.Second), func() { + r := runner.New("run005", 30*time.Second, TimedTask(taskCh, 20*time.Second), func() { taskCh <- e close(taskCh) }) @@ -260,7 +309,7 @@ var _ = Describe("Runner", func() { Describe("Finished", func() { It("Finish channel closes", func(ctx SpecContext) { - r := runner.New("run006", func() error { + r := runner.New("run006", 30*time.Second, func() error { time.Sleep(50 * time.Millisecond) return nil }, func() {