Skip to content

Commit

Permalink
Provide a synchronous way to ready a git mirror
Browse files Browse the repository at this point in the history
For fluxd, we have a git mirroring implementation that keeps track of
its ready state. This is because when fluxd starts, it may not have a
viable SSH key, so it needs to be able to retry in the background,
while it does everything else that it can.

In the helm operator, and in tests, it's more useful to be able to
clone the repo and succeed or fail synchronously. This should remove
some flakes from the tests, which previously had to start the repo's
loop, and check back with it to see if it was ready -- which would
occasionally fail to be the case within the timeout.
  • Loading branch information
squaremo committed Jul 19, 2018
1 parent 44a3a27 commit 3f74d3c
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 89 deletions.
6 changes: 3 additions & 3 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,9 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEven
}

start := func() {
wg.Add(1)
go repo.Start(shutdown, wg)
gittest.WaitForRepoReady(repo, t)
if err := repo.Ready(context.Background()); err != nil {
t.Fatal(err)
}

wg.Add(1)
go d.Loop(shutdown, wg, logger)
Expand Down
6 changes: 3 additions & 3 deletions daemon/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ func daemon(t *testing.T) (*Daemon, func()) {
wg := &sync.WaitGroup{}
shutdown := make(chan struct{})

wg.Add(1)
go repo.Start(shutdown, wg)
gittest.WaitForRepoReady(repo, t)
if err := repo.Ready(context.Background()); err != nil {
t.Fatal(err)
}

gitConfig := git.Config{
Branch: "master",
Expand Down
28 changes: 4 additions & 24 deletions git/gittest/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"io/ioutil"
"os/exec"
"path/filepath"
"sync"
"testing"
"time"

"context"
"github.com/weaveworks/flux/cluster/kubernetes/testfiles"
Expand Down Expand Up @@ -67,19 +65,17 @@ func Repo(t *testing.T) (*git.Repo, func()) {
// the clone, the original repo, and a cleanup function.
func CheckoutWithConfig(t *testing.T, config git.Config) (*git.Checkout, *git.Repo, func()) {
repo, cleanup := Repo(t)
shutdown, wg := make(chan struct{}), &sync.WaitGroup{}
wg.Add(1)
go repo.Start(shutdown, wg)
WaitForRepoReady(repo, t)
if err := repo.Ready(context.Background()); err != nil {
cleanup()
t.Fatal(err)
}

co, err := repo.Clone(context.Background(), config)
if err != nil {
close(shutdown)
cleanup()
t.Fatal(err)
}
return co, repo, func() {
close(shutdown)
co.Clean()
cleanup()
}
Expand All @@ -106,19 +102,3 @@ func execCommand(cmd string, args ...string) error {
c.Stdout = ioutil.Discard
return c.Run()
}

func WaitForRepoReady(r *git.Repo, t *testing.T) {
retries := 30
for {
s, _ := r.Status()
if s == git.RepoReady {
return
}
if retries == 0 {
t.Fatalf("repo was not ready after 3 seconds")
return
}
retries--
time.Sleep(100 * time.Millisecond)
}
}
6 changes: 3 additions & 3 deletions git/gittest/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func TestCheckout(t *testing.T) {

sd, sg := make(chan struct{}), &sync.WaitGroup{}

sg.Add(1)
go repo.Start(sd, sg)
WaitForRepoReady(repo, t)
if err := repo.Ready(context.Background()); err != nil {
t.Fatal(err)
}

ctx := context.Background()

Expand Down
143 changes: 87 additions & 56 deletions git/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,75 +216,105 @@ func (r *Repo) CommitsBetween(ctx context.Context, ref1, ref2, path string) ([]C
return onelinelog(ctx, r.dir, ref1+".."+ref2, path)
}

// Start begins synchronising the repo by cloning it, then fetching
// the required tags and so on.
func (r *Repo) Start(shutdown <-chan struct{}, done *sync.WaitGroup) error {
defer done.Done()

for {

r.mu.RLock()
url := r.origin.URL
dir := r.dir
status := r.status
r.mu.RUnlock()

bg := context.Background()
// step attempts to advance the repo state machine, and returns `true`
// if it has made progress, `false` otherwise.
func (r *Repo) step(bg context.Context) bool {
r.mu.RLock()
url := r.origin.URL
dir := r.dir
status := r.status
r.mu.RUnlock()

switch status {
switch status {

case RepoNoConfig:
// this is not going to change in the lifetime of this
// process, so just exit.
return nil
case RepoNew:
case RepoNoConfig:
// this is not going to change in the lifetime of this
// process, so just exit.
return false

case RepoNew:
rootdir, err := ioutil.TempDir(os.TempDir(), "flux-gitclone")
if err != nil {
panic(err)
}

rootdir, err := ioutil.TempDir(os.TempDir(), "flux-gitclone")
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(bg, opTimeout)
dir, err = mirror(ctx, rootdir, url)
cancel()
if err == nil {
r.mu.Lock()
r.dir = dir
ctx, cancel := context.WithTimeout(bg, opTimeout)
err = r.fetch(ctx)
cancel()
r.mu.Unlock()
}
if err == nil {
r.setUnready(RepoCloned, ErrClonedOnly)
return true
}
dir = ""
os.RemoveAll(rootdir)
r.setUnready(RepoNew, err)
return false

case RepoCloned:
if !r.readonly {
ctx, cancel := context.WithTimeout(bg, opTimeout)
dir, err = mirror(ctx, rootdir, url)
err := checkPush(ctx, dir, url)
cancel()
if err == nil {
r.mu.Lock()
r.dir = dir
ctx, cancel := context.WithTimeout(bg, opTimeout)
err = r.fetch(ctx)
cancel()
r.mu.Unlock()
}
if err == nil {
r.setUnready(RepoCloned, ErrClonedOnly)
continue // with new status, skipping timer
}
dir = ""
os.RemoveAll(rootdir)
r.setUnready(RepoNew, err)

case RepoCloned:
if !r.readonly {
ctx, cancel := context.WithTimeout(bg, opTimeout)
err := checkPush(ctx, dir, url)
cancel()
if err != nil {
r.setUnready(RepoCloned, err)
break
}
if err != nil {
r.setUnready(RepoCloned, err)
return false
}
}

r.setReady()
// Treat every transition to ready as a refresh, so
// that any listeners can respond in the same way.
r.refreshed()
return true

case RepoReady:
return false
}

return false
}

// Ready tries to advance the cloning process along as far as
// possible, and returns an error if it is not able to get to a ready
// state.
func (r *Repo) Ready(ctx context.Context) error {
for r.step(ctx) {
// keep going!
}
_, err := r.Status()
return err
}

r.setReady()
// Treat every transition to ready as a refresh, so
// that any listeners can respond in the same way.
r.refreshed()
continue // with new status, skipping timer
// Start begins synchronising the repo by cloning it, then fetching
// the required tags and so on.
func (r *Repo) Start(shutdown <-chan struct{}, done *sync.WaitGroup) error {
defer done.Done()

for {
ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
advanced := r.step(ctx)
cancel()

case RepoReady:
if advanced {
continue
}

status, _ := r.Status()
if status == RepoReady {
if err := r.refreshLoop(shutdown); err != nil {
r.setUnready(RepoNew, err)
continue // with new status, skipping timer
}
} else if status == RepoNoConfig {
return nil
}

tryAgain := time.NewTimer(10 * time.Second)
Expand All @@ -298,6 +328,7 @@ func (r *Repo) Start(shutdown <-chan struct{}, done *sync.WaitGroup) error {
continue
}
}
return nil
}

func (r *Repo) Refresh(ctx context.Context) error {
Expand Down

0 comments on commit 3f74d3c

Please sign in to comment.