diff --git a/cluster/kubernetes/manifests.go b/cluster/kubernetes/manifests.go index 576a09bc10..e1d6d69edd 100644 --- a/cluster/kubernetes/manifests.go +++ b/cluster/kubernetes/manifests.go @@ -26,8 +26,8 @@ type Manifests struct { Namespacer namespacer } -func postProcess(manifests map[string]kresource.KubeManifest, nser namespacer) (map[string]resource.Resource, error) { - result := map[string]resource.Resource{} +func postProcess(manifests map[string]kresource.KubeManifest, nser namespacer) ([]resource.Resource, error) { + result := []resource.Resource{} for _, km := range manifests { if nser != nil { ns, err := nser.EffectiveNamespace(km) @@ -36,21 +36,21 @@ func postProcess(manifests map[string]kresource.KubeManifest, nser namespacer) ( } km.SetNamespace(ns) } - result[km.ResourceID().String()] = km + result = append(result, km) } return result, nil } -func (c *Manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) { - manifests, err := kresource.Load(base, paths) +func (c *Manifests) LoadManifests(baseDir string, paths []string) ([]resource.Resource, error) { + manifests, err := kresource.Load(baseDir, paths) if err != nil { return nil, err } return postProcess(manifests, c.Namespacer) } -func (c *Manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) { +func (c *Manifests) SetWorkloadContainerImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) { return updateWorkload(def, id, container, image) } -// UpdatePolicies and ServicesWithPolicies in policies.go +// UpdateWorkloadPolicies in policies.go diff --git a/cluster/kubernetes/policies.go b/cluster/kubernetes/policies.go index 79d51221ce..5888618dde 100644 --- a/cluster/kubernetes/policies.go +++ b/cluster/kubernetes/policies.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/pkg/errors" - "gopkg.in/yaml.v2" "github.com/weaveworks/flux" kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" @@ -12,7 +11,7 @@ import ( "github.com/weaveworks/flux/resource" ) -func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) { +func (m *Manifests) UpdateWorkloadPolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) { ns, kind, name := id.Components() add, del := update.Add, update.Remove @@ -55,17 +54,6 @@ type manifest struct { } `yaml:"metadata"` } -func extractAnnotations(def []byte) (map[string]string, error) { - var m manifest - if err := yaml.Unmarshal(def, &m); err != nil { - return nil, errors.Wrap(err, "decoding manifest for annotations") - } - if m.Metadata.Annotations == nil { - return map[string]string{}, nil - } - return m.Metadata.Annotations, nil -} - func extractContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) { resources, err := kresource.ParseMultidoc(def, "stdin") if err != nil { diff --git a/cluster/kubernetes/policies_test.go b/cluster/kubernetes/policies_test.go index 985c49d719..bcac61d0d1 100644 --- a/cluster/kubernetes/policies_test.go +++ b/cluster/kubernetes/policies_test.go @@ -171,7 +171,7 @@ func TestUpdatePolicies(t *testing.T) { caseIn := templToString(t, annotationsTemplate, c.in) caseOut := templToString(t, annotationsTemplate, c.out) resourceID := flux.MustParseResourceID("default:deployment/nginx") - out, err := (&Manifests{}).UpdatePolicies([]byte(caseIn), resourceID, c.update) + out, err := (&Manifests{}).UpdateWorkloadPolicies([]byte(caseIn), resourceID, c.update) assert.Equal(t, c.wantErr, err != nil) if !c.wantErr { assert.Equal(t, string(out), caseOut) @@ -185,7 +185,7 @@ func TestUpdatePolicies_invalidTagPattern(t *testing.T) { update := policy.Update{ Add: policy.Set{policy.TagPrefix("nginx"): "semver:invalid"}, } - _, err := (&Manifests{}).UpdatePolicies(nil, resourceID, update) + _, err := (&Manifests{}).UpdateWorkloadPolicies(nil, resourceID, update) assert.Error(t, err) } diff --git a/cluster/kubernetes/sync_test.go b/cluster/kubernetes/sync_test.go index ba6af35399..d55f6b8caa 100644 --- a/cluster/kubernetes/sync_test.go +++ b/cluster/kubernetes/sync_test.go @@ -28,6 +28,7 @@ import ( "github.com/weaveworks/flux/cluster" kresource "github.com/weaveworks/flux/cluster/kubernetes/resource" fluxfake "github.com/weaveworks/flux/integrations/client/clientset/versioned/fake" + "github.com/weaveworks/flux/resource" "github.com/weaveworks/flux/sync" ) @@ -280,8 +281,11 @@ metadata: if err != nil { t.Fatal(err) } - - err = sync.Sync("testset", resources, kube) + resourcesByID := map[string]resource.Resource{} + for _, r := range resources { + resourcesByID[r.ResourceID().String()] = r + } + err = sync.Sync("testset", resourcesByID, kube) if !expectErrors && err != nil { t.Error(err) } diff --git a/cluster/manifests.go b/cluster/manifests.go index fe74a48630..39d6e4c645 100644 --- a/cluster/manifests.go +++ b/cluster/manifests.go @@ -2,9 +2,6 @@ package cluster import ( "fmt" - "io/ioutil" - "os" - "path/filepath" "github.com/weaveworks/flux" "github.com/weaveworks/flux/image" @@ -17,52 +14,20 @@ type ManifestError struct { } func ErrResourceNotFound(name string) error { - return ManifestError{fmt.Errorf("manifest for resource %s not found under manifests path", name)} + return ManifestError{fmt.Errorf("manifest for resource %s not found", name)} } -// Manifests represents how a set of files are used as definitions of +// Manifests represents a set of files containing definitions of // resources, e.g., in Kubernetes, YAML files describing Kubernetes // resources. type Manifests interface { - // Update the image in a manifest's bytes to that given - UpdateImage(def []byte, resourceID flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) + // Set the image of a container in a manifest's bytes to that given + SetWorkloadContainerImage(def []byte, resourceID flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) // Load all the resource manifests under the paths // given. `baseDir` is used to relativise the paths, which are // supplied as absolute paths to directories or files; at least // one path should be supplied, even if it is the same as `baseDir`. - LoadManifests(baseDir string, paths []string) (map[string]resource.Resource, error) - // UpdatePolicies modifies a manifest to apply the policy update specified - UpdatePolicies([]byte, flux.ResourceID, policy.Update) ([]byte, error) -} - -// UpdateManifest looks for the manifest for the identified resource, -// reads its contents, applies f(contents), and writes the results -// back to the file. -func UpdateManifest(m Manifests, root string, paths []string, id flux.ResourceID, f func(manifest []byte) ([]byte, error)) error { - resources, err := m.LoadManifests(root, paths) - if err != nil { - return err - } - - resource, ok := resources[id.String()] - if !ok { - return ErrResourceNotFound(id.String()) - } - - path := filepath.Join(root, resource.Source()) - def, err := ioutil.ReadFile(path) - if err != nil { - return err - } - - newDef, err := f(def) - if err != nil { - return err - } - - fi, err := os.Stat(path) - if err != nil { - return err - } - return ioutil.WriteFile(path, newDef, fi.Mode()) + LoadManifests(baseDir string, paths []string) ([]resource.Resource, error) + // UpdatWorkloadPolicies modifies a manifest to apply the policy update specified + UpdateWorkloadPolicies([]byte, flux.ResourceID, policy.Update) ([]byte, error) } diff --git a/cluster/mock.go b/cluster/mock.go index cdce3db5fe..599ee16d06 100644 --- a/cluster/mock.go +++ b/cluster/mock.go @@ -10,17 +10,20 @@ import ( // Doubles as a cluster.Cluster and cluster.Manifests implementation type Mock struct { - AllWorkloadsFunc func(maybeNamespace string) ([]Workload, error) - SomeWorkloadsFunc func([]flux.ResourceID) ([]Workload, error) - PingFunc func() error - ExportFunc func() ([]byte, error) - SyncFunc func(SyncSet) error - PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error) - UpdateImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) - LoadManifestsFunc func(base string, paths []string) (map[string]resource.Resource, error) - UpdatePoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error) + AllWorkloadsFunc func(maybeNamespace string) ([]Workload, error) + SomeWorkloadsFunc func([]flux.ResourceID) ([]Workload, error) + PingFunc func() error + ExportFunc func() ([]byte, error) + SyncFunc func(SyncSet) error + PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error) + SetWorkloadContainerImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) + LoadManifestsFunc func(base string, paths []string) ([]resource.Resource, error) + UpdateWorkloadPoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error) } +var _ Cluster = &Mock{} +var _ Manifests = &Mock{} + func (m *Mock) AllWorkloads(maybeNamespace string) ([]Workload, error) { return m.AllWorkloadsFunc(maybeNamespace) } @@ -45,14 +48,14 @@ func (m *Mock) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) { return m.PublicSSHKeyFunc(regenerate) } -func (m *Mock) UpdateImage(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) { - return m.UpdateImageFunc(def, id, container, newImageID) +func (m *Mock) SetWorkloadContainerImage(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) { + return m.SetWorkloadContainerImageFunc(def, id, container, newImageID) } -func (m *Mock) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) { +func (m *Mock) LoadManifests(base string, paths []string) ([]resource.Resource, error) { return m.LoadManifestsFunc(base, paths) } -func (m *Mock) UpdatePolicies(def []byte, id flux.ResourceID, p policy.Update) ([]byte, error) { - return m.UpdatePoliciesFunc(def, id, p) +func (m *Mock) UpdateWorkloadPolicies(def []byte, id flux.ResourceID, p policy.Update) ([]byte, error) { + return m.UpdateWorkloadPoliciesFunc(def, id, p) } diff --git a/cluster/resourcestore.go b/cluster/resourcestore.go new file mode 100644 index 0000000000..f9d7a622bc --- /dev/null +++ b/cluster/resourcestore.go @@ -0,0 +1,160 @@ +package cluster + +import ( + "bytes" + "io/ioutil" + "os" + "path" + "sync" + + "github.com/weaveworks/flux" + "github.com/weaveworks/flux/git" + "github.com/weaveworks/flux/image" + "github.com/weaveworks/flux/policy" + "github.com/weaveworks/flux/resource" +) + +// ResourceStore manages all the cluster resources defined in a repository, explicitly declared in a file or not. +// e.g., generated and updated by a .flux.yaml file, explicit Kubernetes .yaml manifests files ... +type ResourceStore interface { + SetWorkloadContainerImage(resourceID flux.ResourceID, container string, newImageID image.Ref) error + GetAllResourcesByID() (map[string]resource.Resource, error) + GetAllResourcesBySource() (map[string]resource.Resource, error) + UpdateWorkloadPolicies(flux.ResourceID, policy.Update) (bool, error) +} + +var _ ResourceStore = &resourceStore{} + +type resourceStore struct { + manifests Manifests + checkout *git.Checkout + cachedResourcesBySource map[string]resource.Resource + cachedResourcesByID map[string]resource.Resource + // TODO(fons): do we really need locking, can the checkout shared across threads? + sync.RWMutex +} + +func NewResourceStore(manifests Manifests, checkout *git.Checkout) *resourceStore { + return &resourceStore{ + manifests: manifests, + checkout: checkout, + } +} + +// Set the container image of a resource in the store +func (s *resourceStore) SetWorkloadContainerImage(id flux.ResourceID, container string, newImageID image.Ref) error { + resourcesByID, _, err := s.getAllResources() + if err != nil { + return err + } + resource, ok := resourcesByID[id.String()] + if !ok { + return ErrResourceNotFound(id.String()) + } + absolutePath := path.Join(s.checkout.Dir(), resource.Source()) + def, err := ioutil.ReadFile(absolutePath) + if err != nil { + return err + } + newDef, err := s.manifests.SetWorkloadContainerImage(def, id, container, newImageID) + if err != nil { + return err + } + fi, err := os.Stat(absolutePath) + if err != nil { + return err + } + if err := ioutil.WriteFile(absolutePath, newDef, fi.Mode()); err != nil { + return err + } + // Reset cached resources, since we have modified one + // TODO(fons): make this more performant. + // We could, for instance, parse it after modifying it and add it to the resources. + s.resetCache() + return nil +} + +// Load all the resources in the store. The returned map is indexed by the resource IDs +func (s *resourceStore) GetAllResourcesByID() (map[string]resource.Resource, error) { + resourcesByID, _, err := s.getAllResources() + if err != nil { + return nil, err + } + return resourcesByID, nil +} + +// Load all the resources in the store. The returned map is indexed by the file which defines de resource +func (s *resourceStore) GetAllResourcesBySource() (map[string]resource.Resource, error) { + _, resourcesBySource, err := s.getAllResources() + if err != nil { + return nil, err + } + return resourcesBySource, nil +} + +// UpdateWorkloadPolicies modifies a resource in the store to apply the policy-update specified +// It returns whether a change in the resource was actually made as a result of the change +func (s *resourceStore) UpdateWorkloadPolicies(id flux.ResourceID, update policy.Update) (bool, error) { + resourcesByID, _, err := s.getAllResources() + if err != nil { + return false, err + } + resource, ok := resourcesByID[id.String()] + if !ok { + return false, ErrResourceNotFound(id.String()) + } + absolutePath := path.Join(s.checkout.Dir(), resource.Source()) + def, err := ioutil.ReadFile(absolutePath) + if err != nil { + return false, err + } + newDef, err := s.manifests.UpdateWorkloadPolicies(def, id, update) + if err != nil { + return false, err + } + fi, err := os.Stat(absolutePath) + if err != nil { + return false, err + } + if err := ioutil.WriteFile(absolutePath, newDef, fi.Mode()); err != nil { + return false, err + } + comparison := bytes.Compare(def, newDef) + // Reset cached resources, since we have modified one + // TODO(fons): make this more performant. + // We could, for instance, parse it after modifying it and add it to the resources. + s.resetCache() + return comparison != 0, nil +} + +func (s *resourceStore) getAllResources() (map[string]resource.Resource, map[string]resource.Resource, error) { + s.RLock() + if s.cachedResourcesByID != nil && s.cachedResourcesBySource != nil { + s.RUnlock() + return s.cachedResourcesByID, s.cachedResourcesBySource, nil + } + s.RUnlock() + + resources, err := s.manifests.LoadManifests(s.checkout.Dir(), s.checkout.ManifestDirs()) + if err != nil { + return nil, nil, err + } + resourcesByID := map[string]resource.Resource{} + resourcesBySource := map[string]resource.Resource{} + for _, r := range resources { + resourcesByID[r.ResourceID().String()] = r + resourcesBySource[r.Source()] = r + } + s.Lock() + s.cachedResourcesByID = resourcesByID + s.cachedResourcesBySource = resourcesBySource + s.Unlock() + return resourcesByID, resourcesBySource, nil +} + +func (s *resourceStore) resetCache() { + s.Lock() + s.cachedResourcesByID = nil + s.cachedResourcesBySource = nil + s.Unlock() +} diff --git a/daemon/daemon.go b/daemon/daemon.go index a0bdebeef8..cc117e5604 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -77,7 +77,7 @@ func (d *Daemon) getResources(ctx context.Context) (map[string]resource.Resource var globalReadOnly v6.ReadOnlyReason err := d.WithClone(ctx, func(checkout *git.Checkout) error { var err error - resources, err = d.Manifests.LoadManifests(checkout.Dir(), checkout.ManifestDirs()) + resources, err = cluster.NewResourceStore(d.Manifests, checkout).GetAllResourcesByID() return err }) @@ -336,7 +336,7 @@ func (d *Daemon) UpdateManifests(ctx context.Context, spec update.Spec) (job.ID, } return d.queueJob(d.makeLoggingJobFunc(d.makeJobFromUpdate(d.release(spec, s)))), nil case policy.Updates: - return d.queueJob(d.makeLoggingJobFunc(d.makeJobFromUpdate(d.updatePolicy(spec, s)))), nil + return d.queueJob(d.makeLoggingJobFunc(d.makeJobFromUpdate(d.updatePolicies(spec, s)))), nil case update.ManualSync: return d.queueJob(d.sync()), nil default: @@ -362,7 +362,7 @@ func (d *Daemon) sync() jobFunc { } } -func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) updateFunc { +func (d *Daemon) updatePolicies(spec update.Spec, updates policy.Updates) updateFunc { return func(ctx context.Context, jobID job.ID, working *git.Checkout, logger log.Logger) (job.Result, error) { // For each update var workloadIDs []flux.ResourceID @@ -380,29 +380,13 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) updateFu if policy.Set(u.Add).Has(policy.Automated) { anythingAutomated = true } - // find the workload manifest - err := cluster.UpdateManifest(d.Manifests, working.Dir(), working.ManifestDirs(), workloadID, func(def []byte) ([]byte, error) { - newDef, err := d.Manifests.UpdatePolicies(def, workloadID, u) - if err != nil { - result.Result[workloadID] = update.WorkloadResult{ - Status: update.ReleaseStatusFailed, - Error: err.Error(), - } - return nil, err - } - if string(newDef) == string(def) { - result.Result[workloadID] = update.WorkloadResult{ - Status: update.ReleaseStatusSkipped, - } - } else { - workloadIDs = append(workloadIDs, workloadID) - result.Result[workloadID] = update.WorkloadResult{ - Status: update.ReleaseStatusSuccess, - } - } - return newDef, nil - }) + + updated, err := cluster.NewResourceStore(d.Manifests, working).UpdateWorkloadPolicies(workloadID, u) if err != nil { + result.Result[workloadID] = update.WorkloadResult{ + Status: update.ReleaseStatusFailed, + Error: err.Error(), + } switch err := err.(type) { case cluster.ManifestError: result.Result[workloadID] = update.WorkloadResult{ @@ -413,6 +397,16 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) updateFu return result, err } } + if !updated { + result.Result[workloadID] = update.WorkloadResult{ + Status: update.ReleaseStatusSkipped, + } + } else { + workloadIDs = append(workloadIDs, workloadID) + result.Result[workloadID] = update.WorkloadResult{ + Status: update.ReleaseStatusSuccess, + } + } } if len(workloadIDs) == 0 { return result, nil diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index 4058c8611e..3551996ddd 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -514,8 +514,7 @@ func TestDaemon_PolicyUpdate(t *testing.T) { return false } defer co.Clean() - dirs := co.ManifestDirs() - m, err := d.Manifests.LoadManifests(co.Dir(), dirs) + m, err := cluster.NewResourceStore(d.Manifests, co).GetAllResourcesByID() if err != nil { t.Fatalf("Error: %s", err.Error()) } @@ -863,8 +862,7 @@ func (w *wait) ForImageTag(t *testing.T, d *Daemon, workload, container, tag str } defer co.Clean() - dirs := co.ManifestDirs() - resources, err := d.Manifests.LoadManifests(co.Dir(), dirs) + resources, err := cluster.NewResourceStore(d.Manifests, co).GetAllResourcesByID() assert.NoError(t, err) workload, ok := resources[workload].(resource.Workload) diff --git a/daemon/loop.go b/daemon/loop.go index 65a4ec690f..1ec68f0f23 100644 --- a/daemon/loop.go +++ b/daemon/loop.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "encoding/base64" "fmt" + "path/filepath" "strings" "sync" "time" @@ -200,7 +201,8 @@ func (d *Daemon) doSync(logger log.Logger, lastKnownSyncTagRev *string, warnedAb } // Get a map of all resources defined in the repo - allResources, err := d.Manifests.LoadManifests(working.Dir(), working.ManifestDirs()) + resourceStore := cluster.NewResourceStore(d.Manifests, working) + allResources, err := resourceStore.GetAllResourcesByID() if err != nil { return errors.Wrap(err, "loading resources from repo") } @@ -241,9 +243,7 @@ func (d *Daemon) doSync(logger log.Logger, lastKnownSyncTagRev *string, warnedAb } } - // Figure out which workload IDs changed in this release changedResources := map[string]resource.Resource{} - if initialSync { // no synctag, We are syncing everything from scratch changedResources = allResources @@ -251,9 +251,22 @@ func (d *Daemon) doSync(logger log.Logger, lastKnownSyncTagRev *string, warnedAb ctx, cancel := context.WithTimeout(ctx, d.GitOpTimeout) changedFiles, err := working.ChangedFiles(ctx, oldTagRev) if err == nil && len(changedFiles) > 0 { + resourcesBySource, err := resourceStore.GetAllResourcesBySource() + if err != nil { + // TODO(fons): refactor this code and avoid duplicating identical errors + errors.Wrap(err, "loading resources from repo") + } // We had some changed files, we're syncing a diff // FIXME(michael): this won't be accurate when a file can have more than one resource - changedResources, err = d.Manifests.LoadManifests(working.Dir(), changedFiles) + for _, absolutePath := range changedFiles { + relPath, err := filepath.Rel(working.Dir(), absolutePath) + if err != nil { + errors.Wrap(err, "loading resources from repo") + } + if r, ok := resourcesBySource[relPath]; ok { + changedResources[r.ResourceID().String()] = r + } + } } cancel() if err != nil { diff --git a/daemon/loop_test.go b/daemon/loop_test.go index 4f11d03163..85425a7b07 100644 --- a/daemon/loop_test.go +++ b/daemon/loop_test.go @@ -1,18 +1,19 @@ package daemon import ( + "bytes" + "context" + "fmt" "io/ioutil" "os" + "path" "reflect" - "strings" "sync" "testing" "time" "github.com/go-kit/kit/log" - "context" - "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" "github.com/weaveworks/flux/cluster/kubernetes" @@ -237,14 +238,25 @@ func TestDoSync_WithNewCommit(t *testing.T) { return err } // Push some new changes - dirs := checkout.ManifestDirs() - err = cluster.UpdateManifest(d.Manifests, checkout.Dir(), dirs, flux.MustParseResourceID("default:deployment/helloworld"), func(def []byte) ([]byte, error) { - // A simple modification so we have changes to push - return []byte(strings.Replace(string(def), "replicas: 5", "replicas: 4", -1)), nil - }) + resourcesByID, err := cluster.NewResourceStore(d.Manifests, checkout).GetAllResourcesByID() + if err != nil { + return err + } + targetResource := "default:deployment/helloworld" + res, ok := resourcesByID[targetResource] + if !ok { + return fmt.Errorf("resource not found: %q", targetResource) + + } + absolutePath := path.Join(checkout.Dir(), res.Source()) + def, err := ioutil.ReadFile(absolutePath) if err != nil { return err } + newDef := bytes.Replace(def, []byte("replicas: 5"), []byte("replicas: 4"), -1) + if err := ioutil.WriteFile(absolutePath, newDef, 0600); err != nil { + return err + } commitAction := git.CommitAction{Author: "", Message: "test commit"} err = checkout.CommitAndPush(ctx, commitAction, nil) diff --git a/git/working.go b/git/working.go index ae0854164f..3b1ee3452b 100644 --- a/git/working.go +++ b/git/working.go @@ -111,7 +111,7 @@ func (c *Checkout) Dir() string { // ManifestDirs returns the paths to the manifests files. It ensures // that at least one path is returned, so that it can be used with -// `Manifest.LoadManifests`. +// `Manifest.GetAllResourcesByID`. func (c *Checkout) ManifestDirs() []string { if len(c.config.Paths) == 0 { return []string{c.dir} diff --git a/release/context.go b/release/context.go index 630075876a..b2792c9c39 100644 --- a/release/context.go +++ b/release/context.go @@ -2,9 +2,6 @@ package release import ( "fmt" - "io/ioutil" - "os" - "path/filepath" "github.com/pkg/errors" @@ -17,18 +14,16 @@ import ( ) type ReleaseContext struct { - cluster cluster.Cluster - manifests cluster.Manifests - repo *git.Checkout - registry registry.Registry + cluster cluster.Cluster + resourceStore cluster.ResourceStore + registry registry.Registry } func NewReleaseContext(c cluster.Cluster, m cluster.Manifests, reg registry.Registry, repo *git.Checkout) *ReleaseContext { return &ReleaseContext{ - cluster: c, - manifests: m, - repo: repo, - registry: reg, + cluster: c, + resourceStore: cluster.NewResourceStore(m, repo), + registry: reg, } } @@ -36,26 +31,20 @@ func (rc *ReleaseContext) Registry() registry.Registry { return rc.registry } -func (rc *ReleaseContext) LoadManifests() (map[string]resource.Resource, error) { - return rc.manifests.LoadManifests(rc.repo.Dir(), rc.repo.ManifestDirs()) +func (rc *ReleaseContext) GetAllResources() (map[string]resource.Resource, error) { + return rc.resourceStore.GetAllResourcesByID() } func (rc *ReleaseContext) WriteUpdates(updates []*update.WorkloadUpdate) error { + err := func() error { for _, update := range updates { - manifestBytes, err := ioutil.ReadFile(update.ManifestPath) - if err != nil { - return err - } for _, container := range update.Updates { - manifestBytes, err = rc.manifests.UpdateImage(manifestBytes, update.ResourceID, container.Container, container.Target) + err := rc.resourceStore.SetWorkloadContainerImage(update.ResourceID, container.Container, container.Target) if err != nil { return errors.Wrapf(err, "updating resource %s in %s", update.ResourceID.String(), update.Resource.Source()) } } - if err = ioutil.WriteFile(update.ManifestPath, manifestBytes, os.FileMode(0600)); err != nil { - return errors.Wrapf(err, "writing updated file %s", update.Resource.Source()) - } } return nil }() @@ -127,9 +116,9 @@ func (rc *ReleaseContext) SelectWorkloads(results update.Result, prefilters, pos } // WorkloadsForUpdate collects all workloads defined in manifests and prepares a list of -// controller updates for each of them. It does not consider updatability. +// workload updates for each of them. It does not consider updatability. func (rc *ReleaseContext) WorkloadsForUpdate() (map[flux.ResourceID]*update.WorkloadUpdate, error) { - resources, err := rc.LoadManifests() + resources, err := rc.GetAllResources() if err != nil { return nil, err } @@ -138,9 +127,8 @@ func (rc *ReleaseContext) WorkloadsForUpdate() (map[flux.ResourceID]*update.Work for _, res := range resources { if wl, ok := res.(resource.Workload); ok { defined[res.ResourceID()] = &update.WorkloadUpdate{ - ResourceID: res.ResourceID(), - Resource: wl, - ManifestPath: filepath.Join(rc.repo.Dir(), res.Source()), + ResourceID: res.ResourceID(), + Resource: wl, } } } diff --git a/release/releaser.go b/release/releaser.go index 922856b38f..fc23866c00 100644 --- a/release/releaser.go +++ b/release/releaser.go @@ -31,7 +31,7 @@ func Release(rc *ReleaseContext, changes Changes, logger log.Logger) (results up logger = log.With(logger, "type", "release") - before, err := rc.LoadManifests() + before, err := rc.GetAllResources() updates, results, err := changes.CalculateRelease(rc, logger) if err != nil { return nil, err @@ -42,7 +42,7 @@ func Release(rc *ReleaseContext, changes Changes, logger log.Logger) (results up return nil, MakeReleaseError(errors.Wrap(err, "applying changes")) } - after, err := rc.LoadManifests() + after, err := rc.GetAllResources() if err != nil { return nil, MakeReleaseError(errors.Wrap(err, "loading resources after updates")) } diff --git a/release/releaser_test.go b/release/releaser_test.go index 4368318ce3..3c271259a7 100644 --- a/release/releaser_test.go +++ b/release/releaser_test.go @@ -231,7 +231,7 @@ func Test_InitContainer(t *testing.T) { }, } - cluster := mockCluster(hwSvc, lockedSvc, initSvc) + mCluster := mockCluster(hwSvc, lockedSvc, initSvc) expect := expected{ Specific: update.Result{ @@ -260,16 +260,15 @@ func Test_InitContainer(t *testing.T) { defer clean() testRelease(t, &ReleaseContext{ - cluster: cluster, - manifests: mockManifests, - registry: mockRegistry, - repo: checkout, + cluster: mCluster, + resourceStore: cluster.NewResourceStore(mockManifests, checkout), + registry: mockRegistry, }, spec, expect.Result()) } func Test_FilterLogic(t *testing.T) { - cluster := mockCluster(hwSvc, lockedSvc) // no testsvc in cluster, but it _is_ in repo + mCluster := mockCluster(hwSvc, lockedSvc) // no testsvc in cluster, but it _is_ in repo notInRepoService := "default:deployment/notInRepo" notInRepoSpec, _ := update.ParseResourceSpec(notInRepoService) @@ -452,17 +451,16 @@ func Test_FilterLogic(t *testing.T) { checkout, cleanup := setup(t) defer cleanup() testRelease(t, &ReleaseContext{ - cluster: cluster, - manifests: mockManifests, - registry: mockRegistry, - repo: checkout, + cluster: mCluster, + resourceStore: cluster.NewResourceStore(mockManifests, checkout), + registry: mockRegistry, }, tst.Spec, tst.Expected.Result()) }) } } func Test_Force_lockedWorkload(t *testing.T) { - cluster := mockCluster(lockedSvc) + mCluster := mockCluster(lockedSvc) success := update.WorkloadResult{ Status: update.ReleaseStatusSuccess, PerContainer: []update.ContainerUpdate{ @@ -545,17 +543,16 @@ func Test_Force_lockedWorkload(t *testing.T) { checkout, cleanup := setup(t) defer cleanup() testRelease(t, &ReleaseContext{ - cluster: cluster, - manifests: mockManifests, - registry: mockRegistry, - repo: checkout, + cluster: mCluster, + resourceStore: cluster.NewResourceStore(mockManifests, checkout), + registry: mockRegistry, }, tst.Spec, tst.Expected.Result()) }) } } func Test_Force_filteredContainer(t *testing.T) { - cluster := mockCluster(semverSvc) + mCluster := mockCluster(semverSvc) successNew := update.WorkloadResult{ Status: update.ReleaseStatusSuccess, PerContainer: []update.ContainerUpdate{ @@ -650,17 +647,16 @@ func Test_Force_filteredContainer(t *testing.T) { checkout, cleanup := setup(t) defer cleanup() testRelease(t, &ReleaseContext{ - cluster: cluster, - manifests: mockManifests, - registry: mockRegistry, - repo: checkout, + cluster: mCluster, + resourceStore: cluster.NewResourceStore(mockManifests, checkout), + registry: mockRegistry, }, tst.Spec, tst.Expected.Result()) }) } } func Test_ImageStatus(t *testing.T) { - cluster := mockCluster(hwSvc, lockedSvc, testSvc) + mCluster := mockCluster(hwSvc, lockedSvc, testSvc) upToDateRegistry := ®istryMock.Registry{ Images: []image.Info{ { @@ -720,10 +716,9 @@ func Test_ImageStatus(t *testing.T) { checkout, cleanup := setup(t) defer cleanup() ctx := &ReleaseContext{ - cluster: cluster, - manifests: mockManifests, - repo: checkout, - registry: upToDateRegistry, + cluster: mCluster, + resourceStore: cluster.NewResourceStore(mockManifests, checkout), + registry: upToDateRegistry, } testRelease(t, ctx, tst.Spec, tst.Expected.Result()) }) @@ -744,14 +739,13 @@ func Test_UpdateMultidoc(t *testing.T) { }, } - cluster := mockCluster(hwSvc, lockedSvc, egSvc) // no testsvc in cluster, but it _is_ in repo + mCluster := mockCluster(hwSvc, lockedSvc, egSvc) // no testsvc in cluster, but it _is_ in repo checkout, cleanup := setup(t) defer cleanup() ctx := &ReleaseContext{ - cluster: cluster, - manifests: mockManifests, - repo: checkout, - registry: mockRegistry, + cluster: mCluster, + resourceStore: cluster.NewResourceStore(mockManifests, checkout), + registry: mockRegistry, } spec := update.ReleaseImageSpec{ ServiceSpecs: []update.ResourceSpec{"default:deployment/multi-deploy"}, @@ -792,14 +786,13 @@ func Test_UpdateList(t *testing.T) { }, } - cluster := mockCluster(hwSvc, lockedSvc, egSvc) // no testsvc in cluster, but it _is_ in repo + mCluster := mockCluster(hwSvc, lockedSvc, egSvc) // no testsvc in cluster, but it _is_ in repo checkout, cleanup := setup(t) defer cleanup() ctx := &ReleaseContext{ - cluster: cluster, - manifests: mockManifests, - repo: checkout, - registry: mockRegistry, + cluster: mCluster, + resourceStore: cluster.NewResourceStore(mockManifests, checkout), + registry: mockRegistry, } spec := update.ReleaseImageSpec{ ServiceSpecs: []update.ResourceSpec{"default:deployment/list-deploy"}, @@ -827,14 +820,13 @@ func Test_UpdateList(t *testing.T) { } func Test_UpdateContainers(t *testing.T) { - cluster := mockCluster(hwSvc, lockedSvc) + mCluster := mockCluster(hwSvc, lockedSvc) checkout, cleanup := setup(t) defer cleanup() ctx := &ReleaseContext{ - cluster: cluster, - manifests: mockManifests, - repo: checkout, - registry: mockRegistry, + cluster: mCluster, + resourceStore: cluster.NewResourceStore(mockManifests, checkout), + registry: mockRegistry, } type expected struct { Err error @@ -1060,12 +1052,12 @@ type badManifests struct { kubernetes.Manifests } -func (m *badManifests) UpdateImage(def []byte, resourceID flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) { +func (m *badManifests) SetWorkloadContainerImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) { return def, nil } func Test_BadRelease(t *testing.T) { - cluster := mockCluster(hwSvc) + mCluster := mockCluster(hwSvc) spec := update.ReleaseImageSpec{ ServiceSpecs: []update.ResourceSpec{update.ResourceSpecAll}, ImageSpec: update.ImageSpecFromRef(newHwRef), @@ -1076,24 +1068,23 @@ func Test_BadRelease(t *testing.T) { defer cleanup1() ctx := &ReleaseContext{ - cluster: cluster, - manifests: &kubernetes.Manifests{}, - repo: checkout1, - registry: mockRegistry, + cluster: mCluster, + resourceStore: cluster.NewResourceStore(&kubernetes.Manifests{}, checkout1), + registry: mockRegistry, } _, err := Release(ctx, spec, log.NewNopLogger()) if err != nil { - t.Fatal("release with 'good' Manifests should succeed, but errored:", err) + t.Fatal("release with 'good' ResourceStore should succeed, but errored:", err) } checkout2, cleanup2 := setup(t) defer cleanup2() + manifests := &badManifests{Manifests: kubernetes.Manifests{constNamespacer("default")}} ctx = &ReleaseContext{ - cluster: cluster, - manifests: &badManifests{Manifests: kubernetes.Manifests{constNamespacer("default")}}, - repo: checkout2, - registry: mockRegistry, + cluster: mCluster, + resourceStore: cluster.NewResourceStore(manifests, checkout2), + registry: mockRegistry, } _, err = Release(ctx, spec, log.NewNopLogger()) if err == nil { diff --git a/sync/sync_test.go b/sync/sync_test.go index 011bbc3d54..68f74a334a 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -23,7 +23,8 @@ func TestSync(t *testing.T) { clus := &syncCluster{map[string]string{}} dirs := checkout.ManifestDirs() - resources, err := manifests.LoadManifests(checkout.Dir(), dirs) + rs := cluster.NewResourceStore(manifests, checkout) + resources, err := rs.GetAllResourcesByID() if err != nil { t.Fatal(err) } @@ -31,7 +32,7 @@ func TestSync(t *testing.T) { if err := Sync("synctest", resources, clus); err != nil { t.Fatal(err) } - checkClusterMatchesFiles(t, manifests, clus.resources, checkout.Dir(), dirs) + checkClusterMatchesFiles(t, rs, clus.resources, checkout.Dir(), dirs) } // --- @@ -73,8 +74,8 @@ func resourcesToStrings(resources map[string]resource.Resource) map[string]strin // Our invariant is that the model we can export from the cluster // should always reflect what's in git. So, let's check that. -func checkClusterMatchesFiles(t *testing.T, m cluster.Manifests, resources map[string]string, base string, dirs []string) { - files, err := m.LoadManifests(base, dirs) +func checkClusterMatchesFiles(t *testing.T, rs cluster.ResourceStore, resources map[string]string, base string, dirs []string) { + files, err := rs.GetAllResourcesByID() if err != nil { t.Fatal(err) } diff --git a/update/workload.go b/update/workload.go index f7314c82ff..7eeacbc49e 100644 --- a/update/workload.go +++ b/update/workload.go @@ -7,11 +7,10 @@ import ( ) type WorkloadUpdate struct { - ResourceID flux.ResourceID - Workload cluster.Workload - Resource resource.Workload - ManifestPath string - Updates []ContainerUpdate + ResourceID flux.ResourceID + Workload cluster.Workload + Resource resource.Workload + Updates []ContainerUpdate } type WorkloadFilter interface {