Skip to content

Commit

Permalink
Merge pull request #121 from gabriel-samfira/use-errgroup
Browse files Browse the repository at this point in the history
Replace wait implementation with errgroup
  • Loading branch information
gabriel-samfira authored Jul 3, 2023
2 parents 32464c7 + 5bca63e commit 8abf94e
Showing 1 changed file with 67 additions and 75 deletions.
142 changes: 67 additions & 75 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cloudbase/garm/runner/providers"
providerCommon "github.com/cloudbase/garm/runner/providers/common"
"github.com/cloudbase/garm/util"
"golang.org/x/sync/errgroup"

"github.com/juju/clock"
"github.com/juju/retry"
Expand Down Expand Up @@ -360,44 +361,37 @@ func (r *Runner) loadReposOrgsAndEnterprises() error {
return errors.Wrap(err, "fetching enterprises")
}

expectedReplies := len(repos) + len(orgs) + len(enterprises)
errChan := make(chan error, expectedReplies)

g, _ := errgroup.WithContext(r.ctx)
for _, repo := range repos {
go func(repo params.Repository) {
repo := repo
g.Go(func() error {
log.Printf("creating pool manager for repo %s/%s", repo.Owner, repo.Name)
_, err := r.poolManagerCtrl.CreateRepoPoolManager(r.ctx, repo, r.providers, r.store)
errChan <- err
}(repo)
return err
})
}

for _, org := range orgs {
go func(org params.Organization) {
org := org
g.Go(func() error {
log.Printf("creating pool manager for organization %s", org.Name)
_, err := r.poolManagerCtrl.CreateOrgPoolManager(r.ctx, org, r.providers, r.store)
errChan <- err
}(org)
return err
})
}

for _, enterprise := range enterprises {
go func(enterprise params.Enterprise) {
enterprise := enterprise
g.Go(func() error {
log.Printf("creating pool manager for enterprise %s", enterprise.Name)
_, err := r.poolManagerCtrl.CreateEnterprisePoolManager(r.ctx, enterprise, r.providers, r.store)
errChan <- err
}(enterprise)
return err
})
}

for i := 0; i < expectedReplies; i++ {
select {
case err := <-errChan:
if err != nil {
return errors.Wrap(err, "failed to load pool managers for repos and orgs")
}
case <-time.After(60 * time.Second):
return fmt.Errorf("timed out waiting for pool manager load")
}
if err := r.waitForErrorGroupOrTimeout(g); err != nil {
return fmt.Errorf("failed to create pool managers: %w", err)
}

return nil
}

Expand All @@ -420,43 +414,52 @@ func (r *Runner) Start() error {
return errors.Wrap(err, "fetch enterprise pool managers")
}

expectedReplies := len(repositories) + len(organizations) + len(enterprises)
errChan := make(chan error, expectedReplies)

g, _ := errgroup.WithContext(r.ctx)
for _, repo := range repositories {
go func(repo common.PoolManager) {
err := repo.Start()
errChan <- err
}(repo)
repo := repo
g.Go(func() error {
return repo.Start()
})
}

for _, org := range organizations {
go func(org common.PoolManager) {
err := org.Start()
errChan <- err
}(org)
org := org
g.Go(func() error {
return org.Start()
})
}

for _, enterprise := range enterprises {
go func(org common.PoolManager) {
err := org.Start()
errChan <- err
}(enterprise)
enterprise := enterprise
g.Go(func() error {
return enterprise.Start()
})
}

for i := 0; i < expectedReplies; i++ {
select {
case err := <-errChan:
if err != nil {
return errors.Wrap(err, "starting pool manager")
}
case <-time.After(60 * time.Second):
return fmt.Errorf("timed out waiting for pool mamager start")
}
if err := r.waitForErrorGroupOrTimeout(g); err != nil {
return fmt.Errorf("failed to start pool managers: %w", err)
}
return nil
}

func (r *Runner) waitForErrorGroupOrTimeout(g *errgroup.Group) error {
if g == nil {
return nil
}

done := make(chan error, 1)
go func() {
done <- g.Wait()
}()

select {
case err := <-done:
return err
case <-time.After(60 * time.Second):
return fmt.Errorf("timed out waiting for pool manager start")
}
}

func (r *Runner) Stop() error {
r.mux.Lock()
defer r.mux.Unlock()
Expand All @@ -476,54 +479,43 @@ func (r *Runner) Stop() error {
return errors.Wrap(err, "fetch enterprise pool managers")
}

expectedReplies := len(repos) + len(orgs) + len(enterprises)
errChan := make(chan error, expectedReplies)
g, _ := errgroup.WithContext(r.ctx)

for _, repo := range repos {
go func(poolMgr common.PoolManager) {
poolMgr := repo
g.Go(func() error {
err := poolMgr.Stop()
if err != nil {
errChan <- err
return
return fmt.Errorf("failed to stop repo pool manager: %w", err)
}
err = poolMgr.Wait()
errChan <- err
}(repo)
return poolMgr.Wait()
})
}

for _, org := range orgs {
go func(poolMgr common.PoolManager) {
poolMgr := org
g.Go(func() error {
err := poolMgr.Stop()
if err != nil {
errChan <- err
return
return fmt.Errorf("failed to stop org pool manager: %w", err)
}
err = poolMgr.Wait()
errChan <- err
}(org)
return poolMgr.Wait()
})
}

for _, enterprise := range enterprises {
go func(poolMgr common.PoolManager) {
poolMgr := enterprise
g.Go(func() error {
err := poolMgr.Stop()
if err != nil {
errChan <- err
return
return fmt.Errorf("failed to stop enterprise pool manager: %w", err)
}
err = poolMgr.Wait()
errChan <- err
}(enterprise)
return poolMgr.Wait()
})
}

for i := 0; i < expectedReplies; i++ {
select {
case err := <-errChan:
if err != nil {
return errors.Wrap(err, "stopping pool manager")
}
case <-time.After(60 * time.Second):
return fmt.Errorf("timed out waiting for pool mamager stop")
}
if err := r.waitForErrorGroupOrTimeout(g); err != nil {
return fmt.Errorf("failed to stop pool managers: %w", err)
}
return nil
}
Expand Down

0 comments on commit 8abf94e

Please sign in to comment.