Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Implement ResourceStore to abstract out manifest operations
Browse files Browse the repository at this point in the history
ResourceStore is meant to abstract out file operations on explicit manifest
files, paving the way for supporting programatically-generated cluster
resources.
  • Loading branch information
2opremio authored and Alfonso Acosta committed Mar 20, 2019
1 parent 2d4cc4d commit df0cd55
Show file tree
Hide file tree
Showing 17 changed files with 326 additions and 210 deletions.
14 changes: 7 additions & 7 deletions cluster/kubernetes/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type Manifests struct {
Namespacer namespacer
}

func postProcess(manifests map[string]kresource.KubeManifest, nser namespacer) (map[string]resource.Resource, error) {
result := map[string]resource.Resource{}
func postProcess(manifests map[string]kresource.KubeManifest, nser namespacer) ([]resource.Resource, error) {
result := []resource.Resource{}
for _, km := range manifests {
if nser != nil {
ns, err := nser.EffectiveNamespace(km)
Expand All @@ -36,21 +36,21 @@ func postProcess(manifests map[string]kresource.KubeManifest, nser namespacer) (
}
km.SetNamespace(ns)
}
result[km.ResourceID().String()] = km
result = append(result, km)
}
return result, nil
}

func (c *Manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) {
manifests, err := kresource.Load(base, paths)
func (c *Manifests) LoadManifests(baseDir string, paths []string) ([]resource.Resource, error) {
manifests, err := kresource.Load(baseDir, paths)
if err != nil {
return nil, err
}
return postProcess(manifests, c.Namespacer)
}

func (c *Manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) {
func (c *Manifests) SetWorkloadContainerImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) {
return updateWorkload(def, id, container, image)
}

// UpdatePolicies and ServicesWithPolicies in policies.go
// UpdateWorkloadPolicies in policies.go
14 changes: 1 addition & 13 deletions cluster/kubernetes/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"fmt"

"github.com/pkg/errors"
"gopkg.in/yaml.v2"

"github.com/weaveworks/flux"
kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/resource"
)

func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) {
func (m *Manifests) UpdateWorkloadPolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) {
ns, kind, name := id.Components()
add, del := update.Add, update.Remove

Expand Down Expand Up @@ -55,17 +54,6 @@ type manifest struct {
} `yaml:"metadata"`
}

func extractAnnotations(def []byte) (map[string]string, error) {
var m manifest
if err := yaml.Unmarshal(def, &m); err != nil {
return nil, errors.Wrap(err, "decoding manifest for annotations")
}
if m.Metadata.Annotations == nil {
return map[string]string{}, nil
}
return m.Metadata.Annotations, nil
}

func extractContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) {
resources, err := kresource.ParseMultidoc(def, "stdin")
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cluster/kubernetes/policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestUpdatePolicies(t *testing.T) {
caseIn := templToString(t, annotationsTemplate, c.in)
caseOut := templToString(t, annotationsTemplate, c.out)
resourceID := flux.MustParseResourceID("default:deployment/nginx")
out, err := (&Manifests{}).UpdatePolicies([]byte(caseIn), resourceID, c.update)
out, err := (&Manifests{}).UpdateWorkloadPolicies([]byte(caseIn), resourceID, c.update)
assert.Equal(t, c.wantErr, err != nil)
if !c.wantErr {
assert.Equal(t, string(out), caseOut)
Expand All @@ -185,7 +185,7 @@ func TestUpdatePolicies_invalidTagPattern(t *testing.T) {
update := policy.Update{
Add: policy.Set{policy.TagPrefix("nginx"): "semver:invalid"},
}
_, err := (&Manifests{}).UpdatePolicies(nil, resourceID, update)
_, err := (&Manifests{}).UpdateWorkloadPolicies(nil, resourceID, update)
assert.Error(t, err)
}

Expand Down
8 changes: 6 additions & 2 deletions cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/weaveworks/flux/cluster"
kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
fluxfake "github.com/weaveworks/flux/integrations/client/clientset/versioned/fake"
"github.com/weaveworks/flux/resource"
"github.com/weaveworks/flux/sync"
)

Expand Down Expand Up @@ -280,8 +281,11 @@ metadata:
if err != nil {
t.Fatal(err)
}

err = sync.Sync("testset", resources, kube)
resourcesByID := map[string]resource.Resource{}
for _, r := range resources {
resourcesByID[r.ResourceID().String()] = r
}
err = sync.Sync("testset", resourcesByID, kube)
if !expectErrors && err != nil {
t.Error(err)
}
Expand Down
49 changes: 7 additions & 42 deletions cluster/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package cluster

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/image"
Expand All @@ -17,52 +14,20 @@ type ManifestError struct {
}

func ErrResourceNotFound(name string) error {
return ManifestError{fmt.Errorf("manifest for resource %s not found under manifests path", name)}
return ManifestError{fmt.Errorf("manifest for resource %s not found", name)}
}

// Manifests represents how a set of files are used as definitions of
// Manifests represents a set of files containing definitions of
// resources, e.g., in Kubernetes, YAML files describing Kubernetes
// resources.
type Manifests interface {
// Update the image in a manifest's bytes to that given
UpdateImage(def []byte, resourceID flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
// Set the image of a container in a manifest's bytes to that given
SetWorkloadContainerImage(def []byte, resourceID flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
// Load all the resource manifests under the paths
// given. `baseDir` is used to relativise the paths, which are
// supplied as absolute paths to directories or files; at least
// one path should be supplied, even if it is the same as `baseDir`.
LoadManifests(baseDir string, paths []string) (map[string]resource.Resource, error)
// UpdatePolicies modifies a manifest to apply the policy update specified
UpdatePolicies([]byte, flux.ResourceID, policy.Update) ([]byte, error)
}

// UpdateManifest looks for the manifest for the identified resource,
// reads its contents, applies f(contents), and writes the results
// back to the file.
func UpdateManifest(m Manifests, root string, paths []string, id flux.ResourceID, f func(manifest []byte) ([]byte, error)) error {
resources, err := m.LoadManifests(root, paths)
if err != nil {
return err
}

resource, ok := resources[id.String()]
if !ok {
return ErrResourceNotFound(id.String())
}

path := filepath.Join(root, resource.Source())
def, err := ioutil.ReadFile(path)
if err != nil {
return err
}

newDef, err := f(def)
if err != nil {
return err
}

fi, err := os.Stat(path)
if err != nil {
return err
}
return ioutil.WriteFile(path, newDef, fi.Mode())
LoadManifests(baseDir string, paths []string) ([]resource.Resource, error)
// UpdatWorkloadPolicies modifies a manifest to apply the policy update specified
UpdateWorkloadPolicies([]byte, flux.ResourceID, policy.Update) ([]byte, error)
}
31 changes: 17 additions & 14 deletions cluster/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@ import (

// Doubles as a cluster.Cluster and cluster.Manifests implementation
type Mock struct {
AllWorkloadsFunc func(maybeNamespace string) ([]Workload, error)
SomeWorkloadsFunc func([]flux.ResourceID) ([]Workload, error)
PingFunc func() error
ExportFunc func() ([]byte, error)
SyncFunc func(SyncSet) error
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
UpdateImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
LoadManifestsFunc func(base string, paths []string) (map[string]resource.Resource, error)
UpdatePoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error)
AllWorkloadsFunc func(maybeNamespace string) ([]Workload, error)
SomeWorkloadsFunc func([]flux.ResourceID) ([]Workload, error)
PingFunc func() error
ExportFunc func() ([]byte, error)
SyncFunc func(SyncSet) error
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
SetWorkloadContainerImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
LoadManifestsFunc func(base string, paths []string) ([]resource.Resource, error)
UpdateWorkloadPoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error)
}

var _ Cluster = &Mock{}
var _ Manifests = &Mock{}

func (m *Mock) AllWorkloads(maybeNamespace string) ([]Workload, error) {
return m.AllWorkloadsFunc(maybeNamespace)
}
Expand All @@ -45,14 +48,14 @@ func (m *Mock) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) {
return m.PublicSSHKeyFunc(regenerate)
}

func (m *Mock) UpdateImage(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) {
return m.UpdateImageFunc(def, id, container, newImageID)
func (m *Mock) SetWorkloadContainerImage(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error) {
return m.SetWorkloadContainerImageFunc(def, id, container, newImageID)
}

func (m *Mock) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) {
func (m *Mock) LoadManifests(base string, paths []string) ([]resource.Resource, error) {
return m.LoadManifestsFunc(base, paths)
}

func (m *Mock) UpdatePolicies(def []byte, id flux.ResourceID, p policy.Update) ([]byte, error) {
return m.UpdatePoliciesFunc(def, id, p)
func (m *Mock) UpdateWorkloadPolicies(def []byte, id flux.ResourceID, p policy.Update) ([]byte, error) {
return m.UpdateWorkloadPoliciesFunc(def, id, p)
}
160 changes: 160 additions & 0 deletions cluster/resourcestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package cluster

import (
"bytes"
"io/ioutil"
"os"
"path"
"sync"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/git"
"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/resource"
)

// ResourceStore manages all the cluster resources defined in a repository, explicitly declared in a file or not.
// e.g., generated and updated by a .flux.yaml file, explicit Kubernetes .yaml manifests files ...
type ResourceStore interface {
SetWorkloadContainerImage(resourceID flux.ResourceID, container string, newImageID image.Ref) error
GetAllResourcesByID() (map[string]resource.Resource, error)
GetAllResourcesBySource() (map[string]resource.Resource, error)
UpdateWorkloadPolicies(flux.ResourceID, policy.Update) (bool, error)
}

var _ ResourceStore = &resourceStore{}

type resourceStore struct {
manifests Manifests
checkout *git.Checkout
cachedResourcesBySource map[string]resource.Resource
cachedResourcesByID map[string]resource.Resource
// TODO(fons): do we really need locking, can the checkout shared across threads?
sync.RWMutex
}

func NewResourceStore(manifests Manifests, checkout *git.Checkout) *resourceStore {
return &resourceStore{
manifests: manifests,
checkout: checkout,
}
}

// Set the container image of a resource in the store
func (s *resourceStore) SetWorkloadContainerImage(id flux.ResourceID, container string, newImageID image.Ref) error {
resourcesByID, _, err := s.getAllResources()
if err != nil {
return err
}
resource, ok := resourcesByID[id.String()]
if !ok {
return ErrResourceNotFound(id.String())
}
absolutePath := path.Join(s.checkout.Dir(), resource.Source())
def, err := ioutil.ReadFile(absolutePath)
if err != nil {
return err
}
newDef, err := s.manifests.SetWorkloadContainerImage(def, id, container, newImageID)
if err != nil {
return err
}
fi, err := os.Stat(absolutePath)
if err != nil {
return err
}
if err := ioutil.WriteFile(absolutePath, newDef, fi.Mode()); err != nil {
return err
}
// Reset cached resources, since we have modified one
// TODO(fons): make this more performant.
// We could, for instance, parse it after modifying it and add it to the resources.
s.resetCache()
return nil
}

// Load all the resources in the store. The returned map is indexed by the resource IDs
func (s *resourceStore) GetAllResourcesByID() (map[string]resource.Resource, error) {
resourcesByID, _, err := s.getAllResources()
if err != nil {
return nil, err
}
return resourcesByID, nil
}

// Load all the resources in the store. The returned map is indexed by the file which defines de resource
func (s *resourceStore) GetAllResourcesBySource() (map[string]resource.Resource, error) {
_, resourcesBySource, err := s.getAllResources()
if err != nil {
return nil, err
}
return resourcesBySource, nil
}

// UpdateWorkloadPolicies modifies a resource in the store to apply the policy-update specified
// It returns whether a change in the resource was actually made as a result of the change
func (s *resourceStore) UpdateWorkloadPolicies(id flux.ResourceID, update policy.Update) (bool, error) {
resourcesByID, _, err := s.getAllResources()
if err != nil {
return false, err
}
resource, ok := resourcesByID[id.String()]
if !ok {
return false, ErrResourceNotFound(id.String())
}
absolutePath := path.Join(s.checkout.Dir(), resource.Source())
def, err := ioutil.ReadFile(absolutePath)
if err != nil {
return false, err
}
newDef, err := s.manifests.UpdateWorkloadPolicies(def, id, update)
if err != nil {
return false, err
}
fi, err := os.Stat(absolutePath)
if err != nil {
return false, err
}
if err := ioutil.WriteFile(absolutePath, newDef, fi.Mode()); err != nil {
return false, err
}
comparison := bytes.Compare(def, newDef)
// Reset cached resources, since we have modified one
// TODO(fons): make this more performant.
// We could, for instance, parse it after modifying it and add it to the resources.
s.resetCache()
return comparison != 0, nil
}

func (s *resourceStore) getAllResources() (map[string]resource.Resource, map[string]resource.Resource, error) {
s.RLock()
if s.cachedResourcesByID != nil && s.cachedResourcesBySource != nil {
s.RUnlock()
return s.cachedResourcesByID, s.cachedResourcesBySource, nil
}
s.RUnlock()

resources, err := s.manifests.LoadManifests(s.checkout.Dir(), s.checkout.ManifestDirs())
if err != nil {
return nil, nil, err
}
resourcesByID := map[string]resource.Resource{}
resourcesBySource := map[string]resource.Resource{}
for _, r := range resources {
resourcesByID[r.ResourceID().String()] = r
resourcesBySource[r.Source()] = r
}
s.Lock()
s.cachedResourcesByID = resourcesByID
s.cachedResourcesBySource = resourcesBySource
s.Unlock()
return resourcesByID, resourcesBySource, nil
}

func (s *resourceStore) resetCache() {
s.Lock()
s.cachedResourcesByID = nil
s.cachedResourcesBySource = nil
s.Unlock()
}
Loading

0 comments on commit df0cd55

Please sign in to comment.