Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Extend namespace filtering to all operations on namespaced resources #1668

Merged
merged 4 commits into from
Mar 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Cluster interface {
// Get all of the services (optionally, from a specific namespace), excluding those
AllWorkloads(maybeNamespace string) ([]Workload, error)
SomeWorkloads([]flux.ResourceID) ([]Workload, error)
IsAllowedResource(flux.ResourceID) bool
squaremo marked this conversation as resolved.
Show resolved Hide resolved
Ping() error
Export() ([]byte, error)
Sync(SyncSet) error
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func mergeCredentials(log func(...interface{}) error,
func (c *Cluster) ImagesToFetch() registry.ImageCreds {
allImageCreds := make(registry.ImageCreds)

namespaces, err := c.getAllowedNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces()
if err != nil {
c.logger.Log("err", errors.Wrap(err, "getting namespaces"))
return allImageCreds
Expand Down
76 changes: 53 additions & 23 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes/resource"
fhrclient "github.com/weaveworks/flux/integrations/client/clientset/versioned"
"github.com/weaveworks/flux/ssh"
)
Expand Down Expand Up @@ -95,22 +96,22 @@ type Cluster struct {
syncErrors map[flux.ResourceID]error
muSyncErrors sync.RWMutex

nsWhitelist []string
nsWhitelistLogged map[string]bool // to keep track of whether we've logged a problem with seeing a whitelisted ns
allowedNamespaces []string
loggedAllowedNS map[string]bool // to keep track of whether we've logged a problem with seeing an allowed namespace

imageExcludeList []string
mu sync.Mutex
}

// NewCluster returns a usable cluster.
func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, logger log.Logger, nsWhitelist []string, imageExcludeList []string) *Cluster {
func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing, logger log.Logger, allowedNamespaces []string, imageExcludeList []string) *Cluster {
c := &Cluster{
client: client,
applier: applier,
logger: logger,
sshKeyRing: sshKeyRing,
nsWhitelist: nsWhitelist,
nsWhitelistLogged: map[string]bool{},
allowedNamespaces: allowedNamespaces,
loggedAllowedNS: map[string]bool{},
imageExcludeList: imageExcludeList,
}

Expand All @@ -119,12 +120,15 @@ func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing,

// --- cluster.Cluster

// SomeWorkloads returns the workloads named, missing out any that aren't
// accessible in the cluster. They do not necessarily have to be returned
// in the order requested.
// SomeWorkloads returns the workloads named, missing out any that don't
2opremio marked this conversation as resolved.
Show resolved Hide resolved
// exist in the cluster or aren't in an allowed namespace.
// They do not necessarily have to be returned in the order requested.
func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, err error) {
var workloads []cluster.Workload
for _, id := range ids {
if !c.IsAllowedResource(id) {
continue
}
ns, kind, name := id.Components()

resourceKind, ok := resourceKinds[kind]
Expand All @@ -150,10 +154,10 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,
return workloads, nil
}

// AllWorkloads returns all workloads matching the criteria; that is, in
// AllWorkloads returns all workloads in allowed namespaces matching the criteria; that is, in
// the namespace (or any namespace if that argument is empty)
func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err error) {
namespaces, err := c.getAllowedNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces()
if err != nil {
return nil, errors.Wrap(err, "getting namespaces")
}
Expand Down Expand Up @@ -213,7 +217,7 @@ func (c *Cluster) Ping() error {
func (c *Cluster) Export() ([]byte, error) {
var config bytes.Buffer

namespaces, err := c.getAllowedNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces()
if err != nil {
return nil, errors.Wrap(err, "getting namespaces")
}
Expand Down Expand Up @@ -262,24 +266,24 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) {
return publicKey, nil
}

// getAllowedNamespaces returns a list of namespaces that the Flux instance is expected
// to have access to and can look for resources inside of.
// It returns a list of all namespaces unless a namespace whitelist has been set on the Cluster
// instance, in which case it returns a list containing the namespaces from the whitelist
// that exist in the cluster.
func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) {
if len(c.nsWhitelist) > 0 {
// getAllowedAndExistingNamespaces returns a list of existing namespaces that
// the Flux instance is expected to have access to and can look for resources inside of.
// It returns a list of all namespaces unless an explicit list of allowed namespaces
// has been set on the Cluster instance.
func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) {
if len(c.allowedNamespaces) > 0 {
nsList := []apiv1.Namespace{}
for _, name := range c.nsWhitelist {
for _, name := range c.allowedNamespaces {
ns, err := c.client.CoreV1().Namespaces().Get(name, meta_v1.GetOptions{})
switch {
case err == nil:
c.nsWhitelistLogged[name] = false // reset, so if the namespace goes away we'll log it again
c.loggedAllowedNS[name] = false // reset, so if the namespace goes away we'll log it again
nsList = append(nsList, *ns)
case apierrors.IsUnauthorized(err) || apierrors.IsForbidden(err) || apierrors.IsNotFound(err):
if !c.nsWhitelistLogged[name] {
c.logger.Log("warning", "whitelisted namespace inaccessible", "namespace", name, "err", err)
c.nsWhitelistLogged[name] = true
if !c.loggedAllowedNS[name] {
c.logger.Log("warning", "cannot access allowed namespace",
"namespace", name, "err", err)
c.loggedAllowedNS[name] = true
}
default:
return nil, err
Expand All @@ -295,6 +299,32 @@ func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) {
return namespaces.Items, nil
}

func (c *Cluster) IsAllowedResource(id flux.ResourceID) bool {
if len(c.allowedNamespaces) == 0 {
// All resources are allowed when all namespaces are allowed
return true
}

namespace, kind, name := id.Components()
namespaceToCheck := namespace

if namespace == resource.ClusterScope {
squaremo marked this conversation as resolved.
Show resolved Hide resolved
// All cluster-scoped resources (not namespaced) are allowed ...
if kind != "namespace" {
return true
}
// ... except namespaces themselves, whose name needs to be explicitly allowed
namespaceToCheck = name
}

for _, allowedNS := range c.allowedNamespaces {
if namespaceToCheck == allowedNS {
return true
}
}
return false
}

// kind & apiVersion must be passed separately as the object's TypeMeta is not populated
func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error {
yamlBytes, err := k8syaml.Marshal(object)
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func testGetAllowedNamespaces(t *testing.T, namespace []string, expected []strin
client := ExtendedClient{coreClient: clientset}
c := NewCluster(client, nil, nil, log.NewNopLogger(), namespace, []string{})

namespaces, err := c.getAllowedNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces()
if err != nil {
t.Errorf("The error should be nil, not: %s", err)
}
Expand Down
64 changes: 48 additions & 16 deletions cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,19 @@ func (c *Cluster) Sync(syncSet cluster.SyncSet) error {

// NB we get all resources, since we care about leaving unsynced,
// _ignored_ resources alone.
clusterResources, err := c.getResourcesBySelector("")
clusterResources, err := c.getAllowedResourcesBySelector("")
if err != nil {
return errors.Wrap(err, "collating resources in cluster for sync")
}

cs := makeChangeSet()
var errs cluster.SyncError
for _, res := range syncSet.Resources {
id := res.ResourceID().String()
resID := res.ResourceID()
if !c.IsAllowedResource(resID) {
continue
}
id := resID.String()
// make a record of the checksum, whether we stage it to
// be applied or not, so that we don't delete it later.
csum := sha1.Sum(res.Bytes())
Expand Down Expand Up @@ -122,7 +126,7 @@ func (c *Cluster) collectGarbage(

orphanedResources := makeChangeSet()

clusterResources, err := c.getGCMarkedResourcesInSyncSet(syncSet.Name)
clusterResources, err := c.getAllowedGCMarkedResourcesInSyncSet(syncSet.Name)
if err != nil {
return nil, errors.Wrap(err, "collating resources in cluster for calculating garbage collection")
}
Expand Down Expand Up @@ -188,7 +192,7 @@ func (r *kuberesource) GetGCMark() string {
return r.obj.GetLabels()[gcMarkLabel]
}

func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesource, error) {
func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*kuberesource, error) {
listOptions := meta_v1.ListOptions{}
if selector != "" {
listOptions.LabelSelector = selector
Expand Down Expand Up @@ -216,14 +220,12 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
if !contains(verbs, "list") {
continue
}

groupVersion, err := schema.ParseGroupVersion(resource.GroupVersion)
if err != nil {
return nil, err
}

resourceClient := c.client.dynamicClient.Resource(groupVersion.WithResource(apiResource.Name))
data, err := resourceClient.List(listOptions)
gvr := groupVersion.WithResource(apiResource.Name)
list, err := c.listAllowedResources(apiResource.Namespaced, gvr, listOptions)
if err != nil {
if apierrors.IsForbidden(err) {
// we are not allowed to list this resource but
Expand All @@ -233,7 +235,7 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
return nil, err
}

for i, item := range data.Items {
for i, item := range list {
apiVersion := item.GetAPIVersion()
kind := item.GetKind()

Expand All @@ -244,7 +246,7 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
}
// TODO(michael) also exclude anything that has an ownerReference (that isn't "standard"?)

res := &kuberesource{obj: &data.Items[i], namespaced: apiResource.Namespaced}
res := &kuberesource{obj: &list[i], namespaced: apiResource.Namespaced}
result[res.ResourceID().String()] = res
}
}
Expand All @@ -253,18 +255,48 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
return result, nil
}

func (c *Cluster) getGCMarkedResourcesInSyncSet(syncSetName string) (map[string]*kuberesource, error) {
allGCMarkedResources, err := c.getResourcesBySelector(gcMarkLabel) // means "gcMarkLabel exists"
func (c *Cluster) listAllowedResources(
namespaced bool, gvr schema.GroupVersionResource, options meta_v1.ListOptions) ([]unstructured.Unstructured, error) {
if !namespaced {
// The resource is not namespaced, everything is allowed
resourceClient := c.client.dynamicClient.Resource(gvr)
data, err := resourceClient.List(options)
if err != nil {
return nil, err
}
return data.Items, nil
}

// List resources only from the allowed namespaces
namespaces, err := c.getAllowedAndExistingNamespaces()
if err != nil {
return nil, err
}
var result []unstructured.Unstructured
for _, ns := range namespaces {
data, err := c.client.dynamicClient.Resource(gvr).Namespace(ns.Name).List(options)
if err != nil {
return result, err
}
result = append(result, data.Items...)
}
return result, nil
}

func (c *Cluster) getAllowedGCMarkedResourcesInSyncSet(syncSetName string) (map[string]*kuberesource, error) {
allGCMarkedResources, err := c.getAllowedResourcesBySelector(gcMarkLabel) // means "gcMarkLabel exists"
if err != nil {
return nil, err
}
syncSetGCMarkedResources := map[string]*kuberesource{}
allowedSyncSetGCMarkedResources := map[string]*kuberesource{}
for resID, kres := range allGCMarkedResources {
if kres.GetGCMark() == makeGCMark(syncSetName, resID) {
syncSetGCMarkedResources[resID] = kres
// Discard resources whose mark doesn't match their resource ID
if kres.GetGCMark() != makeGCMark(syncSetName, resID) {
continue
}
allowedSyncSetGCMarkedResources[resID] = kres
}
return syncSetGCMarkedResources, nil
return allowedSyncSetGCMarkedResources, nil
}

func applyMetadata(res resource.Resource, syncSetName, checksum string) ([]byte, error) {
Expand Down
Loading