Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
leg100 committed Jan 18, 2025
1 parent 49cb7e7 commit 5d7fd63
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 5 deletions.
31 changes: 31 additions & 0 deletions internal/integration/daemon_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/leg100/otf/internal/module"
"github.com/leg100/otf/internal/notifications"
"github.com/leg100/otf/internal/organization"
"github.com/leg100/otf/internal/pubsub"
"github.com/leg100/otf/internal/releases"
"github.com/leg100/otf/internal/resource"
"github.com/leg100/otf/internal/run"
Expand All @@ -43,6 +44,8 @@ type (
*github.TestServer
        // downloader allows tests to download terraform
downloader
// run subscription for tests to check on run events
runEvents <-chan pubsub.Event[*run.Run]
}

// downloader downloads terraform versions
Expand Down Expand Up @@ -130,6 +133,10 @@ func setup(t *testing.T, cfg *config, gopts ...github.TestServerOption) (*testDa
// don't proceed until daemon has started.
<-started

// Subscribe to run events
runEvents, unsub := d.Runs.Watch(ctx)
t.Cleanup(unsub)

t.Cleanup(func() {
cancel() // terminates daemon
<-done // don't exit test until daemon is fully terminated
Expand All @@ -139,6 +146,7 @@ func setup(t *testing.T, cfg *config, gopts ...github.TestServerOption) (*testDa
Daemon: d,
TestServer: githubServer,
downloader: releases.NewDownloader(cfg.terraformBinDir),
runEvents: runEvents,
}

// create a dedicated user account and context for test to use.
Expand Down Expand Up @@ -191,6 +199,29 @@ func (s *testDaemon) getWorkspace(t *testing.T, ctx context.Context, workspaceID
return ws
}

// getRun retrieves the run with the given ID via the daemon's run service,
// failing the test immediately on error.
func (s *testDaemon) getRun(t *testing.T, ctx context.Context, runID resource.ID) *run.Run {
	t.Helper()

	// Named r rather than run so the local does not shadow the imported
	// run package.
	r, err := s.Runs.Get(ctx, runID)
	require.NoError(t, err)
	return r
}

// waitRunStatus blocks until the run with the given ID is observed with the
// wanted status on the daemon's run event subscription. It fails the test if
// the run reaches a terminal status other than the wanted one, or if the
// event channel is closed before the wanted status is seen.
//
// NOTE(review): unlike waitWorkspaceLock there is no explicit timeout here;
// a run that stalls in a non-terminal status blocks until the overall test
// timeout — consider adding a time.After guard if that proves flaky.
func (s *testDaemon) waitRunStatus(t *testing.T, runID resource.ID, status run.Status) {
	t.Helper()

	for event := range s.runEvents {
		if event.Payload.ID != runID {
			continue
		}
		if event.Payload.Status == status {
			return
		}
		// A finished run can never transition again, so fail fast rather
		// than blocking until the test times out.
		if event.Payload.Done() {
			t.Fatalf("expected run status %s but run finished with status %s", status, event.Payload.Status)
		}
	}
	// Only reached if the subscription channel was closed, e.g. because the
	// daemon shut down before the run reached the wanted status.
	t.Fatalf("run events channel closed before run %s reached status %s", runID, status)
}

func (s *testDaemon) createVCSProvider(t *testing.T, ctx context.Context, org *organization.Organization) *vcsprovider.VCSProvider {
t.Helper()

Expand Down
92 changes: 92 additions & 0 deletions internal/integration/run_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package integration

import (
"testing"
"time"

"github.com/leg100/otf/internal/pubsub"
"github.com/leg100/otf/internal/resource"
otfrun "github.com/leg100/otf/internal/run"
"github.com/leg100/otf/internal/workspace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestRunScheduler exercises the run scheduler end-to-end against a live
// daemon: runs on the same workspace must be scheduled serially in creation
// order, queued runs stay pending while an earlier run holds the workspace
// lock, and a user-held lock blocks scheduling until the user unlocks.
//
// The steps below are strictly ordered; each wait consumes events from the
// shared subscriptions, so do not reorder or parallelize them.
func TestRunScheduler(t *testing.T) {
	integrationTest(t)

	daemon, _, ctx := setup(t, &config{})
	user := userFromContext(t, ctx)

	// watch workspace events to observe lock/unlock transitions
	workspaceEvents, unsub := daemon.Workspaces.Watch(ctx)
	defer unsub()

	ws := daemon.createWorkspace(t, ctx, nil)
	cv := daemon.createAndUploadConfigurationVersion(t, ctx, ws, nil)
	// Two runs on the same workspace: the scheduler should grant the lock to
	// Run#1 and hold Run#2 back until Run#1 completes.
	run1 := daemon.createRun(t, ctx, ws, cv, nil)
	run2 := daemon.createRun(t, ctx, ws, cv, nil)

	// Wait for Run#1 to lock workspace
	waitWorkspaceLock(t, workspaceEvents, &run1.ID)

	// Wait for Run#1 to be planned
	daemon.waitRunStatus(t, run1.ID, otfrun.RunPlanned)
	// Run#2 should still be pending
	assert.Equal(t, otfrun.RunPending, daemon.getRun(t, ctx, run2.ID).Status)

	// Apply Run#1
	err := daemon.Runs.Apply(ctx, run1.ID)
	require.NoError(t, err)

	// Wait for Run#1 to be applied
	daemon.waitRunStatus(t, run1.ID, otfrun.RunApplied)

	// Wait for Run#2 to lock workspace
	waitWorkspaceLock(t, workspaceEvents, &run2.ID)

	// Wait for Run#2 to be planned&finished (because there are no changes)
	daemon.waitRunStatus(t, run2.ID, otfrun.RunPlannedAndFinished)

	// Wait for workspace to be unlocked
	waitWorkspaceLock(t, workspaceEvents, nil)

	// User locks workspace
	_, err = daemon.Workspaces.Lock(ctx, ws.ID, nil)
	require.NoError(t, err)

	// Create another run, it should remain in pending status.
	run3 := daemon.createRun(t, ctx, ws, cv, nil)

	// Workspace should still be locked by user
	waitWorkspaceLock(t, workspaceEvents, &user.ID)

	// User unlocks workspace
	_, err = daemon.Workspaces.Unlock(ctx, ws.ID, nil, false)
	require.NoError(t, err)

	// Run #3 should now proceed to planned&finished
	daemon.waitRunStatus(t, run3.ID, otfrun.RunPlannedAndFinished)
}

// waitWorkspaceLock consumes workspace events until the workspace's lock
// matches the wanted state: held by the given ID when lock is non-nil, or
// unlocked when lock is nil. Fails the test after five seconds.
func waitWorkspaceLock(t *testing.T, events <-chan pubsub.Event[*workspace.Workspace], lock *resource.ID) {
	t.Helper()

	deadline := time.After(5 * time.Second)
	for {
		select {
		case event := <-events:
			holder := event.Payload.Lock
			switch {
			case lock == nil && holder == nil:
				// wanted unlocked and workspace is unlocked
				return
			case lock != nil && holder != nil && *lock == *holder:
				// wanted holder matches actual holder
				return
			}
		case <-deadline:
			t.Fatalf("timed out waiting for workspace lock condition")
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestRun(t *testing.T) {
func TestRunService(t *testing.T) {
integrationTest(t)

t.Run("create", func(t *testing.T) {
Expand Down
8 changes: 6 additions & 2 deletions internal/run/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,13 @@ func (s *scheduler) schedule(ctx context.Context, workspaceID resource.ID, run *
if err != nil {
if errors.Is(err, workspace.ErrWorkspaceAlreadyLocked) {
s.V(0).Info("workspace locked by user; cannot schedule run", "run", *q.current)
return nil
            // Place current run back onto front of backlog and wait until
            // user unlocks workspace
q.backlog = append([]resource.ID{*q.current}, q.backlog...)
q.current = nil
} else {
return err
}
return err
}
}
if unlock {
Expand Down
6 changes: 4 additions & 2 deletions internal/run/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ func TestScheduler_schedule(t *testing.T) {
// Should not propagate error
err := s.schedule(ctx, wsID, run1)
assert.NoError(t, err)
// Should not schedule
assert.Equal(t, 0, len(s.queues))
// Should not be made current run but instead placed on backlog
assert.Equal(t, 1, len(s.queues))
assert.Nil(t, s.queues[wsID].current)
assert.Equal(t, []resource.ID{run1.ID}, s.queues[wsID].backlog)
})

t.Run("remove finished current run and unlock queue", func(t *testing.T) {
Expand Down

0 comments on commit 5d7fd63

Please sign in to comment.