Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
WIP shuffle
Browse files Browse the repository at this point in the history
  • Loading branch information
Timer committed Oct 11, 2018
1 parent 6031edf commit f58c1e8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 73 deletions.
65 changes: 0 additions & 65 deletions cluster/sync.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
package cluster

import (
"crypto/sha1"
"encoding/hex"
"fmt"
"strings"

"github.com/imdario/mergo"
"github.com/pkg/errors"
"github.com/weaveworks/flux/resource"
yaml "gopkg.in/yaml.v2"
)

// Definitions for use in synchronising a cluster with a git repo.
Expand All @@ -22,7 +16,6 @@ type SyncAction struct {
}

type SyncDef struct {
StackName string
// The actions to undertake
Actions []SyncAction
}
Expand All @@ -34,64 +27,6 @@ type ResourceError struct {

type SyncError []ResourceError

// GetStackName returns the stack name associated with this SyncDef
func (def *SyncDef) GetStackName() string {
if def.StackName == "" {
return "default"
}
return def.StackName
}

// Checksum generates a unique identifier for all apply actions in the stack
func (def *SyncDef) Checksum() string {
checksum := sha1.New()
for _, action := range def.Actions {
if action.Apply != nil {
checksum.Write(action.Apply.Bytes())
}
}
return hex.EncodeToString(checksum.Sum(nil))
}

// Associate appends a label and metadata to a set K8s resource to identify
// them as originating from the same stack.
func (def *SyncDef) Associate(stackLabel, stackName, checksumAnnotation, checksum string) error {
for _, action := range def.Actions {
// There is no apply action associated with this action
if action.Apply == nil {
continue
}

source := action.Apply.Source()
bytes := action.Apply.Bytes()

// We decode the YAML into a generic map because we don't know its type
definition := make(map[interface{}]interface{})
if err := yaml.Unmarshal(bytes, &definition); err != nil {
return errors.Wrap(err, fmt.Sprintf("failed to parse yaml from %s", source))
}
mixin := make(map[interface{}]interface{})
mixinYaml := []byte(fmt.Sprintf(("" +
"metadata:\n" +
" labels:\n" +
" %s: %s\n" +
" annotations:\n" +
" %s: %s\n"),
stackLabel, stackName, checksumAnnotation, checksum,
))
if err := yaml.Unmarshal(mixinYaml, &mixin); err != nil {
return errors.Wrap(err, "failed to parse yaml for mixin")
}
mergo.Merge(&definition, mixin)
newBytes, err := yaml.Marshal(definition)
if err != nil {
return errors.Wrap(err, "failed to serialize yaml after mixing in association")
}
action.Apply.UpdateBytes(newBytes)
}
return nil
}

func (err SyncError) Error() string {
var errs []string
for _, e := range err {
Expand Down
62 changes: 54 additions & 8 deletions sync/sync.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,66 @@
package sync

import (
"crypto/sha1"
"encoding/hex"
"fmt"

"github.com/go-kit/kit/log"
"github.com/imdario/mergo"
"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"

"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/resource"
)

// Checksum generates a unique identifier for all apply actions in the stack
func getStackChecksum(repoResources map[string]resource.Resource) string {
checksum := sha1.New()
for _, resource := range repoResources {
checksum.Write(resource.Bytes())
}
return hex.EncodeToString(checksum.Sum(nil))
}

func (def *SyncDef) Associate(stackLabel, stackName, checksumAnnotation, checksum string) error {
for _, action := range def.Actions {
// There is no apply action associated with this action
if action.Apply == nil {
continue
}

source := action.Apply.Source()
bytes := action.Apply.Bytes()

// We decode the YAML into a generic map because we don't know its type
definition := make(map[interface{}]interface{})
if err := yaml.Unmarshal(bytes, &definition); err != nil {
return errors.Wrap(err, fmt.Sprintf("failed to parse yaml from %s", source))
}
mixin := make(map[interface{}]interface{})
mixinYaml := []byte(fmt.Sprintf(("" +
"metadata:\n" +
" labels:\n" +
" %s: %s\n" +
" annotations:\n" +
" %s: %s\n"),
stackLabel, stackName, checksumAnnotation, checksum,
))
if err := yaml.Unmarshal(mixinYaml, &mixin); err != nil {
return errors.Wrap(err, "failed to parse yaml for mixin")
}
mergo.Merge(&definition, mixin)
newBytes, err := yaml.Marshal(definition)
if err != nil {
return errors.Wrap(err, "failed to serialize yaml after mixing in association")
}
action.Apply.UpdateBytes(newBytes)
}
return nil
}

// Sync synchronises the cluster to the files in a directory
func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus cluster.Cluster, tracks bool, deletes bool, logger log.Logger) error {
// Get a map of resources defined in the cluster
Expand All @@ -27,7 +79,7 @@ func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus
// to figuring out what's changed, and applying that. We're
// relying on Kubernetes to decide for each application if it is a
// no-op.
sync := cluster.SyncDef{StackName: "default"}
sync := cluster.SyncDef{}

// DANGER ZONE (tamara) This works and is dangerous. At the moment will delete Flux and
// other pods unless the relevant manifests are part of the user repo. Needs a lot of thought
Expand All @@ -53,13 +105,7 @@ func Sync(m cluster.Manifests, repoResources map[string]resource.Resource, clus
}
}

// Exit function if the cluster Sync failed or if the cluster is not in
// tracking mode. Tracking mode cleans up orphaned resources created in
// a previous sync.
if err = clus.Sync(sync); err != nil || !tracks {
return err
}
return nil
return clus.Sync(sync)
}

func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Resource, id string, res resource.Resource, sync *cluster.SyncDef) {
Expand Down

0 comments on commit f58c1e8

Please sign in to comment.