Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
leg100 committed Jan 18, 2025
1 parent a074f91 commit 49cb7e7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
6 changes: 3 additions & 3 deletions internal/run/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ func (s *scheduler) schedule(ctx context.Context, workspaceID resource.ID, run *
q := s.queues[workspaceID]
q, enqueue, unlock := q.process(run)
if enqueue {
run, err := s.runs.EnqueuePlan(ctx, *q.current)
_, err := s.runs.EnqueuePlan(ctx, *q.current)
if err != nil {
if errors.Is(err, workspace.ErrWorkspaceAlreadyLocked) {
s.V(0).Info("workspace locked by user; cannot schedule run", "run", run.ID)
return err
s.V(0).Info("workspace locked by user; cannot schedule run", "run", *q.current)
return nil
}
return err
}
Expand Down
30 changes: 27 additions & 3 deletions internal/run/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,29 @@ func TestScheduler_schedule(t *testing.T) {
assert.Equal(t, []resource.ID{run2.ID, run3.ID}, s.queues[wsID].backlog)
})

t.Run("remove finished current run", func(t *testing.T) {
t.Run("attempt to schedule run on user-locked workspace", func(t *testing.T) {
s := &scheduler{
runs: &fakeSchedulerRunClient{
enqueuePlanError: workspace.ErrWorkspaceAlreadyLocked,
},
workspaces: &fakeSchedulerWorkspaceClient{},
queues: make(map[resource.ID]queue),
}
run1 := &Run{Status: RunPending, ID: resource.NewID(resource.RunKind)}

// Should not propagate error
err := s.schedule(ctx, wsID, run1)
assert.NoError(t, err)
// Should not schedule
assert.Equal(t, 0, len(s.queues))
})

t.Run("remove finished current run and unlock queue", func(t *testing.T) {
runID := resource.NewID(resource.RunKind)
workspaces := &fakeSchedulerWorkspaceClient{}
s := &scheduler{
runs: &fakeSchedulerRunClient{},
workspaces: &fakeSchedulerWorkspaceClient{},
workspaces: workspaces,
queues: map[resource.ID]queue{
wsID: {current: &runID},
},
Expand All @@ -66,6 +84,7 @@ func TestScheduler_schedule(t *testing.T) {
require.NoError(t, err)

assert.Nil(t, s.queues[wsID].current)
assert.True(t, workspaces.unlocked)
})
}

Expand Down Expand Up @@ -147,16 +166,21 @@ func TestScheduler_process(t *testing.T) {

type fakeSchedulerRunClient struct {
schedulerRunClient

enqueuePlanError error
}

func (f *fakeSchedulerRunClient) EnqueuePlan(ctx context.Context, runID resource.ID) (*Run, error) {
return nil, nil
return nil, f.enqueuePlanError
}

type fakeSchedulerWorkspaceClient struct {
schedulerWorkspaceClient

unlocked bool
}

func (f *fakeSchedulerWorkspaceClient) Unlock(ctx context.Context, workspaceID resource.ID, runID *resource.ID, force bool) (*workspace.Workspace, error) {
f.unlocked = true
return nil, nil
}
3 changes: 3 additions & 0 deletions internal/run/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ func (s *Service) EnqueuePlan(ctx context.Context, runID resource.ID) (run *Run,
run, err = s.db.UpdateStatus(ctx, runID, func(ctx context.Context, run *Run) error {
return run.EnqueuePlan()
})
if err != nil {
return err
}
if !run.PlanOnly {
_, err := s.workspaces.Lock(ctx, run.WorkspaceID, &run.ID)
if err != nil {
Expand Down

0 comments on commit 49cb7e7

Please sign in to comment.