Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Implement configurable timeout for RPC operations #2171

Merged
merged 2 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"context"
"errors"

"github.com/weaveworks/flux"
Expand All @@ -25,11 +26,11 @@ const (
// are distinct interfaces.
type Cluster interface {
// Get all of the services (optionally, from a specific namespace), excluding those
AllWorkloads(maybeNamespace string) ([]Workload, error)
SomeWorkloads([]flux.ResourceID) ([]Workload, error)
AllWorkloads(ctx context.Context, maybeNamespace string) ([]Workload, error)
SomeWorkloads(ctx context.Context, ids []flux.ResourceID) ([]Workload, error)
IsAllowedResource(flux.ResourceID) bool
Ping() error
Export() ([]byte, error)
Export(ctx context.Context) ([]byte, error)
Sync(SyncSet) error
PublicSSHKey(regenerate bool) (ssh.PublicKey, error)
}
Expand Down
6 changes: 4 additions & 2 deletions cluster/kubernetes/images.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"context"
"fmt"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -122,8 +123,9 @@ func mergeCredentials(log func(...interface{}) error,
// ImagesToFetch is a k8s specific method to get a list of images to update along with their credentials
func (c *Cluster) ImagesToFetch() registry.ImageCreds {
allImageCreds := make(registry.ImageCreds)
ctx := context.Background()

namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
if err != nil {
c.logger.Log("err", errors.Wrap(err, "getting namespaces"))
return allImageCreds
Expand All @@ -132,7 +134,7 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds {
for _, ns := range namespaces {
seenCreds := make(map[string]registry.Credentials)
for kind, resourceKind := range resourceKinds {
workloads, err := resourceKind.getWorkloads(c, ns.Name)
workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
if err != nil {
if apierrors.IsNotFound(err) || apierrors.IsForbidden(err) {
// Skip unsupported or forbidden resource kinds
Expand Down
25 changes: 16 additions & 9 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
Expand Down Expand Up @@ -127,7 +128,7 @@ func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing,
// SomeWorkloads returns the workloads named, missing out any that don't
// exist in the cluster or aren't in an allowed namespace.
// They do not necessarily have to be returned in the order requested.
func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, err error) {
func (c *Cluster) SomeWorkloads(ctx context.Context, ids []flux.ResourceID) (res []cluster.Workload, err error) {
var workloads []cluster.Workload
for _, id := range ids {
if !c.IsAllowedResource(id) {
Expand All @@ -141,7 +142,7 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,
continue
}

workload, err := resourceKind.getWorkload(c, ns, name)
workload, err := resourceKind.getWorkload(ctx, c, ns, name)
if err != nil {
if apierrors.IsForbidden(err) || apierrors.IsNotFound(err) {
continue
Expand All @@ -161,8 +162,8 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,

// AllWorkloads returns all workloads in allowed namespaces matching the criteria; that is, in
// the namespace (or any namespace if that argument is empty)
func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err error) {
namespaces, err := c.getAllowedAndExistingNamespaces()
func (c *Cluster) AllWorkloads(ctx context.Context, namespace string) (res []cluster.Workload, err error) {
namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
if err != nil {
return nil, errors.Wrap(err, "getting namespaces")
}
Expand All @@ -174,7 +175,7 @@ func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err er
}

for kind, resourceKind := range resourceKinds {
workloads, err := resourceKind.getWorkloads(c, ns.Name)
workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
if err != nil {
switch {
case apierrors.IsNotFound(err):
Expand Down Expand Up @@ -219,10 +220,10 @@ func (c *Cluster) Ping() error {
}

// Export exports cluster resources
func (c *Cluster) Export() ([]byte, error) {
func (c *Cluster) Export(ctx context.Context) ([]byte, error) {
var config bytes.Buffer

namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(ctx)
if err != nil {
return nil, errors.Wrap(err, "getting namespaces")
}
Expand All @@ -240,7 +241,7 @@ func (c *Cluster) Export() ([]byte, error) {
}

for _, resourceKind := range resourceKinds {
workloads, err := resourceKind.getWorkloads(c, ns.Name)
workloads, err := resourceKind.getWorkloads(ctx, c, ns.Name)
squaremo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
switch {
case apierrors.IsNotFound(err):
Expand Down Expand Up @@ -281,10 +282,13 @@ func (c *Cluster) PublicSSHKey(regenerate bool) (ssh.PublicKey, error) {
// the Flux instance is expected to have access to and can look for resources inside of.
// It returns a list of all namespaces unless an explicit list of allowed namespaces
// has been set on the Cluster instance.
func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) {
func (c *Cluster) getAllowedAndExistingNamespaces(ctx context.Context) ([]apiv1.Namespace, error) {
hiddeco marked this conversation as resolved.
Show resolved Hide resolved
if len(c.allowedNamespaces) > 0 {
nsList := []apiv1.Namespace{}
for _, name := range c.allowedNamespaces {
if err := ctx.Err(); err != nil {
return nil, err
}
ns, err := c.client.CoreV1().Namespaces().Get(name, meta_v1.GetOptions{})
switch {
case err == nil:
Expand All @@ -303,6 +307,9 @@ func (c *Cluster) getAllowedAndExistingNamespaces() ([]apiv1.Namespace, error) {
return nsList, nil
}

if err := ctx.Err(); err != nil {
return nil, err
}
namespaces, err := c.client.CoreV1().Namespaces().List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion cluster/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"context"
"reflect"
"testing"

Expand Down Expand Up @@ -28,7 +29,7 @@ func testGetAllowedNamespaces(t *testing.T, namespace []string, expected []strin
client := ExtendedClient{coreClient: clientset}
c := NewCluster(client, nil, nil, log.NewNopLogger(), namespace, []string{})

namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(context.Background())
if err != nil {
t.Errorf("The error should be nil, not: %s", err)
}
Expand Down
65 changes: 51 additions & 14 deletions cluster/kubernetes/resourcekinds.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"context"
"strings"

apiapps "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -30,8 +31,8 @@ const AntecedentAnnotation = "flux.weave.works/antecedent"
// Kind registry

type resourceKind interface {
getWorkload(c *Cluster, namespace, name string) (workload, error)
getWorkloads(c *Cluster, namespace string) ([]workload, error)
getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error)
getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error)
}

var (
Expand Down Expand Up @@ -114,7 +115,10 @@ func (w workload) toClusterWorkload(resourceID flux.ResourceID) cluster.Workload

type deploymentKind struct{}

func (dk *deploymentKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *deploymentKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
deployment, err := c.client.AppsV1().Deployments(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -123,7 +127,10 @@ func (dk *deploymentKind) getWorkload(c *Cluster, namespace, name string) (workl
return makeDeploymentWorkload(deployment), nil
}

func (dk *deploymentKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *deploymentKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
deployments, err := c.client.AppsV1().Deployments(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -191,7 +198,10 @@ func makeDeploymentWorkload(deployment *apiapps.Deployment) workload {

type daemonSetKind struct{}

func (dk *daemonSetKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *daemonSetKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
daemonSet, err := c.client.AppsV1().DaemonSets(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -200,7 +210,10 @@ func (dk *daemonSetKind) getWorkload(c *Cluster, namespace, name string) (worklo
return makeDaemonSetWorkload(daemonSet), nil
}

func (dk *daemonSetKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *daemonSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
daemonSets, err := c.client.AppsV1().DaemonSets(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -252,7 +265,10 @@ func makeDaemonSetWorkload(daemonSet *apiapps.DaemonSet) workload {

type statefulSetKind struct{}

func (dk *statefulSetKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *statefulSetKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
statefulSet, err := c.client.AppsV1().StatefulSets(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -261,7 +277,10 @@ func (dk *statefulSetKind) getWorkload(c *Cluster, namespace, name string) (work
return makeStatefulSetWorkload(statefulSet), nil
}

func (dk *statefulSetKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *statefulSetKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
statefulSets, err := c.client.AppsV1().StatefulSets(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -345,7 +364,10 @@ func makeStatefulSetWorkload(statefulSet *apiapps.StatefulSet) workload {

type cronJobKind struct{}

func (dk *cronJobKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (dk *cronJobKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
cronJob, err := c.client.BatchV1beta1().CronJobs(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
Expand All @@ -354,7 +376,10 @@ func (dk *cronJobKind) getWorkload(c *Cluster, namespace, name string) (workload
return makeCronJobWorkload(cronJob), nil
}

func (dk *cronJobKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (dk *cronJobKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
cronJobs, err := c.client.BatchV1beta1().CronJobs(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -382,15 +407,21 @@ func makeCronJobWorkload(cronJob *apibatch.CronJob) workload {

type fluxHelmReleaseKind struct{}

func (fhr *fluxHelmReleaseKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (fhr *fluxHelmReleaseKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
fluxHelmRelease, err := c.client.HelmV1alpha2().FluxHelmReleases(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
}
return makeFluxHelmReleaseWorkload(fluxHelmRelease), nil
}

func (fhr *fluxHelmReleaseKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (fhr *fluxHelmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
fluxHelmReleases, err := c.client.HelmV1alpha2().FluxHelmReleases(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -444,15 +475,21 @@ func createK8sFHRContainers(values map[string]interface{}) []apiv1.Container {

type helmReleaseKind struct{}

func (hr *helmReleaseKind) getWorkload(c *Cluster, namespace, name string) (workload, error) {
func (hr *helmReleaseKind) getWorkload(ctx context.Context, c *Cluster, namespace, name string) (workload, error) {
if err := ctx.Err(); err != nil {
return workload{}, err
}
helmRelease, err := c.client.FluxV1beta1().HelmReleases(namespace).Get(name, meta_v1.GetOptions{})
if err != nil {
return workload{}, err
}
return makeHelmReleaseWorkload(helmRelease), nil
}

func (hr *helmReleaseKind) getWorkloads(c *Cluster, namespace string) ([]workload, error) {
func (hr *helmReleaseKind) getWorkloads(ctx context.Context, c *Cluster, namespace string) ([]workload, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
helmReleases, err := c.client.FluxV1beta1().HelmReleases(namespace).List(meta_v1.ListOptions{})
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"bytes"
"context"
"crypto/sha1"
"crypto/sha256"
"encoding/base64"
Expand Down Expand Up @@ -292,7 +293,7 @@ func (c *Cluster) listAllowedResources(
}

// List resources only from the allowed namespaces
namespaces, err := c.getAllowedAndExistingNamespaces()
namespaces, err := c.getAllowedAndExistingNamespaces(context.Background())
if err != nil {
return nil, err
}
Expand Down
19 changes: 10 additions & 9 deletions cluster/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mock

import (
"bytes"
"context"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
Expand All @@ -14,11 +15,11 @@ import (

// Doubles as a cluster.Cluster and cluster.Manifests implementation
type Mock struct {
AllWorkloadsFunc func(maybeNamespace string) ([]cluster.Workload, error)
SomeWorkloadsFunc func([]flux.ResourceID) ([]cluster.Workload, error)
AllWorkloadsFunc func(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error)
SomeWorkloadsFunc func(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error)
IsAllowedResourceFunc func(flux.ResourceID) bool
PingFunc func() error
ExportFunc func() ([]byte, error)
ExportFunc func(ctx context.Context) ([]byte, error)
SyncFunc func(cluster.SyncSet) error
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
SetWorkloadContainerImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
Expand All @@ -33,12 +34,12 @@ type Mock struct {
var _ cluster.Cluster = &Mock{}
var _ manifests.Manifests = &Mock{}

func (m *Mock) AllWorkloads(maybeNamespace string) ([]cluster.Workload, error) {
return m.AllWorkloadsFunc(maybeNamespace)
func (m *Mock) AllWorkloads(ctx context.Context, maybeNamespace string) ([]cluster.Workload, error) {
return m.AllWorkloadsFunc(ctx, maybeNamespace)
}

func (m *Mock) SomeWorkloads(s []flux.ResourceID) ([]cluster.Workload, error) {
return m.SomeWorkloadsFunc(s)
func (m *Mock) SomeWorkloads(ctx context.Context, ids []flux.ResourceID) ([]cluster.Workload, error) {
return m.SomeWorkloadsFunc(ctx, ids)
}

func (m *Mock) IsAllowedResource(id flux.ResourceID) bool {
Expand All @@ -49,8 +50,8 @@ func (m *Mock) Ping() error {
return m.PingFunc()
}

func (m *Mock) Export() ([]byte, error) {
return m.ExportFunc()
func (m *Mock) Export(ctx context.Context) ([]byte, error) {
return m.ExportFunc(ctx)
}

func (m *Mock) Sync(c cluster.SyncSet) error {
Expand Down
Loading