job: Introduce explicit priority for jobs (#977)

* job: Introduce explicit priority for jobs

* cleanup scheduler logic

* fix: Read open dir mark correctly
radeksimko authored Jun 30, 2022
1 parent 2247099 commit 68e2bf6
Showing 11 changed files with 402 additions and 179 deletions.
20 changes: 16 additions & 4 deletions internal/job/job.go
@@ -19,6 +19,10 @@ type Job struct {
 	// which is used for deduplication of queued jobs along with Dir.
 	Type string
 
+	// Priority represents priority with which the job should be scheduled.
+	// This overrides the priority implied from whether the dir is open.
+	Priority JobPriority
+
 	// Defer is a function to execute after Func is executed
 	// and before the job is marked as done (StateDone).
 	// This can be used to schedule jobs dependent on the main job.
@@ -32,9 +36,17 @@ type DeferFunc func(ctx context.Context, jobErr error) IDs
 
 func (job Job) Copy() Job {
 	return Job{
-		Func:  job.Func,
-		Dir:   job.Dir,
-		Type:  job.Type,
-		Defer: job.Defer,
+		Func:     job.Func,
+		Dir:      job.Dir,
+		Type:     job.Type,
+		Priority: job.Priority,
+		Defer:    job.Defer,
 	}
 }
+
+type JobPriority int
+
+const (
+	LowPriority  JobPriority = -1
+	HighPriority JobPriority = 1
+)
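
The Job.Priority doc comment above says an explicitly set priority overrides the priority implied by whether the directory is open. A minimal sketch of that resolution rule, assuming the zero value of JobPriority means "unspecified" (effectivePriority is a hypothetical helper, not part of this commit):

package job

// effectivePriority illustrates the override rule from the Job.Priority
// doc comment: an explicitly set priority wins; otherwise the open/closed
// state of the directory decides. Hypothetical sketch, not in this diff.
func effectivePriority(explicit JobPriority, isDirOpen bool) JobPriority {
	if explicit != 0 { // assumption: zero value = unspecified
		return explicit
	}
	if isDirOpen {
		return HighPriority
	}
	return LowPriority
}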
6 changes: 3 additions & 3 deletions internal/langserver/handlers/did_open.go
@@ -175,18 +175,18 @@ func (svc *service) decodeModule(ctx context.Context, modHandle document.DirHand
 	}
 	ids = append(ids, id)
 
-	id, err = svc.stateStore.JobStore.EnqueueJob(job.Job{
+	_, err = svc.stateStore.JobStore.EnqueueJob(job.Job{
 		Dir: modHandle,
 		Func: func(ctx context.Context) error {
 			return module.GetModuleDataFromRegistry(svc.srvCtx, svc.registryClient,
 				svc.modStore, svc.stateStore.RegistryModules, modHandle.Path())
 		},
-		Type: op.OpTypeGetModuleDataFromRegistry.String(),
+		Priority: job.LowPriority,
+		Type:     op.OpTypeGetModuleDataFromRegistry.String(),
 	})
 	if err != nil {
 		return
 	}
-	ids = append(ids, id)
 
 	return
 },
48 changes: 0 additions & 48 deletions internal/langserver/handlers/indexers.go

This file was deleted.

29 changes: 15 additions & 14 deletions internal/langserver/handlers/service.go
@@ -17,6 +17,7 @@ import (
 	idecoder "github.com/hashicorp/terraform-ls/internal/decoder"
 	"github.com/hashicorp/terraform-ls/internal/document"
 	"github.com/hashicorp/terraform-ls/internal/filesystem"
+	"github.com/hashicorp/terraform-ls/internal/job"
 	"github.com/hashicorp/terraform-ls/internal/langserver/diagnostics"
 	"github.com/hashicorp/terraform-ls/internal/langserver/notifier"
 	"github.com/hashicorp/terraform-ls/internal/langserver/session"
@@ -41,8 +42,8 @@ type service struct {
 	sessCtx     context.Context
 	stopSession context.CancelFunc
 
-	closedDirIndexer *scheduler.Scheduler
-	openDirIndexer   *scheduler.Scheduler
+	lowPrioIndexer  *scheduler.Scheduler
+	highPrioIndexer *scheduler.Scheduler
 
 	closedDirWalker *module.Walker
 	openDirWalker   *module.Walker
@@ -437,15 +438,15 @@ func (svc *service) configureSessionDependencies(ctx context.Context, cfgOpts *s
 		sendModuleTelemetry(svc.stateStore, svc.telemetry),
 	}
 
-	svc.closedDirIndexer = scheduler.NewScheduler(&closedDirJobStore{svc.stateStore.JobStore}, 1)
-	svc.closedDirIndexer.SetLogger(svc.logger)
-	svc.closedDirIndexer.Start(svc.sessCtx)
-	svc.logger.Printf("running closed dir scheduler")
+	svc.lowPrioIndexer = scheduler.NewScheduler(svc.stateStore.JobStore, 1, job.LowPriority)
+	svc.lowPrioIndexer.SetLogger(svc.logger)
+	svc.lowPrioIndexer.Start(svc.sessCtx)
+	svc.logger.Printf("started low priority scheduler")
 
-	svc.openDirIndexer = scheduler.NewScheduler(&openDirJobStore{svc.stateStore.JobStore}, 1)
-	svc.openDirIndexer.SetLogger(svc.logger)
-	svc.openDirIndexer.Start(svc.sessCtx)
-	svc.logger.Printf("running open dir scheduler")
+	svc.highPrioIndexer = scheduler.NewScheduler(svc.stateStore.JobStore, 1, job.HighPriority)
+	svc.highPrioIndexer.SetLogger(svc.logger)
+	svc.highPrioIndexer.Start(svc.sessCtx)
+	svc.logger.Printf("started high priority scheduler")
 
 	cc, err := ilsp.ClientCapabilities(ctx)
 	if err == nil {
@@ -532,11 +533,11 @@ func (svc *service) shutdown() {
 		svc.logger.Printf("openDirWalker stopped")
 	}
 
-	if svc.closedDirIndexer != nil {
-		svc.closedDirIndexer.Stop()
+	if svc.lowPrioIndexer != nil {
+		svc.lowPrioIndexer.Stop()
 	}
-	if svc.openDirIndexer != nil {
-		svc.openDirIndexer.Stop()
+	if svc.highPrioIndexer != nil {
+		svc.highPrioIndexer.Stop()
 	}
 }
8 changes: 5 additions & 3 deletions internal/scheduler/scheduler.go
@@ -13,22 +13,24 @@ type Scheduler struct {
 	logger      *log.Logger
 	jobStorage  JobStorage
 	parallelism int
+	priority    job.JobPriority
 	stopFunc    context.CancelFunc
 }
 
 type JobStorage interface {
 	job.JobStore
-	AwaitNextJob(ctx context.Context) (job.ID, job.Job, error)
+	AwaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error)
 	FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error
 }
 
-func NewScheduler(jobStorage JobStorage, parallelism int) *Scheduler {
+func NewScheduler(jobStorage JobStorage, parallelism int, priority job.JobPriority) *Scheduler {
 	discardLogger := log.New(ioutil.Discard, "", 0)
 
 	return &Scheduler{
 		logger:      discardLogger,
 		jobStorage:  jobStorage,
 		parallelism: parallelism,
+		priority:    priority,
 		stopFunc:    func() {},
 	}
 }
@@ -54,7 +56,7 @@ func (s *Scheduler) Stop() {
 
 func (s *Scheduler) eval(ctx context.Context) {
 	for {
-		id, nextJob, err := s.jobStorage.AwaitNextJob(ctx)
+		id, nextJob, err := s.jobStorage.AwaitNextJob(ctx, s.priority)
 		if err != nil {
 			if errors.Is(err, context.Canceled) {
 				return
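
With priority passed to the constructor, callers no longer need a dedicated JobStorage wrapper per scheduler; they run one scheduler per priority tier against the same store. A usage sketch following the wiring in service.go above (startIndexers, the jobStore parameter, and the parallelism of 1 are illustrative, and the internal packages are not importable outside the module):

package main

import (
	"context"

	"github.com/hashicorp/terraform-ls/internal/job"
	"github.com/hashicorp/terraform-ls/internal/scheduler"
)

// startIndexers shows the new wiring: two schedulers share one JobStorage
// and each dequeues only jobs of the priority it was constructed with.
func startIndexers(ctx context.Context, jobStore scheduler.JobStorage) (stop func()) {
	high := scheduler.NewScheduler(jobStore, 1, job.HighPriority)
	high.Start(ctx)

	low := scheduler.NewScheduler(jobStore, 1, job.LowPriority)
	low.Start(ctx)

	return func() {
		high.Stop()
		low.Stop()
	}
}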
50 changes: 5 additions & 45 deletions internal/scheduler/scheduler_test.go
@@ -27,7 +27,7 @@ func TestScheduler_closedOnly(t *testing.T) {
 
 	ctx := context.Background()
 
-	s := NewScheduler(&closedDirJobs{js: ss.JobStore}, 2)
+	s := NewScheduler(ss.JobStore, 2, job.LowPriority)
 	s.SetLogger(testLogger())
 	s.Start(ctx)
 	t.Cleanup(func() {
@@ -151,14 +151,14 @@ func TestScheduler_closedAndOpen(t *testing.T) {
 		t.Cleanup(cancelFunc)
 	}
 
-	cs := NewScheduler(&closedDirJobs{js: ss.JobStore}, 1)
+	cs := NewScheduler(ss.JobStore, 1, job.LowPriority)
 	cs.SetLogger(testLogger())
 	cs.Start(ctx)
 	t.Cleanup(func() {
 		cs.Stop()
 	})
 
-	os := NewScheduler(&openDirJobs{js: ss.JobStore}, 1)
+	os := NewScheduler(ss.JobStore, 1, job.HighPriority)
 	os.SetLogger(testLogger())
 	os.Start(ctx)
 	t.Cleanup(func() {
@@ -197,7 +197,7 @@ func BenchmarkScheduler_EnqueueAndWaitForJob_closedOnly(b *testing.B) {
 	tmpDir := b.TempDir()
 	ctx := context.Background()
 
-	s := NewScheduler(&closedDirJobs{js: ss.JobStore}, 1)
+	s := NewScheduler(ss.JobStore, 1, job.LowPriority)
 	s.Start(ctx)
 	b.Cleanup(func() {
 		s.Stop()
@@ -238,7 +238,7 @@ func TestScheduler_defer(t *testing.T) {
 
 	ctx := context.Background()
 
-	s := NewScheduler(&closedDirJobs{js: ss.JobStore}, 2)
+	s := NewScheduler(ss.JobStore, 2, job.LowPriority)
 	s.SetLogger(testLogger())
 	s.Start(ctx)
 	t.Cleanup(func() {
@@ -326,43 +326,3 @@ func testLogger() *log.Logger {
 
 	return log.New(ioutil.Discard, "", 0)
 }
-
-type closedDirJobs struct {
-	js *state.JobStore
-}
-
-func (js *closedDirJobs) EnqueueJob(newJob job.Job) (job.ID, error) {
-	return js.js.EnqueueJob(newJob)
-}
-
-func (js *closedDirJobs) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) {
-	return js.js.AwaitNextJob(ctx, false)
-}
-
-func (js *closedDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error {
-	return js.js.FinishJob(id, jobErr, deferredJobIds...)
-}
-
-func (js *closedDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
-	return js.js.WaitForJobs(ctx, jobIds...)
-}
-
-type openDirJobs struct {
-	js *state.JobStore
-}
-
-func (js *openDirJobs) EnqueueJob(newJob job.Job) (job.ID, error) {
-	return js.js.EnqueueJob(newJob)
-}
-
-func (js *openDirJobs) AwaitNextJob(ctx context.Context) (job.ID, job.Job, error) {
-	return js.js.AwaitNextJob(ctx, true)
-}
-
-func (js *openDirJobs) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error {
-	return js.js.FinishJob(id, jobErr, deferredJobIds...)
-}
-
-func (js *openDirJobs) WaitForJobs(ctx context.Context, jobIds ...job.ID) error {
-	return js.js.WaitForJobs(ctx, jobIds...)
-}
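
With the closedDirJobs/openDirJobs wrappers deleted, tests exercise the store directly and tag jobs with a priority at enqueue time. A condensed sketch of the enqueue-and-wait pattern, assuming the state.NewStateStore and document.DirHandleFromPath calls match the test setup visible elsewhere in this file (the job type and Func body are placeholders):

func TestScheduler_prioritySketch(t *testing.T) {
	ss, err := state.NewStateStore()
	if err != nil {
		t.Fatal(err)
	}

	ctx := context.Background()

	// One scheduler per tier; this test only drains the high priority tier.
	s := NewScheduler(ss.JobStore, 2, job.HighPriority)
	s.Start(ctx)
	t.Cleanup(s.Stop)

	id, err := ss.JobStore.EnqueueJob(job.Job{
		Dir:      document.DirHandleFromPath(t.TempDir()), // assumed helper
		Type:     "test.op",                               // placeholder job type
		Priority: job.HighPriority,
		Func: func(ctx context.Context) error {
			return nil
		},
	})
	if err != nil {
		t.Fatal(err)
	}

	if err := ss.JobStore.WaitForJobs(ctx, id); err != nil {
		t.Fatal(err)
	}
}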
50 changes: 29 additions & 21 deletions internal/state/jobs.go
@@ -18,8 +18,8 @@ type JobStore struct {
 	tableName string
 	logger    *log.Logger
 
-	nextJobOpenDirMu   *sync.Mutex
-	nextJobClosedDirMu *sync.Mutex
+	nextJobHighPrioMu *sync.Mutex
+	nextJobLowPrioMu  *sync.Mutex
 
 	lastJobId uint64
 }
@@ -79,17 +79,20 @@ func (js *JobStore) EnqueueJob(newJob job.Job) (job.ID, error) {
 
 	newJobID := job.ID(fmt.Sprintf("%d", atomic.AddUint64(&js.lastJobId, 1)))
 
-	err = txn.Insert(js.tableName, &ScheduledJob{
+	sJob := &ScheduledJob{
 		ID:        newJobID,
 		Job:       newJob,
 		IsDirOpen: isDirOpen(txn, newJob.Dir),
 		State:     StateQueued,
-	})
+	}
+
+	err = txn.Insert(js.tableName, sJob)
 	if err != nil {
 		return "", err
 	}
 
-	js.logger.Printf("JOBS: Enqueueing new job %q: %q for %q", newJobID, newJob.Type, newJob.Dir)
+	js.logger.Printf("JOBS: Enqueueing new job %q: %q for %q (IsDirOpen: %t)",
+		sJob.ID, sJob.Type, sJob.Dir, sJob.IsDirOpen)
 
 	txn.Commit()
 
@@ -190,26 +193,30 @@ func (js *JobStore) jobExists(j job.Job, state State) (job.ID, bool, error) {
 	return "", false, nil
 }
 
-func (js *JobStore) AwaitNextJob(ctx context.Context, openDir bool) (job.ID, job.Job, error) {
+func (js *JobStore) AwaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) {
 	// Locking is needed if same query is executed in multiple threads,
 	// i.e. this method is called at the same time from different threads, at
 	// which point txn.FirstWatch would return the same job to more than
 	// one thread and we would then end up executing it more than once.
-	if openDir {
-		js.nextJobOpenDirMu.Lock()
-		defer js.nextJobOpenDirMu.Unlock()
-	} else {
-		js.nextJobClosedDirMu.Lock()
-		defer js.nextJobClosedDirMu.Unlock()
-	}
-
-	return js.awaitNextJob(ctx, openDir)
+	switch priority {
+	case job.HighPriority:
+		js.nextJobHighPrioMu.Lock()
+		defer js.nextJobHighPrioMu.Unlock()
+	case job.LowPriority:
+		js.nextJobLowPrioMu.Lock()
+		defer js.nextJobLowPrioMu.Unlock()
+	default:
+		// This should never happen
+		panic(fmt.Sprintf("unexpected priority: %#v", priority))
+	}
+
+	return js.awaitNextJob(ctx, priority)
 }
 
-func (js *JobStore) awaitNextJob(ctx context.Context, openDir bool) (job.ID, job.Job, error) {
+func (js *JobStore) awaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) {
 	txn := js.db.Txn(false)
 
-	wCh, obj, err := txn.FirstWatch(js.tableName, "is_dir_open_state", openDir, StateQueued)
+	wCh, obj, err := txn.FirstWatch(js.tableName, "priority_state", priority, StateQueued)
 	if err != nil {
 		return "", job.Job{}, err
 	}
@@ -221,7 +228,7 @@ func (js *JobStore) awaitNextJob(ctx context.Context, openDir bool) (job.ID, job
 			return "", job.Job{}, ctx.Err()
 		}
 
-		return js.awaitNextJob(ctx, openDir)
+		return js.awaitNextJob(ctx, priority)
 	}
 
 	sJob := obj.(*ScheduledJob)
@@ -235,18 +242,19 @@ func (js *JobStore) awaitNextJob(ctx context.Context, openDir bool) (job.ID, job
 		// Instead of adding more sync primitives here we simply retry.
 		if errors.Is(err, jobAlreadyRunning{ID: sJob.ID}) || errors.Is(err, jobNotFound{ID: sJob.ID}) {
 			js.logger.Printf("retrying next job: %s", err)
-			return js.awaitNextJob(ctx, openDir)
+			return js.awaitNextJob(ctx, priority)
 		}
 
 		return "", job.Job{}, err
 	}
 
-	js.logger.Printf("JOBS: Dispatching next job %q: %q for %q", sJob.ID, sJob.Type, sJob.Dir)
+	js.logger.Printf("JOBS: Dispatching next job %q (scheduler prio: %d, job prio: %d, isDirOpen: %t): %q for %q",
+		sJob.ID, priority, sJob.Priority, sJob.IsDirOpen, sJob.Type, sJob.Dir)
 	return sJob.ID, sJob.Job, nil
 }
 
 func isDirOpen(txn *memdb.Txn, dirHandle document.DirHandle) bool {
-	docObj, err := txn.First(documentsTableName, "id", dirHandle)
+	docObj, err := txn.First(documentsTableName, "dir", dirHandle)
 	if err != nil {
 		return false
 	}
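
awaitNextJob now reads from a "priority_state" memdb index rather than "is_dir_open_state". The schema change defining that index lives in one of the changed files not rendered on this page; a rough go-memdb sketch of what such a compound index can look like follows. The table name and field choices are assumptions, and judging by the dispatch log line above, the real index presumably folds IsDirOpen into an effective priority rather than indexing the raw Priority field alone.

package state

import "github.com/hashicorp/go-memdb"

// Hypothetical schema fragment; the actual jobs table schema is not part
// of the excerpt above.
var jobsTableSchema = &memdb.TableSchema{
	Name: "jobs", // assumed table name
	Indexes: map[string]*memdb.IndexSchema{
		"id": {
			Name:    "id",
			Unique:  true,
			Indexer: &memdb.StringFieldIndex{Field: "ID"},
		},
		// Compound index so FirstWatch(table, "priority_state", prio, state)
		// can match on both values at once.
		"priority_state": {
			Name: "priority_state",
			Indexer: &memdb.CompoundIndex{
				Indexes: []memdb.Indexer{
					&memdb.IntFieldIndex{Field: "Priority"},
					&memdb.IntFieldIndex{Field: "State"},
				},
			},
		},
	},
}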