From 7d889dd4d26118dfd9c7940e6a986af3b8418efb Mon Sep 17 00:00:00 2001 From: Louis Garman Date: Sun, 19 Jan 2025 09:15:13 +0000 Subject: [PATCH] refactor: group together advisory lock ids --- internal/daemon/daemon.go | 58 ++++++++++++++---------------- internal/daemon/subsystem.go | 22 +++++------- internal/daemon/subsystem_test.go | 7 ++-- internal/notifications/notifier.go | 4 --- internal/run/reporter.go | 3 -- internal/run/scheduler.go | 4 --- internal/run/timeout.go | 3 -- internal/runner/allocator.go | 4 --- internal/runner/manager.go | 4 --- internal/sql/locks.go | 13 +++++++ 10 files changed, 50 insertions(+), 72 deletions(-) create mode 100644 internal/sql/locks.go diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index a95466403..1dfc46b37 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -481,11 +481,10 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error { System: d.Logs, }, { - Name: "reporter", - Logger: d.Logger, - Exclusive: true, - DB: d.DB, - LockID: internal.Int64(run.ReporterLockID), + Name: "reporter", + Logger: d.Logger, + DB: d.DB, + LockID: internal.Int64(sql.ReporterLockID), System: &run.Reporter{ Logger: d.Logger.WithValues("component", "reporter"), VCS: d.VCSProviders, @@ -504,11 +503,10 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error { }, }, { - Name: "timeout", - Logger: d.Logger, - Exclusive: true, - DB: d.DB, - LockID: internal.Int64(run.TimeoutLockID), + Name: "timeout", + Logger: d.Logger, + DB: d.DB, + LockID: internal.Int64(sql.TimeoutLockID), System: &run.Timeout{ Logger: d.Logger.WithValues("component", "timeout"), OverrideCheckInterval: d.OverrideTimeoutCheckInterval, @@ -518,11 +516,10 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error { }, }, { - Name: "notifier", - Logger: d.Logger, - Exclusive: true, - DB: d.DB, - LockID: internal.Int64(notifications.LockID), + Name: "notifier", + Logger: d.Logger, + DB: d.DB, + LockID: internal.Int64(sql.NotifierLockID), System: notifications.NewNotifier(notifications.NotifierOptions{ Logger: d.Logger, HostnameService: d.System, @@ -533,20 +530,18 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error { }), }, { - Name: "job-allocator", - Logger: d.Logger, - Exclusive: true, - DB: d.DB, - LockID: internal.Int64(runner.AllocatorLockID), - System: d.Runners.NewAllocator(d.Logger), + Name: "job-allocator", + Logger: d.Logger, + DB: d.DB, + LockID: internal.Int64(sql.AllocatorLockID), + System: d.Runners.NewAllocator(d.Logger), }, { - Name: "runner-manager", - Logger: d.Logger, - Exclusive: true, - DB: d.DB, - LockID: internal.Int64(runner.ManagerLockID), - System: d.Runners.NewManager(), + Name: "runner-manager", + Logger: d.Logger, + DB: d.DB, + LockID: internal.Int64(sql.RunnerManagerLockID), + System: d.Runners.NewManager(), }, { Name: "runner-daemon", @@ -557,11 +552,10 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error { } if !d.DisableScheduler { subsystems = append(subsystems, &Subsystem{ - Name: "scheduler", - Logger: d.Logger, - Exclusive: true, - DB: d.DB, - LockID: internal.Int64(run.SchedulerLockID), + Name: "scheduler", + Logger: d.Logger, + DB: d.DB, + LockID: internal.Int64(sql.SchedulerLockID), System: run.NewScheduler(run.SchedulerOptions{ Logger: d.Logger, WorkspaceClient: d.Workspaces, diff --git a/internal/daemon/subsystem.go b/internal/daemon/subsystem.go index 88a939433..e72063176 100644 --- a/internal/daemon/subsystem.go +++ b/internal/daemon/subsystem.go @@ -18,13 +18,12 @@ type ( Name string // System is the underlying system to be invoked and supervised. System Startable - // Exclusive: permit only one instance of this subsystem on an OTF - // cluster - Exclusive bool - // DB for obtaining cluster-wide lock. Must be non-nil if Exclusive is - // true. + // DB for obtaining cluster-wide lock. Must be non-nil if LockID is + // non-nil DB subsystemDB - // Cluster-unique lock ID. Must be non-nil if Exclusive is true. + // Cluster-unique lock ID. If non-nil then only one instance of this + // subsystem will run on an OTF cluster. If non-nil then DB must also be + // non-nil. LockID *int64 logr.Logger } @@ -40,13 +39,8 @@ type ( ) func (s *Subsystem) Start(ctx context.Context, g *errgroup.Group) error { - if s.Exclusive { - if s.LockID == nil { - return errors.New("exclusive subsystem must have non-nil lock ID") - } - if s.DB == nil { - return errors.New("exclusive subsystem must have non-nil database") - } + if s.LockID != nil && s.DB == nil { + return errors.New("lock ID requires that DB also be set") } // Confer all privileges to subsystem and identify subsystem in service @@ -58,7 +52,7 @@ func (s *Subsystem) Start(ctx context.Context, g *errgroup.Group) error { s.V(1).Info("started subsystem", "name", s.Name) return s.System.Start(ctx) } - if s.Exclusive { + if s.LockID != nil { // block on getting an exclusive lock err = s.DB.WaitAndLock(ctx, *s.LockID, start) } else { diff --git a/internal/daemon/subsystem_test.go b/internal/daemon/subsystem_test.go index f0173fe1b..451cc70f8 100644 --- a/internal/daemon/subsystem_test.go +++ b/internal/daemon/subsystem_test.go @@ -23,10 +23,9 @@ func TestSubsystem(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sub := &Subsystem{ - Name: tt.name, - System: &fakeStartable{}, - Logger: logr.Discard(), - Exclusive: tt.exclusive, + Name: tt.name, + System: &fakeStartable{}, + Logger: logr.Discard(), } if tt.exclusive { sub.DB = &fakeWaitAndLock{} diff --git a/internal/notifications/notifier.go b/internal/notifications/notifier.go index 59d18540c..0e69abe34 100644 --- a/internal/notifications/notifier.go +++ b/internal/notifications/notifier.go @@ -13,10 +13,6 @@ import ( "github.com/leg100/otf/internal/workspace" ) -// LockID guarantees only one notifier on a cluster is running at any -// time. -const LockID int64 = 5577006791947779411 - type ( // Notifier relays run events onto interested parties Notifier struct { diff --git a/internal/run/reporter.go b/internal/run/reporter.go index cb4d5efc7..05701ecfa 100644 --- a/internal/run/reporter.go +++ b/internal/run/reporter.go @@ -14,9 +14,6 @@ import ( "github.com/leg100/otf/internal/workspace" ) -// ReporterLockID is a unique ID guaranteeing only one reporter on a cluster is running at any time. -const ReporterLockID int64 = 179366396344335597 - type ( // Reporter reports back to VCS providers the current status of VCS-triggered // runs. diff --git a/internal/run/scheduler.go b/internal/run/scheduler.go index 656e4a721..30d431929 100644 --- a/internal/run/scheduler.go +++ b/internal/run/scheduler.go @@ -12,10 +12,6 @@ import ( "github.com/pkg/errors" ) -// SchedulerLockID guarantees only one scheduler on a cluster is running at any -// time. -const SchedulerLockID int64 = 5577006791947779410 - type ( // scheduler performs two principle tasks : // (a) manages lifecycle of workspace queues, creating/destroying them diff --git a/internal/run/timeout.go b/internal/run/timeout.go index b668c31bb..ac5348ab5 100644 --- a/internal/run/timeout.go +++ b/internal/run/timeout.go @@ -11,9 +11,6 @@ import ( "golang.org/x/exp/maps" ) -// TimeoutLockID is a unique ID guaranteeing only one timeout daemon on a cluster is running at any time. -const TimeoutLockID int64 = 179366396344335598 - // By default check timed out runs every minute var defaultCheckInterval = time.Minute diff --git a/internal/runner/allocator.go b/internal/runner/allocator.go index 3840707cc..5fb99a115 100644 --- a/internal/runner/allocator.go +++ b/internal/runner/allocator.go @@ -10,10 +10,6 @@ import ( "github.com/leg100/otf/internal/resource" ) -// AllocatorLockID guarantees only one allocator on a cluster is running at any -// time. -const AllocatorLockID int64 = 5577006791947779412 - // allocator allocates jobs to runners. Only one allocator must be active on // an OTF cluster at any one time. type allocator struct { diff --git a/internal/runner/manager.go b/internal/runner/manager.go index d33dcebd0..44ba84c69 100644 --- a/internal/runner/manager.go +++ b/internal/runner/manager.go @@ -14,10 +14,6 @@ var ( defaultManagerInterval = 10 * time.Second ) -// ManagerLockID guarantees only one manager on a cluster is running at any -// time. -const ManagerLockID int64 = 5577006791947779413 - // manager manages the state of runners. // // Only one manager should be running on an OTF cluster at any one time. diff --git a/internal/sql/locks.go b/internal/sql/locks.go new file mode 100644 index 000000000..85a065d75 --- /dev/null +++ b/internal/sql/locks.go @@ -0,0 +1,13 @@ +package sql + +// Postgres advisory lock IDs for each subsystem to ensure only one of each +// subsystem is running on an OTF cluster. It's important that they don't share the same +// value, hence placing them all in one place makes sense. +const ( + ReporterLockID int64 = iota + 179366396344335597 + TimeoutLockID + SchedulerLockID + NotifierLockID + AllocatorLockID + RunnerManagerLockID +)