Skip to content

Commit

Permalink
refactor: group together advisory lock ids
Browse files Browse the repository at this point in the history
  • Loading branch information
leg100 committed Jan 19, 2025
1 parent 9c3afcc commit 7d889dd
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 72 deletions.
58 changes: 26 additions & 32 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,11 +481,10 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
System: d.Logs,
},
{
Name: "reporter",
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(run.ReporterLockID),
Name: "reporter",
Logger: d.Logger,
DB: d.DB,
LockID: internal.Int64(sql.ReporterLockID),
System: &run.Reporter{
Logger: d.Logger.WithValues("component", "reporter"),
VCS: d.VCSProviders,
Expand All @@ -504,11 +503,10 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
},
},
{
Name: "timeout",
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(run.TimeoutLockID),
Name: "timeout",
Logger: d.Logger,
DB: d.DB,
LockID: internal.Int64(sql.TimeoutLockID),
System: &run.Timeout{
Logger: d.Logger.WithValues("component", "timeout"),
OverrideCheckInterval: d.OverrideTimeoutCheckInterval,
Expand All @@ -518,11 +516,10 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
},
},
{
Name: "notifier",
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(notifications.LockID),
Name: "notifier",
Logger: d.Logger,
DB: d.DB,
LockID: internal.Int64(sql.NotifierLockID),
System: notifications.NewNotifier(notifications.NotifierOptions{
Logger: d.Logger,
HostnameService: d.System,
Expand All @@ -533,20 +530,18 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
}),
},
{
Name: "job-allocator",
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(runner.AllocatorLockID),
System: d.Runners.NewAllocator(d.Logger),
Name: "job-allocator",
Logger: d.Logger,
DB: d.DB,
LockID: internal.Int64(sql.AllocatorLockID),
System: d.Runners.NewAllocator(d.Logger),
},
{
Name: "runner-manager",
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(runner.ManagerLockID),
System: d.Runners.NewManager(),
Name: "runner-manager",
Logger: d.Logger,
DB: d.DB,
LockID: internal.Int64(sql.RunnerManagerLockID),
System: d.Runners.NewManager(),
},
{
Name: "runner-daemon",
Expand All @@ -557,11 +552,10 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
}
if !d.DisableScheduler {
subsystems = append(subsystems, &Subsystem{
Name: "scheduler",
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(run.SchedulerLockID),
Name: "scheduler",
Logger: d.Logger,
DB: d.DB,
LockID: internal.Int64(sql.SchedulerLockID),
System: run.NewScheduler(run.SchedulerOptions{
Logger: d.Logger,
WorkspaceClient: d.Workspaces,
Expand Down
22 changes: 8 additions & 14 deletions internal/daemon/subsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ type (
Name string
// System is the underlying system to be invoked and supervised.
System Startable
// Exclusive: permit only one instance of this subsystem on an OTF
// cluster
Exclusive bool
// DB for obtaining cluster-wide lock. Must be non-nil if Exclusive is
// true.
// DB is used for obtaining the cluster-wide lock. Must be non-nil if LockID
// is non-nil.
DB subsystemDB
// Cluster-unique lock ID. Must be non-nil if Exclusive is true.
// LockID is a cluster-unique lock ID. If non-nil, only one instance of
// this subsystem will run on an OTF cluster, and DB must also be
// non-nil.
LockID *int64
logr.Logger
}
Expand All @@ -40,13 +39,8 @@ type (
)

func (s *Subsystem) Start(ctx context.Context, g *errgroup.Group) error {
if s.Exclusive {
if s.LockID == nil {
return errors.New("exclusive subsystem must have non-nil lock ID")
}
if s.DB == nil {
return errors.New("exclusive subsystem must have non-nil database")
}
if s.LockID != nil && s.DB == nil {
return errors.New("lock ID requires that DB also be set")
}

// Confer all privileges to subsystem and identify subsystem in service
Expand All @@ -58,7 +52,7 @@ func (s *Subsystem) Start(ctx context.Context, g *errgroup.Group) error {
s.V(1).Info("started subsystem", "name", s.Name)
return s.System.Start(ctx)
}
if s.Exclusive {
if s.LockID != nil {
// block on getting an exclusive lock
err = s.DB.WaitAndLock(ctx, *s.LockID, start)
} else {
Expand Down
7 changes: 3 additions & 4 deletions internal/daemon/subsystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ func TestSubsystem(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sub := &Subsystem{
Name: tt.name,
System: &fakeStartable{},
Logger: logr.Discard(),
Exclusive: tt.exclusive,
Name: tt.name,
System: &fakeStartable{},
Logger: logr.Discard(),
}
if tt.exclusive {
sub.DB = &fakeWaitAndLock{}
Expand Down
4 changes: 0 additions & 4 deletions internal/notifications/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
"github.com/leg100/otf/internal/workspace"
)

// LockID guarantees only one notifier on a cluster is running at any
// time.
const LockID int64 = 5577006791947779411

type (
// Notifier relays run events onto interested parties
Notifier struct {
Expand Down
3 changes: 0 additions & 3 deletions internal/run/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ import (
"github.com/leg100/otf/internal/workspace"
)

// ReporterLockID is a unique ID guaranteeing only one reporter on a cluster is running at any time.
const ReporterLockID int64 = 179366396344335597

type (
// Reporter reports back to VCS providers the current status of VCS-triggered
// runs.
Expand Down
4 changes: 0 additions & 4 deletions internal/run/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ import (
"github.com/pkg/errors"
)

// SchedulerLockID guarantees only one scheduler on a cluster is running at any
// time.
const SchedulerLockID int64 = 5577006791947779410

type (
// scheduler performs two principal tasks:
// (a) manages lifecycle of workspace queues, creating/destroying them
Expand Down
3 changes: 0 additions & 3 deletions internal/run/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"golang.org/x/exp/maps"
)

// TimeoutLockID is a unique ID guaranteeing only one timeout daemon on a cluster is running at any time.
const TimeoutLockID int64 = 179366396344335598

// By default check timed out runs every minute
var defaultCheckInterval = time.Minute

Expand Down
4 changes: 0 additions & 4 deletions internal/runner/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ import (
"github.com/leg100/otf/internal/resource"
)

// AllocatorLockID guarantees only one allocator on a cluster is running at any
// time.
const AllocatorLockID int64 = 5577006791947779412

// allocator allocates jobs to runners. Only one allocator must be active on
// an OTF cluster at any one time.
type allocator struct {
Expand Down
4 changes: 0 additions & 4 deletions internal/runner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ var (
defaultManagerInterval = 10 * time.Second
)

// ManagerLockID guarantees only one manager on a cluster is running at any
// time.
const ManagerLockID int64 = 5577006791947779413

// manager manages the state of runners.
//
// Only one manager should be running on an OTF cluster at any one time.
Expand Down
13 changes: 13 additions & 0 deletions internal/sql/locks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package sql

// Postgres advisory lock IDs, one per subsystem. Each ID is used to ensure
// that only one instance of the corresponding subsystem is running on an OTF
// cluster. No two IDs may share a value, which is why they are all declared
// together in this one place.
//
// The values are consecutive, starting at 179366396344335597, and are written
// out in full so that each ID can be read off directly.
const (
	ReporterLockID      int64 = 179366396344335597
	TimeoutLockID       int64 = 179366396344335598
	SchedulerLockID     int64 = 179366396344335599
	NotifierLockID      int64 = 179366396344335600
	AllocatorLockID     int64 = 179366396344335601
	RunnerManagerLockID int64 = 179366396344335602
)

0 comments on commit 7d889dd

Please sign in to comment.