diff --git a/api/v6/api.go b/api/v6/api.go index 4743fa471..9befabd71 100644 --- a/api/v6/api.go +++ b/api/v6/api.go @@ -16,9 +16,24 @@ type ImageStatus struct { Containers []Container } +// ReadOnlyReason enumerates the reasons that a controller is +// considered read-only. The zero value is considered "OK", since the +// zero value is what prior versions of the daemon will effectively +// send. +type ReadOnlyReason string + +const ( + ReadOnlyOK ReadOnlyReason = "" + ReadOnlyMissing ReadOnlyReason = "NotInRepo" + ReadOnlySystem ReadOnlyReason = "System" + ReadOnlyNoRepo ReadOnlyReason = "NoRepo" + ReadOnlyNotReady ReadOnlyReason = "NotReady" +) + type ControllerStatus struct { ID flux.ResourceID Containers []Container + ReadOnly ReadOnlyReason Status string Automated bool Locked bool diff --git a/cluster/cluster.go b/cluster/cluster.go index 1e103ac3e..3cca63a5e 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -29,6 +29,10 @@ type Cluster interface { type Controller struct { ID flux.ResourceID Status string // A status summary for display + // Is the controller considered read-only because it's under the + // control of the platform. In the case of Kubernetes, we simply + // omit these controllers; but this may not always be the case. + IsSystem bool Containers ContainersOrExcuse } diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index 2f95d350a..6ced19b89 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "os/signal" + "strconv" "sync" "syscall" "time" @@ -15,15 +16,12 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/pflag" - "github.com/weaveworks/go-checkpoint" k8sclient "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "github.com/weaveworks/flux/api/v6" "github.com/weaveworks/flux/cluster" "github.com/weaveworks/flux/cluster/kubernetes" "github.com/weaveworks/flux/daemon" - "github.com/weaveworks/flux/event" "github.com/weaveworks/flux/git" transport "github.com/weaveworks/flux/http" "github.com/weaveworks/flux/http/client" @@ -278,40 +276,6 @@ func main() { } } - // Indirect reference to a daemon, initially of the NotReady variety - notReadyDaemon := daemon.NewNotReadyDaemon(version, k8s, v6.GitRemoteConfig{ - URL: *gitURL, - Branch: *gitBranch, - Path: *gitPath, - }) - daemonRef := daemon.NewRef(notReadyDaemon) - - var eventWriter event.EventWriter - { - // Connect to fluxsvc if given an upstream address - if *upstreamURL != "" { - upstreamLogger := log.With(logger, "component", "upstream") - upstreamLogger.Log("URL", *upstreamURL) - upstream, err := daemonhttp.NewUpstream( - &http.Client{Timeout: 10 * time.Second}, - fmt.Sprintf("fluxd/%v", version), - client.Token(*token), - transport.NewUpstreamRouter(), - *upstreamURL, - remote.NewErrorLoggingUpstreamServer(daemonRef, upstreamLogger), - upstreamLogger, - ) - if err != nil { - logger.Log("err", err) - os.Exit(1) - } - eventWriter = upstream - defer upstream.Close() - } else { - logger.Log("upstream", "no upstream URL given") - } - } - // Mechanical components. // When we can receive from this channel, it indicates that we @@ -337,23 +301,13 @@ func main() { shutdownWg.Wait() }() - // HTTP transport component, for metrics - go func() { - mux := http.DefaultServeMux - mux.Handle("/metrics", promhttp.Handler()) - handler := daemonhttp.NewHandler(daemonRef, daemonhttp.NewRouter()) - mux.Handle("/api/flux/", http.StripPrefix("/api/flux", handler)) - logger.Log("addr", *listenAddr) - errc <- http.ListenAndServe(*listenAddr, mux) - }() - // Checkpoint: we want to include the fact of whether the daemon // was given a Git repo it could clone; but the expected scenario // is that it will have been set up already, and we don't want to // report anything before seeing if it works. So, don't start // until we have failed or succeeded. - var checker *checkpoint.Checker updateCheckLogger := log.With(logger, "component", "checkpoint") + checkForUpdates(clusterVersion, strconv.FormatBool(*gitURL != ""), updateCheckLogger) gitRemote := git.Remote{URL: *gitURL} gitConfig := git.Config{ @@ -368,48 +322,24 @@ func main() { repo := git.NewRepo(gitRemote) { - - // If there's no URL here, we will not be able to do anything else. - if gitRemote.URL == "" { - checker = checkForUpdates(clusterVersion, "false", updateCheckLogger) - return - } - shutdownWg.Add(1) go func() { - errc <- repo.Start(shutdown, shutdownWg) - }() - for { - status, err := repo.Status() - logger.Log("repo", repo.Origin().URL, "status", status, "err", err) - notReadyDaemon.UpdateStatus(status, err) - - if status == git.RepoReady { - checker = checkForUpdates(clusterVersion, "true", updateCheckLogger) - logger.Log("working-dir", repo.Dir(), - "user", *gitUser, - "email", *gitEmail, - "sync-tag", *gitSyncTag, - "notes-ref", *gitNotesRef, - "set-author", *gitSetAuthor) - break - } - - if checker == nil { - checker = checkForUpdates(clusterVersion, "false", updateCheckLogger) - } - - tryAgain := time.NewTimer(10 * time.Second) - select { - case err := <-errc: - go func() { errc <- err }() - return - case <-tryAgain.C: - continue + err := repo.Start(shutdown, shutdownWg) + if err != nil { + errc <- err } - } + }() } + logger.Log( + "url", *gitURL, + "user", *gitUser, + "email", *gitEmail, + "sync-tag", *gitSyncTag, + "notes-ref", *gitNotesRef, + "set-author", *gitSetAuthor, + ) + var jobs *job.Queue { jobs = job.NewQueue(shutdown, shutdownWg) @@ -425,14 +355,38 @@ func main() { GitConfig: gitConfig, Jobs: jobs, JobStatusCache: &job.StatusCache{Size: 100}, - - EventWriter: eventWriter, - Logger: log.With(logger, "component", "daemon"), LoopVars: &daemon.LoopVars{ + Logger: log.With(logger, "component", "daemon"), + LoopVars: &daemon.LoopVars{ SyncInterval: *gitPollInterval, RegistryPollInterval: *registryPollInterval, }, } + { + // Connect to fluxsvc if given an upstream address + if *upstreamURL != "" { + upstreamLogger := log.With(logger, "component", "upstream") + upstreamLogger.Log("URL", *upstreamURL) + upstream, err := daemonhttp.NewUpstream( + &http.Client{Timeout: 10 * time.Second}, + fmt.Sprintf("fluxd/%v", version), + client.Token(*token), + transport.NewUpstreamRouter(), + *upstreamURL, + remote.NewErrorLoggingUpstreamServer(daemon, upstreamLogger), + upstreamLogger, + ) + if err != nil { + logger.Log("err", err) + os.Exit(1) + } + daemon.EventWriter = upstream + defer upstream.Close() + } else { + logger.Log("upstream", "no upstream URL given") + } + } + shutdownWg.Add(1) go daemon.Loop(shutdown, shutdownWg, log.With(logger, "component", "sync-loop")) @@ -441,8 +395,14 @@ func main() { shutdownWg.Add(1) go cacheWarmer.Loop(log.With(logger, "component", "warmer"), shutdown, shutdownWg, imageCreds) - // Update daemonRef so that upstream and handlers point to fully working daemon - daemonRef.UpdateServer(daemon) + go func() { + mux := http.DefaultServeMux + mux.Handle("/metrics", promhttp.Handler()) + handler := daemonhttp.NewHandler(daemon, daemonhttp.NewRouter()) + mux.Handle("/api/flux/", http.StripPrefix("/api/flux", handler)) + logger.Log("addr", *listenAddr) + errc <- http.ListenAndServe(*listenAddr, mux) + }() // Fall off the end, into the waiting procedure. } diff --git a/daemon/daemon.go b/daemon/daemon.go index cbafd4e8f..e42b88bf6 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -76,21 +76,37 @@ func (d *Daemon) ListServices(ctx context.Context, namespace string) ([]v6.Contr } var services policy.ResourceMap + var globalReadOnly v6.ReadOnlyReason err = d.WithClone(ctx, func(checkout *git.Checkout) error { var err error services, err = d.Manifests.ServicesWithPolicies(checkout.ManifestDir()) return err }) - if err != nil { + switch { + case err == git.ErrNotReady: + globalReadOnly = v6.ReadOnlyNotReady + case err == git.ErrNoConfig: + globalReadOnly = v6.ReadOnlyNoRepo + case err != nil: return nil, errors.Wrap(err, "getting service policies") } var res []v6.ControllerStatus for _, service := range clusterServices { - policies := services[service.ID] + var readOnly v6.ReadOnlyReason + policies, ok := services[service.ID] + switch { + case globalReadOnly != "": + readOnly = globalReadOnly + case !ok: + readOnly = v6.ReadOnlyMissing + case service.IsSystem: + readOnly = v6.ReadOnlySystem + } res = append(res, v6.ControllerStatus{ ID: service.ID, Containers: containers2containers(service.ContainersOrNil()), + ReadOnly: readOnly, Status: service.Status, Automated: policies.Contains(policy.Automated), Locked: policies.Contains(policy.Locked), diff --git a/daemon/notready.go b/daemon/notready.go deleted file mode 100644 index d7f95c048..000000000 --- a/daemon/notready.go +++ /dev/null @@ -1,103 +0,0 @@ -package daemon - -import ( - "context" - "errors" - "sync" - - "github.com/weaveworks/flux/api/v6" - "github.com/weaveworks/flux/api/v9" - "github.com/weaveworks/flux/cluster" - "github.com/weaveworks/flux/git" - "github.com/weaveworks/flux/job" - "github.com/weaveworks/flux/update" -) - -// NotReadyDaemon is a stub implementation used to serve a subset of the -// API when we have yet to successfully clone the config repo. -type NotReadyDaemon struct { - sync.RWMutex - version string - cluster cluster.Cluster - gitRemote v6.GitRemoteConfig - gitStatus git.GitRepoStatus - reason error -} - -// NotReadyDaemon is a state of the daemon that has not proceeded past -// getting the git repo set up. Since this typically needs some -// actions on the part of the user, this state can last indefinitely; -// so, it has its own code. -func NewNotReadyDaemon(version string, cluster cluster.Cluster, gitRemote v6.GitRemoteConfig) (nrd *NotReadyDaemon) { - return &NotReadyDaemon{ - version: version, - cluster: cluster, - gitRemote: gitRemote, - gitStatus: git.RepoNoConfig, - reason: errors.New("git repo is not configured"), - } -} - -func (nrd *NotReadyDaemon) Reason() error { - nrd.RLock() - defer nrd.RUnlock() - return nrd.reason -} - -func (nrd *NotReadyDaemon) UpdateStatus(status git.GitRepoStatus, reason error) { - nrd.Lock() - nrd.gitStatus = status - nrd.reason = reason - nrd.Unlock() -} - -func (nrd *NotReadyDaemon) Ping(ctx context.Context) error { - return nrd.cluster.Ping() -} - -func (nrd *NotReadyDaemon) Version(ctx context.Context) (string, error) { - return nrd.version, nil -} - -func (nrd *NotReadyDaemon) Export(ctx context.Context) ([]byte, error) { - return nrd.cluster.Export() -} - -func (nrd *NotReadyDaemon) ListServices(ctx context.Context, namespace string) ([]v6.ControllerStatus, error) { - return nil, nrd.Reason() -} - -func (nrd *NotReadyDaemon) ListImages(context.Context, update.ResourceSpec) ([]v6.ImageStatus, error) { - return nil, nrd.Reason() -} - -func (nrd *NotReadyDaemon) UpdateManifests(context.Context, update.Spec) (job.ID, error) { - var id job.ID - return id, nrd.Reason() -} - -func (nrd *NotReadyDaemon) NotifyChange(context.Context, v9.Change) error { - return nrd.Reason() -} - -func (nrd *NotReadyDaemon) JobStatus(context.Context, job.ID) (job.Status, error) { - return job.Status{}, nrd.Reason() -} - -func (nrd *NotReadyDaemon) SyncStatus(context.Context, string) ([]string, error) { - return nil, nrd.Reason() -} - -func (nrd *NotReadyDaemon) GitRepoConfig(ctx context.Context, regenerate bool) (v6.GitConfig, error) { - publicSSHKey, err := nrd.cluster.PublicSSHKey(regenerate) - if err != nil { - return v6.GitConfig{}, err - } - nrd.RLock() - defer nrd.RUnlock() - return v6.GitConfig{ - Remote: nrd.gitRemote, - PublicSSHKey: publicSSHKey, - Status: nrd.gitStatus, - }, nil -} diff --git a/daemon/ref.go b/daemon/ref.go deleted file mode 100644 index 52894352e..000000000 --- a/daemon/ref.go +++ /dev/null @@ -1,80 +0,0 @@ -package daemon - -import ( - "context" - "sync" - - "github.com/weaveworks/flux/api" - "github.com/weaveworks/flux/api/v6" - "github.com/weaveworks/flux/api/v9" - "github.com/weaveworks/flux/job" - "github.com/weaveworks/flux/update" -) - -// Ref is a cell containing a server implementation, that we can -// update atomically. The point of this is to be able to have a -// server in use (e.g., answering RPCs), and swap it later when the -// state changes. -type Ref struct { - sync.RWMutex - server api.UpstreamServer -} - -func NewRef(server api.UpstreamServer) *Ref { - return &Ref{server: server} -} - -func (r *Ref) Server() api.UpstreamServer { - r.RLock() - defer r.RUnlock() - return r.server -} - -func (r *Ref) UpdateServer(server api.UpstreamServer) { - r.Lock() - r.server = server - r.Unlock() -} - -// api.Server implementation so clients don't need to be refactored around -// Server() API - -func (r *Ref) Ping(ctx context.Context) error { - return r.Server().Ping(ctx) -} - -func (r *Ref) Version(ctx context.Context) (string, error) { - return r.Server().Version(ctx) -} - -func (r *Ref) Export(ctx context.Context) ([]byte, error) { - return r.Server().Export(ctx) -} - -func (r *Ref) ListServices(ctx context.Context, namespace string) ([]v6.ControllerStatus, error) { - return r.Server().ListServices(ctx, namespace) -} - -func (r *Ref) ListImages(ctx context.Context, spec update.ResourceSpec) ([]v6.ImageStatus, error) { - return r.Server().ListImages(ctx, spec) -} - -func (r *Ref) UpdateManifests(ctx context.Context, spec update.Spec) (job.ID, error) { - return r.Server().UpdateManifests(ctx, spec) -} - -func (r *Ref) NotifyChange(ctx context.Context, change v9.Change) error { - return r.Server().NotifyChange(ctx, change) -} - -func (r *Ref) JobStatus(ctx context.Context, id job.ID) (job.Status, error) { - return r.Server().JobStatus(ctx, id) -} - -func (r *Ref) SyncStatus(ctx context.Context, ref string) ([]string, error) { - return r.Server().SyncStatus(ctx, ref) -} - -func (r *Ref) GitRepoConfig(ctx context.Context, regenerate bool) (v6.GitConfig, error) { - return r.Server().GitRepoConfig(ctx, regenerate) -} diff --git a/git/repo.go b/git/repo.go index 597966ad6..1f34727ec 100644 --- a/git/repo.go +++ b/git/repo.go @@ -21,7 +21,7 @@ const ( var ( ErrNoChanges = errors.New("no changes made in repo") ErrNotReady = errors.New("git repo not ready") - ErrNoConfig = errors.New("git repo has not valid config") + ErrNoConfig = errors.New("git repo does not have valid config") ) // GitRepoStatus represents the progress made synchronising with a git @@ -31,9 +31,9 @@ type GitRepoStatus string const ( RepoNoConfig GitRepoStatus = "unconfigured" // configuration is empty - RepoNew = "new" // no attempt made to clone it yet - RepoCloned = "cloned" // has been read (cloned); no attempt made to write - RepoReady = "ready" // has been written to, so ready to sync + RepoNew GitRepoStatus = "new" // no attempt made to clone it yet + RepoCloned GitRepoStatus = "cloned" // has been read (cloned); no attempt made to write + RepoReady GitRepoStatus = "ready" // has been written to, so ready to sync ) // Remote points at a git repo somewhere. @@ -57,9 +57,13 @@ type Repo struct { // NewRepo constructs a repo mirror which will sync itself. func NewRepo(origin Remote) *Repo { + status := RepoNew + if origin.URL == "" { + status = RepoNoConfig + } r := &Repo{ origin: origin, - status: RepoNew, + status: status, err: nil, notify: make(chan struct{}, 1), // `1` so that Notify doesn't block C: make(chan struct{}, 1), // `1` so we don't block on completing a refresh @@ -109,12 +113,25 @@ func (r *Repo) Notify() { } } +// errorIfNotReady returns the appropriate error if the repo is not +// ready, and `nil` otherwise. +func (r *Repo) errorIfNotReady() error { + switch r.status { + case RepoReady: + return nil + case RepoNoConfig: + return ErrNoConfig + default: + return ErrNotReady + } +} + // Revision returns the revision (SHA1) of the ref passed in func (r *Repo) Revision(ctx context.Context, ref string) (string, error) { r.mu.RLock() defer r.mu.RUnlock() - if r.dir == "" { - return "", errors.New("git repo not initialised") + if err := r.errorIfNotReady(); err != nil { + return "", err } return refRevision(ctx, r.dir, ref) } @@ -122,12 +139,18 @@ func (r *Repo) Revision(ctx context.Context, ref string) (string, error) { func (r *Repo) CommitsBefore(ctx context.Context, ref, path string) ([]Commit, error) { r.mu.RLock() defer r.mu.RUnlock() + if err := r.errorIfNotReady(); err != nil { + return nil, err + } return onelinelog(ctx, r.dir, ref, path) } func (r *Repo) CommitsBetween(ctx context.Context, ref1, ref2, path string) ([]Commit, error) { r.mu.RLock() defer r.mu.RUnlock() + if err := r.errorIfNotReady(); err != nil { + return nil, err + } return onelinelog(ctx, r.dir, ref1+".."+ref2, path) } @@ -148,12 +171,10 @@ func (r *Repo) Start(shutdown <-chan struct{}, done *sync.WaitGroup) error { switch status { - // TODO(michael): I don't think this is a real status; perhaps - // have a no-op repo instead. case RepoNoConfig: // this is not going to change in the lifetime of this - // process - return ErrNoConfig + // process, so just exit. + return nil case RepoNew: rootdir, err := ioutil.TempDir(os.TempDir(), "flux-gitclone") @@ -215,8 +236,8 @@ func (r *Repo) Refresh(ctx context.Context) error { // could clone to another repo and pull there, then swap when complete. r.mu.Lock() defer r.mu.Unlock() - if r.status != RepoReady { - return ErrNotReady + if err := r.errorIfNotReady(); err != nil { + return err } if err := r.fetch(ctx); err != nil { return err @@ -270,6 +291,9 @@ func (r *Repo) fetch(ctx context.Context) error { func (r *Repo) workingClone(ctx context.Context, ref string) (string, error) { r.mu.RLock() defer r.mu.RUnlock() + if err := r.errorIfNotReady(); err != nil { + return "", err + } working, err := ioutil.TempDir(os.TempDir(), "flux-working") if err != nil { return "", err diff --git a/git/working.go b/git/working.go index 94e95e37f..ffc306449 100644 --- a/git/working.go +++ b/git/working.go @@ -45,7 +45,7 @@ func (r *Repo) Clone(ctx context.Context, conf Config) (*Checkout, error) { upstream := r.Origin() repoDir, err := r.workingClone(ctx, conf.Branch) if err != nil { - return nil, CloningError(upstream.URL, err) + return nil, err } if err := config(ctx, repoDir, conf.UserName, conf.UserEmail); err != nil {