diff --git a/internal/integration/daemon_helpers_test.go b/internal/integration/daemon_helpers_test.go
index 671abe6fb..a51816e22 100644
--- a/internal/integration/daemon_helpers_test.go
+++ b/internal/integration/daemon_helpers_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/leg100/otf/internal/module"
 	"github.com/leg100/otf/internal/notifications"
 	"github.com/leg100/otf/internal/organization"
+	"github.com/leg100/otf/internal/pubsub"
 	"github.com/leg100/otf/internal/releases"
 	"github.com/leg100/otf/internal/resource"
 	"github.com/leg100/otf/internal/run"
@@ -43,6 +44,8 @@ type (
 		*github.TestServer
 		// dowloader allows tests to download terraform
 		downloader
+		// run subscription for tests to check on run events
+		runEvents <-chan pubsub.Event[*run.Run]
 	}
 
 	// downloader downloads terraform versions
@@ -130,6 +133,10 @@ func setup(t *testing.T, cfg *config, gopts ...github.TestServerOption) (*testDa
 	// don't proceed until daemon has started.
 	<-started
 
+	// Subscribe to run events
+	runEvents, unsub := d.Runs.Watch(ctx)
+	t.Cleanup(unsub)
+
 	t.Cleanup(func() {
 		cancel() // terminates daemon
 		<-done   // don't exit test until daemon is fully terminated
@@ -139,6 +146,7 @@ func setup(t *testing.T, cfg *config, gopts ...github.TestServerOption) (*testDa
 		Daemon:     d,
 		TestServer: githubServer,
 		downloader: releases.NewDownloader(cfg.terraformBinDir),
+		runEvents:  runEvents,
 	}
 
 	// create a dedicated user account and context for test to use.
@@ -191,6 +199,29 @@ func (s *testDaemon) getWorkspace(t *testing.T, ctx context.Context, workspaceID
 	return ws
 }
 
+func (s *testDaemon) getRun(t *testing.T, ctx context.Context, runID resource.ID) *run.Run {
+	t.Helper()
+
+	run, err := s.Runs.Get(ctx, runID)
+	require.NoError(t, err)
+	return run
+}
+
+func (s *testDaemon) waitRunStatus(t *testing.T, runID resource.ID, status run.Status) {
+	t.Helper()
+
+	for event := range s.runEvents {
+		if event.Payload.ID == runID {
+			if event.Payload.Status == status {
+				break
+			}
+			if event.Payload.Done() && event.Payload.Status != status {
+				t.Fatalf("expected run status %s but run finished with status %s", status, event.Payload.Status)
+			}
+		}
+	}
+}
+
 func (s *testDaemon) createVCSProvider(t *testing.T, ctx context.Context, org *organization.Organization) *vcsprovider.VCSProvider {
 	t.Helper()
 
diff --git a/internal/integration/run_scheduler_test.go b/internal/integration/run_scheduler_test.go
new file mode 100644
index 000000000..6f6407ac0
--- /dev/null
+++ b/internal/integration/run_scheduler_test.go
@@ -0,0 +1,92 @@
+package integration
+
+import (
+	"testing"
+	"time"
+
+	"github.com/leg100/otf/internal/pubsub"
+	"github.com/leg100/otf/internal/resource"
+	otfrun "github.com/leg100/otf/internal/run"
+	"github.com/leg100/otf/internal/workspace"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestRunScheduler(t *testing.T) {
+	integrationTest(t)
+
+	daemon, _, ctx := setup(t, &config{})
+	user := userFromContext(t, ctx)
+
+	// Watch workspace events
+	workspaceEvents, unsub := daemon.Workspaces.Watch(ctx)
+	defer unsub()
+
+	ws := daemon.createWorkspace(t, ctx, nil)
+	cv := daemon.createAndUploadConfigurationVersion(t, ctx, ws, nil)
+	run1 := daemon.createRun(t, ctx, ws, cv, nil)
+	run2 := daemon.createRun(t, ctx, ws, cv, nil)
+
+	// Wait for Run#1 to lock workspace
+	waitWorkspaceLock(t, workspaceEvents, &run1.ID)
+
+	// Wait for Run#1 to be planned
+	daemon.waitRunStatus(t, run1.ID, otfrun.RunPlanned)
+	// Run#2 should still be pending
+	assert.Equal(t, otfrun.RunPending, daemon.getRun(t, ctx, run2.ID).Status)
+
+	// Apply Run#1
+	err := daemon.Runs.Apply(ctx, run1.ID)
+	require.NoError(t, err)
+
+	// Wait for Run#1 to be applied
+	daemon.waitRunStatus(t, run1.ID, otfrun.RunApplied)
+
+	// Wait for Run#2 to lock workspace
+	waitWorkspaceLock(t, workspaceEvents, &run2.ID)
+
+	// Wait for Run#2 to be planned&finished (because there are no changes)
+	daemon.waitRunStatus(t, run2.ID, otfrun.RunPlannedAndFinished)
+
+	// Wait for workspace to be unlocked
+	waitWorkspaceLock(t, workspaceEvents, nil)
+
+	// User locks workspace
+	_, err = daemon.Workspaces.Lock(ctx, ws.ID, nil)
+	require.NoError(t, err)
+
+	// Create another run; it should remain in pending status.
+	run3 := daemon.createRun(t, ctx, ws, cv, nil)
+
+	// Workspace should still be locked by user
+	waitWorkspaceLock(t, workspaceEvents, &user.ID)
+
+	// User unlocks workspace
+	_, err = daemon.Workspaces.Unlock(ctx, ws.ID, nil, false)
+	require.NoError(t, err)
+
+	// Run#3 should now proceed to planned&finished
+	daemon.waitRunStatus(t, run3.ID, otfrun.RunPlannedAndFinished)
+}
+
+func waitWorkspaceLock(t *testing.T, events <-chan pubsub.Event[*workspace.Workspace], lock *resource.ID) {
+	t.Helper()
+
+	timeout := time.After(5 * time.Second)
+	for {
+		select {
+		case event := <-events:
+			if lock != nil {
+				if event.Payload.Lock != nil && *lock == *event.Payload.Lock {
+					return
+				}
+			} else {
+				if event.Payload.Lock == nil {
+					return
+				}
+			}
+		case <-timeout:
+			t.Fatalf("timed out waiting for workspace lock condition")
+		}
+	}
+}
diff --git a/internal/integration/run_test.go b/internal/integration/run_service_test.go
similarity index 99%
rename from internal/integration/run_test.go
rename to internal/integration/run_service_test.go
index e5d561009..a04ec95be 100644
--- a/internal/integration/run_test.go
+++ b/internal/integration/run_service_test.go
@@ -17,7 +17,7 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestRun(t *testing.T) {
+func TestRunService(t *testing.T) {
 	integrationTest(t)
 
 	t.Run("create", func(t *testing.T) {
diff --git a/internal/run/scheduler.go b/internal/run/scheduler.go
index 5206127b7..656e4a721 100644
--- a/internal/run/scheduler.go
+++ b/internal/run/scheduler.go
@@ -145,9 +145,13 @@ func (s *scheduler) schedule(ctx context.Context, workspaceID resource.ID, run *
 		if err != nil {
 			if errors.Is(err, workspace.ErrWorkspaceAlreadyLocked) {
 				s.V(0).Info("workspace locked by user; cannot schedule run", "run", *q.current)
-				return nil
+				// Place the current run back onto the front of the backlog
+				// and wait until the user unlocks the workspace
+				q.backlog = append([]resource.ID{*q.current}, q.backlog...)
+				q.current = nil
+			} else {
+				return err
 			}
-			return err
 		}
 	}
 	if unlock {
diff --git a/internal/run/scheduler_test.go b/internal/run/scheduler_test.go
index 56bee24ba..fb242f696 100644
--- a/internal/run/scheduler_test.go
+++ b/internal/run/scheduler_test.go
@@ -65,8 +65,10 @@ func TestScheduler_schedule(t *testing.T) {
 		// Should not propagate error
 		err := s.schedule(ctx, wsID, run1)
 		assert.NoError(t, err)
-		// Should not schedule
-		assert.Equal(t, 0, len(s.queues))
+		// Should not be made the current run but instead placed on the backlog
+		assert.Equal(t, 1, len(s.queues))
+		assert.Nil(t, s.queues[wsID].current)
+		assert.Equal(t, []resource.ID{run1.ID}, s.queues[wsID].backlog)
 	})
 
 	t.Run("remove finished current run and unlock queue", func(t *testing.T) {
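For illustration, the scheduler change in internal/run/scheduler.go can be read in isolation: when locking the workspace fails because a user holds the lock, the queue no longer drops the run but keeps it at the front of its backlog until the workspace is unlocked. Below is a minimal, self-contained sketch of that requeue step; the types (ID, queue) are simplified hypothetical stand-ins, not the real otf scheduler internals.

package main

import "fmt"

// ID stands in for resource.ID; queue mirrors the per-workspace queue kept by
// the scheduler. Both are simplified stand-ins for illustration only.
type ID string

type queue struct {
	current *ID
	backlog []ID
}

// requeueOnUserLock models the new behaviour: rather than discarding the
// current run when the workspace is locked by a user, it is pushed back onto
// the front of the backlog so it is rescheduled once the user unlocks.
func (q *queue) requeueOnUserLock() {
	if q.current == nil {
		return
	}
	q.backlog = append([]ID{*q.current}, q.backlog...)
	q.current = nil
}

func main() {
	runID := ID("run-1")
	q := &queue{current: &runID, backlog: []ID{"run-2"}}
	q.requeueOnUserLock()
	fmt.Println(q.current, q.backlog) // prints: <nil> [run-1 run-2]
}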