Skip to content

Commit

Permalink
fix(runner): canceling non-running jobs causes deadlock
Browse files Browse the repository at this point in the history
leg100 committed Jan 23, 2025
1 parent 0e2f958 commit bad9fce
Showing 12 changed files with 140 additions and 80 deletions.
1 change: 1 addition & 0 deletions internal/daemon/config.go
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ type Config struct {
EnableRequestLogging bool
DevMode bool
DisableScheduler bool
DisableRunner bool
RestrictOrganizationCreation bool
SiteAdmins []string
SkipTLSVerification bool
18 changes: 11 additions & 7 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
@@ -543,12 +543,14 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
LockID: internal.Int64(sql.RunnerManagerLockID),
System: d.Runners.NewManager(),
},
{
}
if !d.DisableRunner {
subsystems = append(subsystems, &Subsystem{
Name: "runner-daemon",
Logger: d.Logger,
DB: d.DB,
System: d.runner,
},
})
}
if !d.DisableScheduler {
subsystems = append(subsystems, &Subsystem{
@@ -577,11 +579,13 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
return fmt.Errorf("timed out waiting for database events listener to start")
case <-d.listener.Started():
}
// Wait for agent to register; otherwise some tests may fail
select {
case <-ctx.Done():
return ctx.Err()
case <-d.runner.Registered():
// Wait for runner to register; otherwise some tests may fail
if !d.DisableRunner {
select {
case <-ctx.Done():
return ctx.Err()
case <-d.runner.Registered():
}
}

// Run HTTP/JSON-API server and web app
4 changes: 2 additions & 2 deletions internal/integration/agent_pools_ui_test.go
Original file line number Diff line number Diff line change
@@ -175,7 +175,7 @@ func TestAgentPoolsUI(t *testing.T) {

// shut agent down and wait for it to exit
shutdownAgent()
Wait(t, runnersSub, func(event pubsub.Event[*runner.RunnerMeta]) bool {
wait(t, runnersSub, func(event pubsub.Event[*runner.RunnerMeta]) bool {
return event.Payload.Status == runner.RunnerExited
})

@@ -227,7 +227,7 @@ func TestAgentPoolsUI(t *testing.T) {
require.NoError(t, err)

// confirm pool was deleted
Wait(t, poolsSub, func(event pubsub.Event[*runner.Pool]) bool {
wait(t, poolsSub, func(event pubsub.Event[*runner.Pool]) bool {
return event.Type == pubsub.DeletedEvent
})
})
4 changes: 2 additions & 2 deletions internal/integration/agent_test.go
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ func TestIntegration_Agents(t *testing.T) {
_ = daemon.createRun(t, ctx, ws1, nil, nil)

// wait for job to be allocated to agent1
Wait(t, jobsSub, func(event pubsub.Event[*runner.Job]) bool {
wait(t, jobsSub, func(event pubsub.Event[*runner.Job]) bool {
return event.Payload.Status == runner.JobAllocated &&
*event.Payload.RunnerID == agent1.ID
})
@@ -69,7 +69,7 @@ func TestIntegration_Agents(t *testing.T) {
_ = daemon.createRun(t, ctx, ws2, nil, nil)

// wait for job to be allocated to agent2
Wait(t, jobsSub, func(event pubsub.Event[*runner.Job]) bool {
wait(t, jobsSub, func(event pubsub.Event[*runner.Job]) bool {
return event.Payload.Status == runner.JobAllocated &&
*event.Payload.RunnerID == agent2.ID
})
8 changes: 5 additions & 3 deletions internal/integration/helpers_test.go
Original file line number Diff line number Diff line change
@@ -82,9 +82,11 @@ func userFromContext(t *testing.T, ctx context.Context) *user.User {
return user
}

// Wait for an event to arrive satisfying the condition within a 10 second timeout.
func Wait[T any](t *testing.T, c <-chan pubsub.Event[T], cond func(pubsub.Event[T]) bool) {
timeout := time.After(10 * time.Second)
// wait for an event to arrive satisfying the condition within a timeout.
func wait[T any](t *testing.T, c <-chan pubsub.Event[T], cond func(pubsub.Event[T]) bool) {
t.Helper()

timeout := time.After(5 * time.Second)
for {
select {
case <-timeout:
Original file line number Diff line number Diff line change
@@ -9,14 +9,16 @@ import (

"github.com/leg100/otf/internal"
"github.com/leg100/otf/internal/releases"
"github.com/leg100/otf/internal/run"
"github.com/leg100/otf/internal/runner"
"github.com/leg100/otf/internal/variable"
"github.com/leg100/otf/internal/workspace"
"github.com/stretchr/testify/require"
)

// TestIntegration_RunCancel demonstrates a run being canceled mid-flow.
func TestIntegration_RunCancel(t *testing.T) {
// TestIntegration_RunCancelInterrupt tests cancelling a run via an interrupt
// signal, which occurs when a run is in the planning or applying state.
func TestIntegration_RunCancelInterrupt(t *testing.T) {
integrationTest(t)

// stage a fake terraform bin that sleeps until it receives an interrupt
@@ -81,4 +83,8 @@ func TestIntegration_RunCancel(t *testing.T) {

// fake bin has received interrupt
require.Equal(t, "canceled", <-got)

// canceling the job should result in the run then entering the canceled
// state.
daemon.waitRunStatus(t, r.ID, run.RunCanceled)
}
46 changes: 46 additions & 0 deletions internal/integration/run_job_cancel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package integration

import (
"testing"

"github.com/leg100/otf/internal/daemon"
"github.com/leg100/otf/internal/pubsub"
"github.com/leg100/otf/internal/run"
"github.com/leg100/otf/internal/runner"
"github.com/stretchr/testify/require"
)

// TestIntegration_RunJobCancel tests the cancelation of a run and one of its
// jobs, when the job is not yet running.
func TestIntegration_RunJobCancel(t *testing.T) {
integrationTest(t)

// Disable runner to prevent the run's plan job from running.
daemon, _, ctx := setup(t, &config{
Config: daemon.Config{
DisableRunner: true,
},
})
// Watch job events
jobs, unsub := daemon.Runners.WatchJobs(ctx)
defer unsub()

// Create run, and wait til it reaches plan queued state
r := daemon.createRun(t, ctx, nil, nil, nil)
daemon.waitRunStatus(t, r.ID, run.RunPlanQueued)
// Job should be automatically created
wait(t, jobs, func(event pubsub.Event[*runner.Job]) bool {
return event.Payload.RunID == r.ID
})

// Cancel run
err := daemon.Runs.Cancel(ctx, r.ID)
require.NoError(t, err)

// Run and job should now enter canceled state.
daemon.waitRunStatus(t, r.ID, run.RunCanceled)
wait(t, jobs, func(event pubsub.Event[*runner.Job]) bool {
return event.Payload.Status == runner.JobCanceled &&
event.Payload.RunID == r.ID
})
}
9 changes: 6 additions & 3 deletions internal/runner/allocator.go
Original file line number Diff line number Diff line change
@@ -147,10 +147,13 @@ func (a *allocator) allocate(ctx context.Context, job *Job) error {
reallocate = true
}
case JobFinished, JobCanceled, JobErrored:
// job has completed: remove and adjust number of current jobs
// runner has
// job has completed
delete(a.jobs, job.ID)
a.decrementCurrentJobs(*job.RunnerID)
// adjust current jobs of job's runner if allocated (an unallocated job
// could have been canceled).
if job.RunnerID != nil {
a.decrementCurrentJobs(*job.RunnerID)
}
return nil
case JobRunning:
return nil
7 changes: 2 additions & 5 deletions internal/runner/db.go
Original file line number Diff line number Diff line change
@@ -260,15 +260,12 @@ func (db *db) updateJob(ctx context.Context, jobID resource.ID, fn func(context.
)
}

func (db *db) updateJobByRunPhase(ctx context.Context, runID resource.ID, runPhase internal.PhaseType, fn func(context.Context, *Job) error) (*Job, error) {
func (db *db) updateUnfinishedJobByRunID(ctx context.Context, runID resource.ID, fn func(context.Context, *Job) error) (*Job, error) {
return sql.Updater(
ctx,
db.DB,
func(ctx context.Context, q *sqlc.Queries) (*Job, error) {
result, err := q.FindJobForUpdateByRunPhase(ctx, sqlc.FindJobForUpdateByRunPhaseParams{
RunID: runID,
Phase: sql.String(string(runPhase)),
})
result, err := q.FindUnfinishedJobForUpdateByRunID(ctx, runID)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion internal/runner/service.go
Original file line number Diff line number Diff line change
@@ -317,7 +317,7 @@ func (s *Service) createJob(ctx context.Context, run *otfrun.Run) error {
// job should be canceled.
func (s *Service) cancelJob(ctx context.Context, run *otfrun.Run) error {
var signal *bool
job, err := s.db.updateJobByRunPhase(ctx, run.ID, run.Phase(), func(ctx context.Context, job *Job) (err error) {
job, err := s.db.updateUnfinishedJobByRunID(ctx, run.ID, func(ctx context.Context, job *Job) (err error) {
signal, err = job.cancel(run)
return err
})
8 changes: 6 additions & 2 deletions internal/sql/queries/job.sql
Original file line number Diff line number Diff line change
@@ -62,7 +62,11 @@ WHERE j.job_id = sqlc.arg('job_id')
FOR UPDATE OF j
;

-- name: FindJobForUpdateByRunPhase :one

-- FindUnfinishedJobForUpdateByRunID finds an unfinished job belonging to a run.
-- (There should only be one such job for a run).
--
-- name: FindUnfinishedJobForUpdateByRunID :one
SELECT
j.job_id,
j.run_id,
@@ -77,7 +81,7 @@ FROM jobs j
JOIN runs r USING (run_id)
JOIN workspaces w USING (workspace_id)
WHERE j.run_id = sqlc.arg('run_id')
AND j.phase = sqlc.arg('phase')
AND j.status IN ('unallocated', 'allocated', 'running')
FOR UPDATE OF j
;

103 changes: 50 additions & 53 deletions internal/sql/sqlc/job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit bad9fce

Please sign in to comment.