diff --git a/internal/job/job.go b/internal/job/job.go index ea17fcc3..487ae267 100644 --- a/internal/job/job.go +++ b/internal/job/job.go @@ -19,6 +19,10 @@ type Job struct { // which is used for deduplication of queued jobs along with Dir. Type string + // Priority represents priority with which the job should be scheduled. + // This overrides the priority implied from whether the dir is open. + Priority JobPriority + // Defer is a function to execute after Func is executed // and before the job is marked as done (StateDone). // This can be used to schedule jobs dependent on the main job. @@ -32,9 +36,17 @@ type DeferFunc func(ctx context.Context, jobErr error) IDs func (job Job) Copy() Job { return Job{ - Func: job.Func, - Dir: job.Dir, - Type: job.Type, - Defer: job.Defer, + Func: job.Func, + Dir: job.Dir, + Type: job.Type, + Priority: job.Priority, + Defer: job.Defer, } } + +type JobPriority int + +const ( + LowPriority JobPriority = -1 + HighPriority JobPriority = 1 +) diff --git a/internal/langserver/handlers/did_open.go b/internal/langserver/handlers/did_open.go index b4800959..0647be8b 100644 --- a/internal/langserver/handlers/did_open.go +++ b/internal/langserver/handlers/did_open.go @@ -175,18 +175,18 @@ func (svc *service) decodeModule(ctx context.Context, modHandle document.DirHand } ids = append(ids, id) - id, err = svc.stateStore.JobStore.EnqueueJob(job.Job{ + _, err = svc.stateStore.JobStore.EnqueueJob(job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.GetModuleDataFromRegistry(svc.srvCtx, svc.registryClient, svc.modStore, svc.stateStore.RegistryModules, modHandle.Path()) }, - Type: op.OpTypeGetModuleDataFromRegistry.String(), + Priority: job.LowPriority, + Type: op.OpTypeGetModuleDataFromRegistry.String(), }) if err != nil { return } - ids = append(ids, id) return }, diff --git a/internal/langserver/handlers/indexers.go b/internal/langserver/handlers/indexers.go deleted file mode 100644 index 8a81d721..00000000 --- a/internal/langserver/handlers/indexers.go +++ /dev/null @@ -1,48 +0,0 @@ -package handlers - -import ( - "context" - - "github.com/hashicorp/terraform-ls/internal/job" - "github.com/hashicorp/terraform-ls/internal/state" -) - -type closedDirJobStore struct { - js *state.JobStore -} - -func (js *closedDirJobStore) EnqueueJob(newJob job.Job) (job.ID, error) { - return js.js.EnqueueJob(newJob) -} - -func (js *closedDirJobStore) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) { - return js.js.AwaitNextJob(ctx, false) -} - -func (js *closedDirJobStore) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error { - return js.js.FinishJob(id, jobErr, deferredJobIds...) -} - -func (js *closedDirJobStore) WaitForJobs(ctx context.Context, jobIds ...job.ID) error { - return js.js.WaitForJobs(ctx, jobIds...) -} - -type openDirJobStore struct { - js *state.JobStore -} - -func (js *openDirJobStore) EnqueueJob(newJob job.Job) (job.ID, error) { - return js.js.EnqueueJob(newJob) -} - -func (js *openDirJobStore) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) { - return js.js.AwaitNextJob(ctx, true) -} - -func (js *openDirJobStore) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error { - return js.js.FinishJob(id, jobErr, deferredJobIds...) -} - -func (js *openDirJobStore) WaitForJobs(ctx context.Context, jobIds ...job.ID) error { - return js.js.WaitForJobs(ctx, jobIds...) -} diff --git a/internal/langserver/handlers/service.go b/internal/langserver/handlers/service.go index 89b1345b..6ee63bfd 100644 --- a/internal/langserver/handlers/service.go +++ b/internal/langserver/handlers/service.go @@ -17,6 +17,7 @@ import ( idecoder "github.com/hashicorp/terraform-ls/internal/decoder" "github.com/hashicorp/terraform-ls/internal/document" "github.com/hashicorp/terraform-ls/internal/filesystem" + "github.com/hashicorp/terraform-ls/internal/job" "github.com/hashicorp/terraform-ls/internal/langserver/diagnostics" "github.com/hashicorp/terraform-ls/internal/langserver/notifier" "github.com/hashicorp/terraform-ls/internal/langserver/session" @@ -41,8 +42,8 @@ type service struct { sessCtx context.Context stopSession context.CancelFunc - closedDirIndexer *scheduler.Scheduler - openDirIndexer *scheduler.Scheduler + lowPrioIndexer *scheduler.Scheduler + highPrioIndexer *scheduler.Scheduler closedDirWalker *module.Walker openDirWalker *module.Walker @@ -437,15 +438,15 @@ func (svc *service) configureSessionDependencies(ctx context.Context, cfgOpts *s sendModuleTelemetry(svc.stateStore, svc.telemetry), } - svc.closedDirIndexer = scheduler.NewScheduler(&closedDirJobStore{svc.stateStore.JobStore}, 1) - svc.closedDirIndexer.SetLogger(svc.logger) - svc.closedDirIndexer.Start(svc.sessCtx) - svc.logger.Printf("running closed dir scheduler") + svc.lowPrioIndexer = scheduler.NewScheduler(svc.stateStore.JobStore, 1, job.LowPriority) + svc.lowPrioIndexer.SetLogger(svc.logger) + svc.lowPrioIndexer.Start(svc.sessCtx) + svc.logger.Printf("started low priority scheduler") - svc.openDirIndexer = scheduler.NewScheduler(&openDirJobStore{svc.stateStore.JobStore}, 1) - svc.openDirIndexer.SetLogger(svc.logger) - svc.openDirIndexer.Start(svc.sessCtx) - svc.logger.Printf("running open dir scheduler") + svc.highPrioIndexer = scheduler.NewScheduler(svc.stateStore.JobStore, 1, job.HighPriority) + svc.highPrioIndexer.SetLogger(svc.logger) + svc.highPrioIndexer.Start(svc.sessCtx) + svc.logger.Printf("started high priority scheduler") cc, err := ilsp.ClientCapabilities(ctx) if err == nil { @@ -532,11 +533,11 @@ func (svc *service) shutdown() { svc.logger.Printf("openDirWalker stopped") } - if svc.closedDirIndexer != nil { - svc.closedDirIndexer.Stop() + if svc.lowPrioIndexer != nil { + svc.lowPrioIndexer.Stop() } - if svc.openDirIndexer != nil { - svc.openDirIndexer.Stop() + if svc.highPrioIndexer != nil { + svc.highPrioIndexer.Stop() } } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index b27e9dd9..387949cb 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -13,22 +13,24 @@ type Scheduler struct { logger *log.Logger jobStorage JobStorage parallelism int + priority job.JobPriority stopFunc context.CancelFunc } type JobStorage interface { job.JobStore - AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) + AwaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error } -func NewScheduler(jobStorage JobStorage, parallelism int) *Scheduler { +func NewScheduler(jobStorage JobStorage, parallelism int, priority job.JobPriority) *Scheduler { discardLogger := log.New(ioutil.Discard, "", 0) return &Scheduler{ logger: discardLogger, jobStorage: jobStorage, parallelism: parallelism, + priority: priority, stopFunc: func() {}, } } @@ -54,7 +56,7 @@ func (s *Scheduler) Stop() { func (s *Scheduler) eval(ctx context.Context) { for { - id, nextJob, err := s.jobStorage.AwaitNextJob(ctx) + id, nextJob, err := s.jobStorage.AwaitNextJob(ctx, s.priority) if err != nil { if errors.Is(err, context.Canceled) { return diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index ba4c3937..3986f675 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -27,7 +27,7 @@ func TestScheduler_closedOnly(t *testing.T) { ctx := context.Background() - s := NewScheduler(&closedDirJobs{js: ss.JobStore}, 2) + s := NewScheduler(ss.JobStore, 2, job.LowPriority) s.SetLogger(testLogger()) s.Start(ctx) t.Cleanup(func() { @@ -151,14 +151,14 @@ func TestScheduler_closedAndOpen(t *testing.T) { t.Cleanup(cancelFunc) } - cs := NewScheduler(&closedDirJobs{js: ss.JobStore}, 1) + cs := NewScheduler(ss.JobStore, 1, job.LowPriority) cs.SetLogger(testLogger()) cs.Start(ctx) t.Cleanup(func() { cs.Stop() }) - os := NewScheduler(&openDirJobs{js: ss.JobStore}, 1) + os := NewScheduler(ss.JobStore, 1, job.HighPriority) os.SetLogger(testLogger()) os.Start(ctx) t.Cleanup(func() { @@ -197,7 +197,7 @@ func BenchmarkScheduler_EnqueueAndWaitForJob_closedOnly(b *testing.B) { tmpDir := b.TempDir() ctx := context.Background() - s := NewScheduler(&closedDirJobs{js: ss.JobStore}, 1) + s := NewScheduler(ss.JobStore, 1, job.LowPriority) s.Start(ctx) b.Cleanup(func() { s.Stop() @@ -238,7 +238,7 @@ func TestScheduler_defer(t *testing.T) { ctx := context.Background() - s := NewScheduler(&closedDirJobs{js: ss.JobStore}, 2) + s := NewScheduler(ss.JobStore, 2, job.LowPriority) s.SetLogger(testLogger()) s.Start(ctx) t.Cleanup(func() { @@ -326,43 +326,3 @@ func testLogger() *log.Logger { return log.New(ioutil.Discard, "", 0) } - -type closedDirJobs struct { - js *state.JobStore -} - -func (js *closedDirJobs) EnqueueJob(newJob job.Job) (job.ID, error) { - return js.js.EnqueueJob(newJob) -} - -func (js *closedDirJobs) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) { - return js.js.AwaitNextJob(ctx, false) -} - -func (js *closedDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error { - return js.js.FinishJob(id, jobErr, deferredJobIds...) -} - -func (js *closedDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error { - return js.js.WaitForJobs(ctx, jobIds...) -} - -type openDirJobs struct { - js *state.JobStore -} - -func (js *openDirJobs) EnqueueJob(newJob job.Job) (job.ID, error) { - return js.js.EnqueueJob(newJob) -} - -func (js *openDirJobs) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) { - return js.js.AwaitNextJob(ctx, true) -} - -func (js *openDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error { - return js.js.FinishJob(id, jobErr, deferredJobIds...) -} - -func (js *openDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error { - return js.js.WaitForJobs(ctx, jobIds...) -} diff --git a/internal/state/jobs.go b/internal/state/jobs.go index 1648bcce..05108094 100644 --- a/internal/state/jobs.go +++ b/internal/state/jobs.go @@ -18,8 +18,8 @@ type JobStore struct { tableName string logger *log.Logger - nextJobOpenDirMu *sync.Mutex - nextJobClosedDirMu *sync.Mutex + nextJobHighPrioMu *sync.Mutex + nextJobLowPrioMu *sync.Mutex lastJobId uint64 } @@ -79,17 +79,20 @@ func (js *JobStore) EnqueueJob(newJob job.Job) (job.ID, error) { newJobID := job.ID(fmt.Sprintf("%d", atomic.AddUint64(&js.lastJobId, 1))) - err = txn.Insert(js.tableName, &ScheduledJob{ + sJob := &ScheduledJob{ ID: newJobID, Job: newJob, IsDirOpen: isDirOpen(txn, newJob.Dir), State: StateQueued, - }) + } + + err = txn.Insert(js.tableName, sJob) if err != nil { return "", err } - js.logger.Printf("JOBS: Enqueueing new job %q: %q for %q", newJobID, newJob.Type, newJob.Dir) + js.logger.Printf("JOBS: Enqueueing new job %q: %q for %q (IsDirOpen: %t)", + sJob.ID, sJob.Type, sJob.Dir, sJob.IsDirOpen) txn.Commit() @@ -190,26 +193,30 @@ func (js *JobStore) jobExists(j job.Job, state State) (job.ID, bool, error) { return "", false, nil } -func (js *JobStore) AwaitNextJob(ctx context.Context, openDir bool) (job.ID, job.Job, error) { +func (js *JobStore) AwaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) { // Locking is needed if same query is executed in multiple threads, // i.e. this method is called at the same time from different threads, at // which point txn.FirstWatch would return the same job to more than // one thread and we would then end up executing it more than once. - if openDir { - js.nextJobOpenDirMu.Lock() - defer js.nextJobOpenDirMu.Unlock() - } else { - js.nextJobClosedDirMu.Lock() - defer js.nextJobClosedDirMu.Unlock() - } - - return js.awaitNextJob(ctx, openDir) + switch priority { + case job.HighPriority: + js.nextJobHighPrioMu.Lock() + defer js.nextJobHighPrioMu.Unlock() + case job.LowPriority: + js.nextJobLowPrioMu.Lock() + defer js.nextJobLowPrioMu.Unlock() + default: + // This should never happen + panic(fmt.Sprintf("unexpected priority: %#v", priority)) + } + + return js.awaitNextJob(ctx, priority) } -func (js *JobStore) awaitNextJob(ctx context.Context, openDir bool) (job.ID, job.Job, error) { +func (js *JobStore) awaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) { txn := js.db.Txn(false) - wCh, obj, err := txn.FirstWatch(js.tableName, "is_dir_open_state", openDir, StateQueued) + wCh, obj, err := txn.FirstWatch(js.tableName, "priority_state", priority, StateQueued) if err != nil { return "", job.Job{}, err } @@ -221,7 +228,7 @@ func (js *JobStore) awaitNextJob(ctx context.Context, openDir bool) (job.ID, job return "", job.Job{}, ctx.Err() } - return js.awaitNextJob(ctx, openDir) + return js.awaitNextJob(ctx, priority) } sJob := obj.(*ScheduledJob) @@ -235,18 +242,19 @@ func (js *JobStore) awaitNextJob(ctx context.Context, openDir bool) (job.ID, job // Instead of adding more sync primitives here we simply retry. if errors.Is(err, jobAlreadyRunning{ID: sJob.ID}) || errors.Is(err, jobNotFound{ID: sJob.ID}) { js.logger.Printf("retrying next job: %s", err) - return js.awaitNextJob(ctx, openDir) + return js.awaitNextJob(ctx, priority) } return "", job.Job{}, err } - js.logger.Printf("JOBS: Dispatching next job %q: %q for %q", sJob.ID, sJob.Type, sJob.Dir) + js.logger.Printf("JOBS: Dispatching next job %q (scheduler prio: %d, job prio: %d, isDirOpen: %t): %q for %q", + sJob.ID, priority, sJob.Priority, sJob.IsDirOpen, sJob.Type, sJob.Dir) return sJob.ID, sJob.Job, nil } func isDirOpen(txn *memdb.Txn, dirHandle document.DirHandle) bool { - docObj, err := txn.First(documentsTableName, "id", dirHandle) + docObj, err := txn.First(documentsTableName, "dir", dirHandle) if err != nil { return false } diff --git a/internal/state/jobs_test.go b/internal/state/jobs_test.go index 57e182c3..516b2c1a 100644 --- a/internal/state/jobs_test.go +++ b/internal/state/jobs_test.go @@ -63,6 +63,52 @@ func TestJobStore_EnqueueJob(t *testing.T) { } } +func TestJobStore_EnqueueJob_openDir(t *testing.T) { + ss, err := NewStateStore() + if err != nil { + t.Fatal(err) + } + + dirHandle := document.DirHandleFromPath("/test-1") + + err = ss.DocumentStore.OpenDocument(document.Handle{Dir: dirHandle, Filename: "test.tf"}, "test", 0, []byte{}) + if err != nil { + t.Fatal(err) + } + + id, err := ss.JobStore.EnqueueJob(job.Job{ + Func: func(ctx context.Context) error { + return nil + }, + Dir: dirHandle, + Type: "test-type", + }) + if err != nil { + t.Fatal(err) + } + + // verify that job for open dir comes is treated as high priority + ctx := context.Background() + ctx, cancelFunc := context.WithTimeout(ctx, 250*time.Millisecond) + t.Cleanup(cancelFunc) + nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + if err != nil { + t.Fatal(err) + } + + if nextId != id { + t.Fatalf("expected next job ID %q, given: %q", id, nextId) + } + + if j.Dir != dirHandle { + t.Fatalf("expected next job dir %q, given: %q", dirHandle, j.Dir) + } + + if j.Type != "test-type" { + t.Fatalf("expected next job dir %q, given: %q", "test-type", j.Type) + } +} + func BenchmarkJobStore_EnqueueJob_basic(b *testing.B) { ss, err := NewStateStore() if err != nil { @@ -237,7 +283,7 @@ func TestJobStore_AwaitNextJob_closedOnly(t *testing.T) { } ctx := context.Background() - nextId, job, err := ss.JobStore.AwaitNextJob(ctx, false) + nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.LowPriority) if err != nil { t.Fatal(err) } @@ -246,17 +292,17 @@ func TestJobStore_AwaitNextJob_closedOnly(t *testing.T) { t.Fatalf("expected next job ID %q, given: %q", id1, nextId) } - if job.Dir != firstDir { - t.Fatalf("expected next job dir %q, given: %q", firstDir, job.Dir) + if j.Dir != firstDir { + t.Fatalf("expected next job dir %q, given: %q", firstDir, j.Dir) } - if job.Type != "test-type" { - t.Fatalf("expected next job dir %q, given: %q", "test-type", job.Type) + if j.Type != "test-type" { + t.Fatalf("expected next job dir %q, given: %q", "test-type", j.Type) } ctx, cancelFunc := context.WithTimeout(ctx, 250*time.Millisecond) t.Cleanup(cancelFunc) - nextId, job, err = ss.JobStore.AwaitNextJob(ctx, false) + nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.LowPriority) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("%#v", err) @@ -300,7 +346,7 @@ func TestJobStore_AwaitNextJob_openOnly(t *testing.T) { } ctx := context.Background() - nextId, job, err := ss.JobStore.AwaitNextJob(ctx, true) + nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.HighPriority) if err != nil { t.Fatal(err) } @@ -309,21 +355,197 @@ func TestJobStore_AwaitNextJob_openOnly(t *testing.T) { t.Fatalf("expected next job ID %q, given: %q", id2, nextId) } - if job.Dir != secondDir { - t.Fatalf("expected next job dir %q, given: %q", secondDir, job.Dir) + if j.Dir != secondDir { + t.Fatalf("expected next job dir %q, given: %q", secondDir, j.Dir) } - if job.Type != "test-type" { - t.Fatalf("expected next job dir %q, given: %q", "test-type", job.Type) + if j.Type != "test-type" { + t.Fatalf("expected next job dir %q, given: %q", "test-type", j.Type) } ctx, cancelFunc := context.WithTimeout(ctx, 250*time.Millisecond) t.Cleanup(cancelFunc) - nextId, job, err = ss.JobStore.AwaitNextJob(ctx, true) + nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + if err != nil { + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("%#v", err) + } + } +} + +func TestJobStore_AwaitNextJob_highPriority(t *testing.T) { + ss, err := NewStateStore() + if err != nil { + t.Fatal(err) + } + + firstDir := document.DirHandleFromPath("/test-1") + id1, err := ss.JobStore.EnqueueJob(job.Job{ + Func: func(ctx context.Context) error { + return nil + }, + Dir: firstDir, + Type: "test-type", + Priority: job.HighPriority, + }) + if err != nil { + t.Fatal(err) + } + + secondDir := document.DirHandleFromPath("/test-2") + id2, err := ss.JobStore.EnqueueJob(job.Job{ + Func: func(ctx context.Context) error { + return nil + }, + Dir: secondDir, + Type: "test-type", + }) + if err != nil { + t.Fatal(err) + } + + err = ss.DocumentStore.OpenDocument(document.Handle{Dir: secondDir, Filename: "test.tf"}, "test", 0, []byte{}) + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + if err != nil { + t.Fatal(err) + } + + if nextId != id1 { + t.Fatalf("expected next job ID %q, given: %q", id1, nextId) + } + + if j.Dir != firstDir { + t.Fatalf("expected next job dir %q, given: %q", firstDir, j.Dir) + } + + if j.Type != "test-type" { + t.Fatalf("expected next job dir %q, given: %q", "test-type", j.Type) + } + + nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + if err != nil { + t.Fatal(err) + } + + if nextId != id2 { + t.Fatalf("expected next job ID %q, given: %q", id2, nextId) + } + + if j.Dir != secondDir { + t.Fatalf("expected next job dir %q, given: %q", secondDir, j.Dir) + } + + if j.Type != "test-type" { + t.Fatalf("expected next job dir %q, given: %q", "test-type", j.Type) + } + + ctx, cancelFunc := context.WithTimeout(ctx, 250*time.Millisecond) + t.Cleanup(cancelFunc) + nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + if err != nil { + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("%#v", err) + } + } +} + +func TestJobStore_AwaitNextJob_lowPriority(t *testing.T) { + ss, err := NewStateStore() + if err != nil { + t.Fatal(err) + } + + firstDir := document.DirHandleFromPath("/test-1") + id1, err := ss.JobStore.EnqueueJob(job.Job{ + Func: func(ctx context.Context) error { + return nil + }, + Dir: firstDir, + Type: "test-type", + }) + if err != nil { + t.Fatal(err) + } + + secondDir := document.DirHandleFromPath("/test-2") + id2, err := ss.JobStore.EnqueueJob(job.Job{ + Func: func(ctx context.Context) error { + return nil + }, + Dir: secondDir, + Type: "test-type", + Priority: job.LowPriority, + }) + if err != nil { + t.Fatal(err) + } + + err = ss.DocumentStore.OpenDocument(document.Handle{Dir: secondDir, Filename: "test.tf"}, "test", 0, []byte{}) + if err != nil { + t.Fatal(err) + } + + baseCtx := context.Background() + + ctx, cancelFunc := context.WithTimeout(baseCtx, 250*time.Millisecond) + t.Cleanup(cancelFunc) + _, _, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + if err != nil { + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("%#v", err) + } + } else { + t.Fatal("expected error") + } + + nextId, j, err := ss.JobStore.AwaitNextJob(baseCtx, job.LowPriority) + if err != nil { + t.Fatal(err) + } + + if nextId != id1 { + t.Fatalf("expected next job ID %q, given: %q", id1, nextId) + } + + if j.Dir != firstDir { + t.Fatalf("expected next job dir %q, given: %q", firstDir, j.Dir) + } + + if j.Type != "test-type" { + t.Fatalf("expected next job dir %q, given: %q", "test-type", j.Type) + } + + nextId, j, err = ss.JobStore.AwaitNextJob(baseCtx, job.LowPriority) + if err != nil { + t.Fatal(err) + } + + if nextId != id2 { + t.Fatalf("expected next job ID %q, given: %q", id2, nextId) + } + + if j.Dir != secondDir { + t.Fatalf("expected next job dir %q, given: %q", secondDir, j.Dir) + } + + if j.Type != "test-type" { + t.Fatalf("expected next job dir %q, given: %q", "test-type", j.Type) + } + + ctx, cancelFunc = context.WithTimeout(baseCtx, 250*time.Millisecond) + t.Cleanup(cancelFunc) + nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("%#v", err) } + } else { + t.Fatal("expected error") } } diff --git a/internal/state/priority_index.go b/internal/state/priority_index.go new file mode 100644 index 00000000..ffadb853 --- /dev/null +++ b/internal/state/priority_index.go @@ -0,0 +1,83 @@ +package state + +import ( + "fmt" + "reflect" + + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/terraform-ls/internal/job" +) + +type JobPriorityIndex struct { + PriorityIntField string + IsDirOpenBoolField string +} + +func (jpi *JobPriorityIndex) FromObject(obj interface{}) (bool, []byte, error) { + prioField, size, err := getIntField(obj, jpi.PriorityIntField) + if err != nil { + return false, []byte{}, err + } + if !prioField.IsZero() { + // Get the value and encode it + val := prioField.Int() + buf := encodeInt(val, size) + return true, buf, nil + } + + // Where explicit priority is not set + // imply it from IsDirOpenBoolField + isDirOpenField, err := getBoolField(obj, jpi.IsDirOpenBoolField) + if err != nil { + return false, []byte{}, err + } + impliedPriority := job.LowPriority + if isDirOpenField.Bool() { + impliedPriority = job.HighPriority + } + + buf := encodeInt(int64(impliedPriority), size) + return true, buf, nil +} + +func getIntField(obj interface{}, fieldName string) (reflect.Value, int, error) { + v := reflect.ValueOf(obj) + v = reflect.Indirect(v) // Dereference the pointer if any + + fieldValue := v.FieldByName(fieldName) + if !fieldValue.IsValid() { + return reflect.Value{}, 0, fmt.Errorf("field '%s' for %#v is invalid", fieldName, obj) + } + + // Check the type + k := fieldValue.Kind() + size, ok := memdb.IsIntType(k) + if !ok { + return reflect.Value{}, 0, fmt.Errorf("field %q is of type %v; want an int", fieldName, k) + } + + return fieldValue, size, nil +} + +func getBoolField(obj interface{}, fieldName string) (reflect.Value, error) { + v := reflect.ValueOf(obj) + v = reflect.Indirect(v) // Dereference the pointer if any + + fieldValue := v.FieldByName(fieldName) + if !fieldValue.IsValid() { + return reflect.Value{}, fmt.Errorf("field '%s' for %#v is invalid", fieldName, obj) + } + + // Check the type + k := fieldValue.Kind() + if k != reflect.Bool { + return reflect.Value{}, fmt.Errorf("field %q is of type %v; want a bool", fieldName, k) + } + + return fieldValue, nil +} + +func (jpi *JobPriorityIndex) FromArgs(args ...interface{}) ([]byte, error) { + intIdx := &memdb.IntFieldIndex{} + return intIdx.FromArgs(args...) +} diff --git a/internal/state/state.go b/internal/state/state.go index 0815f097..c32ba9ea 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -55,11 +55,14 @@ var dbSchema = &memdb.DBSchema{ Unique: true, Indexer: &StringerFieldIndexer{Field: "ID"}, }, - "is_dir_open_state": { - Name: "is_dir_open_state", + "priority_state": { + Name: "priority_state", Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ - &memdb.BoolFieldIndex{Field: "IsDirOpen"}, + &JobPriorityIndex{ + PriorityIntField: "Priority", + IsDirOpenBoolField: "IsDirOpen", + }, &memdb.UintFieldIndex{Field: "State"}, }, }, @@ -271,11 +274,11 @@ func NewStateStore() (*StateStore, error) { TimeProvider: time.Now, }, JobStore: &JobStore{ - db: db, - tableName: jobsTableName, - logger: defaultLogger, - nextJobOpenDirMu: &sync.Mutex{}, - nextJobClosedDirMu: &sync.Mutex{}, + db: db, + tableName: jobsTableName, + logger: defaultLogger, + nextJobHighPrioMu: &sync.Mutex{}, + nextJobLowPrioMu: &sync.Mutex{}, }, Modules: &ModuleStore{ db: db, diff --git a/internal/terraform/module/module_manager_test.go b/internal/terraform/module/module_manager_test.go index 5c0e01f0..1a174b67 100644 --- a/internal/terraform/module/module_manager_test.go +++ b/internal/terraform/module/module_manager_test.go @@ -225,7 +225,7 @@ func TestWalker_complexModules(t *testing.T) { ExecPath: "tf-mock", }) - s := scheduler.NewScheduler(&closedJobStore{ss.JobStore}, 1) + s := scheduler.NewScheduler(ss.JobStore, 1, job.LowPriority) ss.SetLogger(testLogger()) s.Start(ctx) @@ -352,23 +352,3 @@ func testLogger() *log.Logger { return log.New(ioutil.Discard, "", 0) } - -type closedJobStore struct { - js *state.JobStore -} - -func (js *closedJobStore) EnqueueJob(newJob job.Job) (job.ID, error) { - return js.js.EnqueueJob(newJob) -} - -func (js *closedJobStore) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) { - return js.js.AwaitNextJob(ctx, false) -} - -func (js *closedJobStore) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error { - return js.js.FinishJob(id, jobErr, deferredJobIds...) -} - -func (js *closedJobStore) WaitForJobs(ctx context.Context, jobIds ...job.ID) error { - return js.js.WaitForJobs(ctx, jobIds...) -}