pool.go
// Copyright 2022 Cloudbase Solutions SRL
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package pool
import (
"context"
"fmt"
"log/slog"
"math"
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/google/go-github/v57/github"
"github.com/google/uuid"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

runnerErrors "github.com/cloudbase/garm-provider-common/errors"
commonParams "github.com/cloudbase/garm-provider-common/params"
"github.com/cloudbase/garm-provider-common/util"

"github.com/cloudbase/garm/auth"
dbCommon "github.com/cloudbase/garm/database/common"
"github.com/cloudbase/garm/metrics"
"github.com/cloudbase/garm/params"
"github.com/cloudbase/garm/runner/common"
garmUtil "github.com/cloudbase/garm/util"
)
var (
poolIDLabelprefix = "runner-pool-id:"
controllerLabelPrefix = "runner-controller-id:"
// We tag runners that have been spawned as a result of a queued job with the job ID
// that spawned them. There is no way to guarantee that the runner spawned in response to a
// particular job will be picked up by that job. We mark them so that, in the likely event that
// the runner has picked up a different job, we can clear the lock on the job that spawned it.
// The job it picked up will already have transitioned to in_progress, so it will be ignored by
// the consume loop.
jobLabelPrefix = "in_response_to_job:"
)
const (
// maxCreateAttempts is the number of times we will attempt to create an instance
// before we give up.
//
// nolint:golangci-lint,godox
// TODO: make this configurable(?)
maxCreateAttempts = 5
)
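// urls groups the URLs a pool manager hands out to runners: the instance callback and
// metadata URLs, the base webhook URL and the controller-specific webhook URL.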
type urls struct {
callbackURL string
metadataURL string
webhookURL string
controllerWebhookURL string
}
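// NewEntityPoolManager returns a common.PoolManager for the given GitHub entity
// (repository, organization or enterprise). It builds the GitHub client for the entity
// and refuses to start if no webhook secret is configured.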
func NewEntityPoolManager(ctx context.Context, entity params.GithubEntity, cfgInternal params.Internal, providers map[string]common.Provider, store dbCommon.Store) (common.PoolManager, error) {
ctx = garmUtil.WithContext(ctx, slog.Any("pool_mgr", entity.String()), slog.Any("pool_type", entity.EntityType))
ghc, err := garmUtil.GithubClient(ctx, entity, cfgInternal.GithubCredentialsDetails)
if err != nil {
return nil, errors.Wrap(err, "getting github client")
}
if entity.WebhookSecret == "" {
return nil, errors.New("webhook secret is empty")
}
wg := &sync.WaitGroup{}
keyMuxes := &keyMutex{}
repo := &basePoolManager{
ctx: ctx,
cfgInternal: cfgInternal,
entity: entity,
ghcli: ghc,
store: store,
providers: providers,
controllerID: cfgInternal.ControllerID,
urls: urls{
webhookURL: cfgInternal.BaseWebhookURL,
callbackURL: cfgInternal.InstanceCallbackURL,
metadataURL: cfgInternal.InstanceMetadataURL,
controllerWebhookURL: cfgInternal.ControllerWebhookURL,
},
quit: make(chan struct{}),
credsDetails: cfgInternal.GithubCredentialsDetails,
wg: wg,
keyMux: keyMuxes,
}
return repo, nil
}
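// basePoolManager implements the pool manager for a single GitHub entity. The mutable
// fields (tools, managerIsRunning, managerErrorReason) are protected by mux, and
// per-runner operations are serialized through keyMux.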
type basePoolManager struct {
ctx context.Context
controllerID string
entity params.GithubEntity
ghcli common.GithubClient
cfgInternal params.Internal
store dbCommon.Store
providers map[string]common.Provider
tools []commonParams.RunnerApplicationDownload
quit chan struct{}
credsDetails params.GithubCredentials
managerIsRunning bool
managerErrorReason string
urls urls
mux sync.Mutex
wg *sync.WaitGroup
keyMux *keyMutex
}
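// HandleWorkflowJob processes a workflow_job webhook. Queued jobs are recorded in the
// database so the reconciliation loops can schedule runners for them; in_progress and
// completed events update the runner status and break the lock on any queued job that
// triggered the runner's creation.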
func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
if err := r.ValidateOwner(job); err != nil {
return errors.Wrap(err, "validating owner")
}
var jobParams params.Job
var err error
var triggeredBy int64
defer func() {
// We update the job in the database regardless of whether handling it was successful
// and regardless of whether it was meant for this pool. Github will send the same job data
// to all hierarchies that have been configured to work with garm. Updating the job at all
// levels should yield the same outcome in the db.
if jobParams.ID == 0 {
return
}
_, err := r.store.GetJobByID(r.ctx, jobParams.ID)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to get job",
"job_id", jobParams.ID)
return
}
// This job is new to us. Check if we have a pool that can handle it.
potentialPools, err := r.store.FindPoolsMatchingAllTags(r.ctx, r.entity.EntityType, r.entity.ID, jobParams.Labels)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to find pools matching tags; not recording job",
"requested_tags", strings.Join(jobParams.Labels, ", "))
return
}
if len(potentialPools) == 0 {
slog.WarnContext(
r.ctx, "no pools matching tags; not recording job",
"requested_tags", strings.Join(jobParams.Labels, ", "))
return
}
}
if _, jobErr := r.store.CreateOrUpdateJob(r.ctx, jobParams); jobErr != nil {
slog.With(slog.Any("error", jobErr)).ErrorContext(
r.ctx, "failed to update job", "job_id", jobParams.ID)
}
if triggeredBy != 0 && jobParams.ID != triggeredBy {
// The triggeredBy value is only set by the "in_progress" webhook. The runner that
// transitioned to in_progress was created as a result of a different queued job. If that job is
// still queued and we don't remove the lock, it will linger until the lock timeout is reached.
// That may take a long time, so we break the lock here and allow it to be scheduled again.
if err := r.store.BreakLockJobIsQueued(r.ctx, triggeredBy); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to break lock for job",
"job_id", triggeredBy)
}
}
}()
switch job.Action {
case "queued":
// Record the job in the database. Queued jobs will be picked up by the consumeQueuedJobs() method
// when reconciling.
jobParams, err = r.paramsWorkflowJobToParamsJob(job)
if err != nil {
return errors.Wrap(err, "converting job to params")
}
case "completed":
jobParams, err = r.paramsWorkflowJobToParamsJob(job)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// Unassigned jobs will have an empty runner_name.
// We also need to ignore not found errors, as we may get a webhook regarding
// a workflow that is handled by a runner at a different hierarchy level.
return nil
}
return errors.Wrap(err, "converting job to params")
}
// update instance workload state.
if _, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerTerminated); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update runner status",
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
return errors.Wrap(err, "updating runner")
}
slog.DebugContext(
r.ctx, "marking instance as pending_delete",
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
if _, err := r.setInstanceStatus(jobParams.RunnerName, commonParams.InstancePendingDelete, nil); err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update runner status",
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
return errors.Wrap(err, "updating runner")
}
case "in_progress":
jobParams, err = r.paramsWorkflowJobToParamsJob(job)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
// This is most likely a runner we're not managing. If we define a repo from within an org
// and also define that same org, we will get a hook from github from both the repo and the org
// regarding the same workflow. We look for the runner in the database, and make sure it exists and is
// part of a pool that this manager is responsible for. A not found error here will most likely mean
// that we are not responsible for that runner, and we should ignore it.
return nil
}
return errors.Wrap(err, "converting job to params")
}
// update instance workload state.
instance, err := r.setInstanceRunnerStatus(jobParams.RunnerName, params.RunnerActive)
if err != nil {
if errors.Is(err, runnerErrors.ErrNotFound) {
return nil
}
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update runner status",
"runner_name", util.SanitizeLogEntry(jobParams.RunnerName))
return errors.Wrap(err, "updating runner")
}
// Set triggeredBy here so we break the lock on any potential queued job.
triggeredBy = jobIDFromLabels(instance.AditionalLabels)
// A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
// a minimum number of idle runners configured.
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "getting pool")
}
if err := r.ensureIdleRunnersForOnePool(pool); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "error ensuring idle runners for pool",
"pool_id", pool.ID)
}
}
return nil
}
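// jobIDFromLabels extracts the job ID from the in_response_to_job label, if present.
// It returns 0 if no such label exists or if its value cannot be parsed.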
func jobIDFromLabels(labels []string) int64 {
for _, lbl := range labels {
if strings.HasPrefix(lbl, jobLabelPrefix) {
jobID, err := strconv.ParseInt(lbl[len(jobLabelPrefix):], 10, 64)
if err != nil {
return 0
}
return jobID
}
}
return 0
}
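// startLoopForFunction runs f on every tick of the given interval until the worker is
// stopped or the context is cancelled. Unless alwaysRun is set, f is only invoked while
// the pool manager is marked as running; otherwise the loop backs off and re-checks.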
func (r *basePoolManager) startLoopForFunction(f func() error, interval time.Duration, name string, alwaysRun bool) {
slog.InfoContext(
r.ctx, "starting loop for entity",
"loop_name", name)
ticker := time.NewTicker(interval)
r.wg.Add(1)
defer func() {
slog.InfoContext(
r.ctx, "pool loop exited",
"loop_name", name)
ticker.Stop()
r.wg.Done()
}()
for {
shouldRun := r.managerIsRunning
if alwaysRun {
shouldRun = true
}
switch shouldRun {
case true:
select {
case <-ticker.C:
if err := f(); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "error in loop",
"loop_name", name)
if errors.Is(err, runnerErrors.ErrUnauthorized) {
r.setPoolRunningState(false, err.Error())
}
}
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
}
default:
select {
case <-r.ctx.Done():
// daemon is shutting down.
return
case <-r.quit:
// this worker was stopped.
return
default:
r.waitForTimeoutOrCancelled(common.BackoffTimer)
}
}
}
}
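// updateTools refreshes the cached list of runner application downloads from GitHub and
// updates the pool manager running state based on the outcome.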
func (r *basePoolManager) updateTools() error {
// Update tools cache.
tools, err := r.FetchTools()
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update tools for repo")
r.setPoolRunningState(false, err.Error())
return fmt.Errorf("failed to update tools for repo %s: %w", r.entity.String(), err)
}
r.mux.Lock()
r.tools = tools
r.mux.Unlock()
slog.DebugContext(r.ctx, "successfully updated tools")
r.setPoolRunningState(true, "")
return err
}
// cleanupOrphanedProviderRunners compares runners in github with local runners and removes
// any local runners that are not present in Github. Runners that are "idle" in our
// provider, but do not exist in github, will be removed. This can happen if
// garm was offline while a job was executed by a github action. When this
// happens, github will remove the ephemeral worker and send a webhook our way.
// If we were offline and did not process the webhook, the instance will linger.
// We need to remove it from the provider and database.
func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}
runnerNames := map[string]bool{}
for _, run := range runners {
if !isManagedRunner(labelsFromRunner(run), r.controllerID) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", run.GetName())
continue
}
runnerNames[*run.Name] = true
}
for _, instance := range dbInstances {
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
slog.DebugContext(
r.ctx, "failed to acquire lock for instance",
"runner_name", instance.Name)
continue
}
defer r.keyMux.Unlock(instance.Name, false)
switch instance.Status {
case commonParams.InstancePendingCreate,
commonParams.InstancePendingDelete, commonParams.InstancePendingForceDelete:
// this instance is in the process of being created or is awaiting deletion.
// Instances in pending_create did not get a chance to register themselves in
// github, so we let them be for now.
continue
}
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
}
switch instance.RunnerStatus {
case params.RunnerPending, params.RunnerInstalling:
if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
// runner is still installing. We give it a chance to finish.
slog.DebugContext(
r.ctx, "runner is still installing, give it a chance to finish",
"runner_name", instance.Name)
continue
}
}
if time.Since(instance.UpdatedAt).Minutes() < 5 {
// instance was updated recently. We give it a chance to register itself in github.
slog.DebugContext(
r.ctx, "instance was updated recently, skipping check",
"runner_name", instance.Name)
continue
}
if ok := runnerNames[instance.Name]; !ok {
// Set pending_delete on DB field. Allow consolidate() to remove it.
if _, err := r.setInstanceStatus(instance.Name, commonParams.InstancePendingDelete, nil); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update runner",
"runner_name", instance.Name)
return errors.Wrap(err, "updating runner")
}
}
}
return nil
}
// reapTimedOutRunners will mark as pending_delete any runner that has a status
// of "running" in the provider, but that has not registered with Github, and has
// received no new updates in the configured timeout interval.
func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}
runnersByName := map[string]*github.Runner{}
for _, run := range runners {
if !isManagedRunner(labelsFromRunner(run), r.controllerID) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", run.GetName())
continue
}
runnersByName[*run.Name] = run
}
for _, instance := range dbInstances {
slog.DebugContext(
r.ctx, "attempting to lock instance",
"runner_name", instance.Name)
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
slog.DebugContext(
r.ctx, "failed to acquire lock for instance",
"runner_name", instance.Name)
continue
}
defer r.keyMux.Unlock(instance.Name, false)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
}
if time.Since(instance.UpdatedAt).Minutes() < float64(pool.RunnerTimeout()) {
continue
}
// There are 3 cases (currently) where we consider a runner as timed out:
// * The runner never joined github within the pool timeout
// * The runner managed to join github, but the setup process failed later and the runner
// never started on the instance.
// * A JIT config was created, but the runner never joined github.
if runner, ok := runnersByName[instance.Name]; !ok || runner.GetStatus() == "offline" {
slog.InfoContext(
r.ctx, "reaping timed-out/failed runner",
"runner_name", instance.Name)
if err := r.DeleteRunner(instance, false, false); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to update runner status",
"runner_name", instance.Name)
return errors.Wrap(err, "updating runner")
}
}
}
return nil
}
// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear
// as offline and for which we no longer have a local instance.
// This may happen if someone manually deletes the instance in the provider. We need to
// first remove the instance from github, and then from our database.
func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner) error {
poolInstanceCache := map[string][]commonParams.ProviderInstance{}
g, ctx := errgroup.WithContext(r.ctx)
for _, runner := range runners {
if !isManagedRunner(labelsFromRunner(runner), r.controllerID) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", runner.GetName())
continue
}
status := runner.GetStatus()
if status != "offline" {
// Runner is online. Ignore it.
continue
}
dbInstance, err := r.store.GetInstanceByName(r.ctx, *runner.Name)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
return errors.Wrap(err, "fetching instance from DB")
}
// We no longer have a DB entry for this instance, and the runner appears offline in github.
// Previous forceful removal may have failed?
slog.InfoContext(
r.ctx, "Runner has no database entry in garm, removing from github",
"runner_name", runner.GetName())
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
if err != nil {
// Removed in the meantime?
if resp != nil && resp.StatusCode == http.StatusNotFound {
continue
}
return errors.Wrap(err, "removing runner")
}
continue
}
switch dbInstance.Status {
case commonParams.InstancePendingDelete, commonParams.InstanceDeleting:
// already marked for deletion or is in the process of being deleted.
// Let consolidate take care of it.
continue
case commonParams.InstancePendingCreate, commonParams.InstanceCreating:
// instance is still being created. We give it a chance to finish.
slog.DebugContext(
r.ctx, "instance is still being created, give it a chance to finish",
"runner_name", dbInstance.Name)
continue
case commonParams.InstanceRunning:
// this check is not strictly needed, but can help avoid unnecessary strain on the provider.
// At worst, we will have a runner that is offline in github for 5 minutes before we reap it.
if time.Since(dbInstance.UpdatedAt).Minutes() < 5 {
// instance was updated recently. We give it a chance to register itself in github.
slog.DebugContext(
r.ctx, "instance was updated recently, skipping check",
"runner_name", dbInstance.Name)
continue
}
}
pool, err := r.store.GetEntityPool(r.ctx, r.entity, dbInstance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
// check if the provider still has the instance.
provider, ok := r.providers[pool.ProviderName]
if !ok {
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
var poolInstances []commonParams.ProviderInstance
poolInstances, ok = poolInstanceCache[pool.ID]
if !ok {
slog.DebugContext(
r.ctx, "updating instances cache for pool",
"pool_id", pool.ID)
poolInstances, err = provider.ListInstances(r.ctx, pool.ID)
if err != nil {
return errors.Wrapf(err, "fetching instances for pool %s", pool.ID)
}
poolInstanceCache[pool.ID] = poolInstances
}
lockAcquired := r.keyMux.TryLock(dbInstance.Name)
if !lockAcquired {
slog.DebugContext(
r.ctx, "failed to acquire lock for instance",
"runner_name", dbInstance.Name)
continue
}
// See: https://golang.org/doc/faq#closures_and_goroutines
runner := runner
g.Go(func() error {
deleteMux := false
defer func() {
r.keyMux.Unlock(dbInstance.Name, deleteMux)
}()
providerInstance, ok := instanceInList(dbInstance.Name, poolInstances)
if !ok {
// The runner instance is no longer on the provider, and it appears offline in github.
// It should be safe to force remove it.
slog.InfoContext(
r.ctx, "Runner instance is no longer on the provider, removing from github",
"runner_name", dbInstance.Name)
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
if err != nil {
// Removed in the meantime?
if resp != nil && resp.StatusCode == http.StatusNotFound {
slog.DebugContext(
r.ctx, "runner disappeared from github",
"runner_name", dbInstance.Name)
} else {
return errors.Wrap(err, "removing runner from github")
}
}
// Remove the database entry for the runner.
slog.InfoContext(
r.ctx, "Removing from database",
"runner_name", dbInstance.Name)
if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
return errors.Wrap(err, "removing runner from database")
}
deleteMux = true
return nil
}
if providerInstance.Status == commonParams.InstanceRunning {
// instance is running, but github reports runner as offline. Log the event.
// This scenario may require manual intervention.
// Perhaps it just came online and github did not yet change its status?
slog.WarnContext(
r.ctx, "instance is online but github reports runner as offline",
"runner_name", dbInstance.Name)
return nil
}
slog.InfoContext(
r.ctx, "instance was found in stopped state; starting",
"runner_name", dbInstance.Name)
if err := provider.Start(r.ctx, dbInstance.ProviderID); err != nil {
return errors.Wrapf(err, "starting instance %s", dbInstance.ProviderID)
}
return nil
})
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return errors.Wrap(err, "removing orphaned github runners")
}
return nil
}
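// waitForErrorGroupOrContextCancelled waits for the error group to finish, returning
// early with the context error if the manager context is cancelled first.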
func (r *basePoolManager) waitForErrorGroupOrContextCancelled(g *errgroup.Group) error {
if g == nil {
return nil
}
done := make(chan error, 1)
go func() {
waitErr := g.Wait()
done <- waitErr
}()
select {
case err := <-done:
return err
case <-r.ctx.Done():
return r.ctx.Err()
}
}
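// setInstanceRunnerStatus updates the runner (workload) status of an instance in the database.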
func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status params.RunnerStatus) (params.Instance, error) {
updateParams := params.UpdateInstanceParams{
RunnerStatus: status,
}
instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return instance, nil
}
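// setInstanceStatus updates the lifecycle status of an instance in the database, optionally
// recording a provider fault.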
func (r *basePoolManager) setInstanceStatus(runnerName string, status commonParams.InstanceStatus, providerFault []byte) (params.Instance, error) {
updateParams := params.UpdateInstanceParams{
Status: status,
ProviderFault: providerFault,
}
instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return instance, nil
}
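// AddRunner creates a new instance record in the given pool. When the provider supports it,
// a JIT configuration is requested from GitHub; otherwise the runner falls back to a
// registration token. On failure, both the database record and any GitHub-side runner
// created for it are cleaned up.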
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) (err error) {
pool, err := r.store.GetEntityPool(r.ctx, r.entity, poolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
name := fmt.Sprintf("%s-%s", pool.GetRunnerPrefix(), util.NewID())
labels := r.getLabelsForInstance(pool)
jitConfig := make(map[string]string)
var runner *github.Runner
if !provider.DisableJITConfig() {
// Attempt to create JIT config
jitConfig, runner, err = r.ghcli.GetEntityJITConfig(ctx, name, pool, labels)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
ctx, "failed to get JIT config, falling back to registration token")
}
}
createParams := params.CreateInstanceParams{
Name: name,
Status: commonParams.InstancePendingCreate,
RunnerStatus: params.RunnerPending,
OSArch: pool.OSArch,
OSType: pool.OSType,
CallbackURL: r.urls.callbackURL,
MetadataURL: r.urls.metadataURL,
CreateAttempt: 1,
GitHubRunnerGroup: pool.GitHubRunnerGroup,
AditionalLabels: aditionalLabels,
JitConfiguration: jitConfig,
}
if runner != nil {
createParams.AgentID = runner.GetID()
}
instance, err := r.store.CreateInstance(r.ctx, poolID, createParams)
if err != nil {
return errors.Wrap(err, "creating instance")
}
defer func() {
if err != nil {
if instance.ID != "" {
if err := r.DeleteRunner(instance, false, false); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
ctx, "failed to cleanup instance",
"runner_name", instance.Name)
}
}
if runner != nil {
_, runnerCleanupErr := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
if runnerCleanupErr != nil {
slog.With(slog.Any("error", runnerCleanupErr)).ErrorContext(
ctx, "failed to remove runner",
"gh_runner_id", runner.GetID())
}
}
}
}()
return nil
}
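// Status reports whether the pool manager is running and the last recorded failure reason.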
func (r *basePoolManager) Status() params.PoolManagerStatus {
r.mux.Lock()
defer r.mux.Unlock()
return params.PoolManagerStatus{
IsRunning: r.managerIsRunning,
FailureReason: r.managerErrorReason,
}
}
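// waitForTimeoutOrCancelled blocks for the given duration or until the worker is stopped
// or its context is cancelled, whichever comes first.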
func (r *basePoolManager) waitForTimeoutOrCancelled(timeout time.Duration) {
slog.DebugContext(
r.ctx, fmt.Sprintf("sleeping for %.2f minutes", timeout.Minutes()))
select {
case <-time.After(timeout):
case <-r.ctx.Done():
case <-r.quit:
}
}
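// setPoolRunningState records the running state of the pool manager and the failure reason, if any.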
func (r *basePoolManager) setPoolRunningState(isRunning bool, failureReason string) {
r.mux.Lock()
r.managerErrorReason = failureReason
r.managerIsRunning = isRunning
r.mux.Unlock()
}
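// getLabelsForInstance returns the labels a runner should be created with: the pool tags
// plus the controller ID and pool ID labels.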
func (r *basePoolManager) getLabelsForInstance(pool params.Pool) []string {
labels := []string{}
for _, tag := range pool.Tags {
labels = append(labels, tag.Name)
}
labels = append(labels, r.controllerLabel())
labels = append(labels, r.poolLabel(pool.ID))
return labels
}
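// addInstanceToProvider creates the instance in the IaaS provider using bootstrap parameters
// derived from the pool and instance records. If the provider reports an error, the partially
// created instance is cleaned up.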
func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error {
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
provider, ok := r.providers[pool.ProviderName]
if !ok {
return fmt.Errorf("unknown provider %s for pool %s", pool.ProviderName, pool.ID)
}
jwtValidity := pool.RunnerTimeout()
entity := r.entity.String()
jwtToken, err := auth.NewInstanceJWTToken(instance, r.cfgInternal.JWTSecret, entity, pool.PoolType(), jwtValidity)
if err != nil {
return errors.Wrap(err, "fetching instance jwt token")
}
hasJITConfig := len(instance.JitConfiguration) > 0
bootstrapArgs := commonParams.BootstrapInstance{
Name: instance.Name,
Tools: r.tools,
RepoURL: r.GithubURL(),
MetadataURL: instance.MetadataURL,
CallbackURL: instance.CallbackURL,
InstanceToken: jwtToken,
OSArch: pool.OSArch,
OSType: pool.OSType,
Flavor: pool.Flavor,
Image: pool.Image,
ExtraSpecs: pool.ExtraSpecs,
PoolID: instance.PoolID,
CACertBundle: r.credsDetails.CABundle,
GitHubRunnerGroup: instance.GitHubRunnerGroup,
JitConfigEnabled: hasJITConfig,
}
if !hasJITConfig {
// We still need the labels here for situations where we don't have a JIT config generated.
// This can happen if GARM is used against an instance of GHES older than version 3.10.
// The labels field should be ignored by providers if JIT config is enabled.
bootstrapArgs.Labels = r.getLabelsForInstance(pool)
}
var instanceIDToDelete string
defer func() {
if instanceIDToDelete != "" {
if err := provider.DeleteInstance(r.ctx, instanceIDToDelete); err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "failed to cleanup instance",
"provider_id", instanceIDToDelete)
}
}
}
}()
providerInstance, err := provider.CreateInstance(r.ctx, bootstrapArgs)
if err != nil {
instanceIDToDelete = instance.Name
return errors.Wrap(err, "creating instance")
}
if providerInstance.Status == commonParams.InstanceError {
instanceIDToDelete = instance.ProviderID
if instanceIDToDelete == "" {
instanceIDToDelete = instance.Name
}
}
updateInstanceArgs := r.updateArgsFromProviderInstance(providerInstance)
if _, err := r.store.UpdateInstance(r.ctx, instance.Name, updateInstanceArgs); err != nil {
return errors.Wrap(err, "updating instance")
}
return nil
}
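// getRunnerDetailsFromJob resolves the runner that handled a workflow job, falling back to
// the GitHub API when the webhook payload does not include a runner name, and verifies that
// the runner exists in our database.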
func (r *basePoolManager) getRunnerDetailsFromJob(job params.WorkflowJob) (params.RunnerInfo, error) {
runnerInfo := params.RunnerInfo{
Name: job.WorkflowJob.RunnerName,
Labels: job.WorkflowJob.Labels,
}
var err error
if job.WorkflowJob.RunnerName == "" {
if job.WorkflowJob.Conclusion == "skipped" || job.WorkflowJob.Conclusion == "cancelled" {
// job was skipped or cancelled before a runner was allocated. No point in continuing.
return params.RunnerInfo{}, fmt.Errorf("job %d was skipped or cancelled before a runner was allocated: %w", job.WorkflowJob.ID, runnerErrors.ErrNotFound)
}
// Runner name was not set in WorkflowJob by github. We can still attempt to
// fetch the info we need, using the workflow run ID, from the API.
slog.InfoContext(
r.ctx, "runner name not found in workflow job, attempting to fetch from API",
"job_id", job.WorkflowJob.ID)
runnerInfo, err = r.GetRunnerInfoFromWorkflow(job)
if err != nil {
return params.RunnerInfo{}, errors.Wrap(err, "fetching runner name from API")
}
}
_, err = r.store.GetInstanceByName(context.Background(), runnerInfo.Name)
if err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
r.ctx, "could not find runner details",
"runner_name", util.SanitizeLogEntry(runnerInfo.Name))
return params.RunnerInfo{}, errors.Wrap(err, "fetching runner details")
}
return runnerInfo, nil
}
// paramsWorkflowJobToParamsJob returns a params.Job from a params.WorkflowJob, and additionally determines
// if the runner belongs to this pool or not. It will always return a valid params.Job, even if it errs out.
// This allows us to still update the job in the database, even if we determined that it wasn't necessarily meant
// for this pool.
// If garm manages multiple hierarchies (repos, org, enterprise) which involve the same repo, we will get a hook
// whenever a job involving our repo triggers a hook. So even if the job is picked up by a runner at the enterprise
// level, the repo and org still get a hook.
// We even get a hook if a particular job is picked up by a GitHub hosted runner. We don't know who will pick up the job
// until the "in_progress" event is sent and we can see which runner picked it up.
//
// We save the details of that job at every level, because we want to at least update the status of the job. We make
// decisions based on the status of saved jobs. A "queued" job will prompt garm to search for an appropriate pool
// and spin up a runner there if no other idle runner exists to pick it up.
func (r *basePoolManager) paramsWorkflowJobToParamsJob(job params.WorkflowJob) (params.Job, error) {
asUUID, err := uuid.Parse(r.ID())
if err != nil {
return params.Job{}, errors.Wrap(err, "parsing pool ID as UUID")
}
jobParams := params.Job{
ID: job.WorkflowJob.ID,
Action: job.Action,
RunID: job.WorkflowJob.RunID,
Status: job.WorkflowJob.Status,
Conclusion: job.WorkflowJob.Conclusion,
StartedAt: job.WorkflowJob.StartedAt,
CompletedAt: job.WorkflowJob.CompletedAt,
Name: job.WorkflowJob.Name,
GithubRunnerID: job.WorkflowJob.RunnerID,
RunnerGroupID: job.WorkflowJob.RunnerGroupID,
RunnerGroupName: job.WorkflowJob.RunnerGroupName,
RepositoryName: job.Repository.Name,
RepositoryOwner: job.Repository.Owner.Login,
Labels: job.WorkflowJob.Labels,
}
runnerName := job.WorkflowJob.RunnerName
if job.Action != "queued" && runnerName == "" {
if job.WorkflowJob.Conclusion != "skipped" && job.WorkflowJob.Conclusion != "cancelled" {
// Runner name was not set in WorkflowJob by github. We can still attempt to fetch the info we need,
// using the workflow run ID, from the API.
// We may still get no runner name. In situations such as jobs being cancelled before a runner had the chance
// to pick up the job, the runner name is not available from the API.
runnerInfo, err := r.getRunnerDetailsFromJob(job)
if err != nil && !errors.Is(err, runnerErrors.ErrNotFound) {
return jobParams, errors.Wrap(err, "fetching runner details")
}
runnerName = runnerInfo.Name
}
}
jobParams.RunnerName = runnerName
switch r.entity.EntityType {
case params.GithubEntityTypeEnterprise:
jobParams.EnterpriseID = &asUUID
case params.GithubEntityTypeRepository:
jobParams.RepoID = &asUUID
case params.GithubEntityTypeOrganization:
jobParams.OrgID = &asUUID
default:
return jobParams, errors.Errorf("unknown pool type: %s", r.entity.EntityType)
}
return jobParams, nil
}
func (r *basePoolManager) poolLabel(poolID string) string {
return fmt.Sprintf("%s%s", poolIDLabelprefix, poolID)
}
func (r *basePoolManager) controllerLabel() string {