From 358f66c466626d995f0626355cb86ba5f054f308 Mon Sep 17 00:00:00 2001
From: Kyle Schochenmaier
Date: Thu, 8 Apr 2021 19:59:20 -0500
Subject: [PATCH] change the annotation key since it's a label as well now (#477)

Change annotationStatus to keyInjectStatus since it's used as an
annotation as well as a label.
---
 connect-inject/annotations.go               |   4 +-
 connect-inject/cleanup_resource.go          | 281 ++++++++++++++++
 connect-inject/cleanup_resource_ent_test.go | 324 +++++++++++++++++++
 connect-inject/cleanup_resource_test.go     | 338 ++++++++++++++++++++
 connect-inject/endpoints_controller.go      |   2 +-
 connect-inject/endpoints_controller_test.go |   4 +-
 connect-inject/handler.go                   |   6 +-
 connect-inject/handler_test.go              |  14 +-
 8 files changed, 958 insertions(+), 15 deletions(-)
 create mode 100644 connect-inject/cleanup_resource.go
 create mode 100644 connect-inject/cleanup_resource_ent_test.go
 create mode 100644 connect-inject/cleanup_resource_test.go

diff --git a/connect-inject/annotations.go b/connect-inject/annotations.go
index 8158fc6f7f30..48a2989761f0 100644
--- a/connect-inject/annotations.go
+++ b/connect-inject/annotations.go
@@ -1,9 +1,9 @@
 package connectinject
 
 const (
-	// annotationStatus is the key of the annotation that is added to
+	// keyInjectStatus is the key of the annotation and label that is added to
 	// a pod after an injection is done.
-	annotationStatus = "consul.hashicorp.com/connect-inject-status"
+	keyInjectStatus = "consul.hashicorp.com/connect-inject-status"
 
 	// annotationInject is the key of the annotation that controls whether
 	// injection is explicitly enabled or disabled for a pod. This should
diff --git a/connect-inject/cleanup_resource.go b/connect-inject/cleanup_resource.go
new file mode 100644
index 000000000000..c5b1594a17a8
--- /dev/null
+++ b/connect-inject/cleanup_resource.go
@@ -0,0 +1,281 @@
+package connectinject
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/consul-k8s/consul"
+	capi "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/go-hclog"
+	"golang.org/x/net/context"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+)
+
+// CleanupResource implements Resource and is used to clean up Consul service
+// instances that weren't deregistered when their pods were deleted.
+// Usually the preStop hook in the pods handles this, but during a force
+// delete or OOM the preStop hook doesn't run.
+type CleanupResource struct {
+	Log              hclog.Logger
+	KubernetesClient kubernetes.Interface
+	// ConsulClient points at the agent on the same node as this pod.
+	ConsulClient    *capi.Client
+	ReconcilePeriod time.Duration
+	Ctx             context.Context
+	// ConsulScheme is the scheme to use when making API calls to Consul,
+	// i.e. "http" or "https".
+	ConsulScheme string
+	// ConsulPort is the port to make HTTP API calls to Consul agents on.
+	ConsulPort             string
+	EnableConsulNamespaces bool
+
+	lock sync.Mutex
+}
+
+// Run starts the long-running reconcile loop that runs on a timer.
+func (c *CleanupResource) Run(stopCh <-chan struct{}) {
+	reconcileTimer := time.NewTimer(c.ReconcilePeriod)
+	defer reconcileTimer.Stop()
+
+	for {
+		c.reconcile()
+		reconcileTimer.Reset(c.ReconcilePeriod)
+
+		select {
+		case <-stopCh:
+			c.Log.Info("received stop signal, shutting down")
+			return
+
+		case <-reconcileTimer.C:
+			// Fall through and continue the loop.
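+			// Note that the timer is reset only after reconcile()
+			// returns, so each reconcile period is measured from the
+			// end of the previous pass, not its start.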
+ } + } +} + +// reconcile checks all registered Consul connect service instances and ensures +// the pod they represent is still running. If the pod is no longer running, +// it deregisters the service instance from its agent. +func (c *CleanupResource) reconcile() { + c.Log.Debug("starting reconcile") + + // currentPods is a map of currently existing pods. Keys are in the form + // "namespace/pod-name". Value doesn't matter since we're using this + // solely for quick "exists" checks. + // Use currentPodsKey() to construct the key when accessing this map. + currentPods := make(map[string]bool) + + // Gather needed data on nodes, services and pods. + kubeNodes, err := c.KubernetesClient.CoreV1().Nodes().List(c.Ctx, metav1.ListOptions{}) + if err != nil { + c.Log.Error("unable to get nodes", "error", err) + return + } + + // namespacesToServiceNames maps from Consul namespace to the list of service + // names registered in that namespace. + // If Consul namespaces are disabled, there will be only one key with value + // "", i.e. the empty string. + namespacesToServiceNames := make(map[string][]string) + if c.EnableConsulNamespaces { + namespaces, _, err := c.ConsulClient.Namespaces().List(nil) + if err != nil { + c.Log.Error("unable to get Consul namespaces", "error", err) + return + } + for _, ns := range namespaces { + namespacesToServiceNames[ns.Name] = nil + } + } else { + // This allows us to treat OSS the same as enterprise for the rest of + // the code path. + namespacesToServiceNames[""] = nil + } + + for ns := range namespacesToServiceNames { + serviceNames, _, err := c.ConsulClient.Catalog().Services(&capi.QueryOptions{Namespace: ns}) + if err != nil { + c.Log.Error("unable to get Consul services", "error", err) + return + } + namespacesToServiceNames[ns] = keys(serviceNames) + } + + podList, err := c.KubernetesClient.CoreV1().Pods(corev1.NamespaceAll).List(c.Ctx, + metav1.ListOptions{LabelSelector: keyInjectStatus}) + if err != nil { + c.Log.Error("unable to get pods", "error", err) + return + } + + // Build up our map of currently running pods. + for _, pod := range podList.Items { + currentPods[currentPodsKey(pod.Name, pod.Namespace)] = true + } + + // For each registered service, find the associated pod. + for ns, serviceNames := range namespacesToServiceNames { + for _, serviceName := range serviceNames { + serviceInstances, _, err := c.ConsulClient.Catalog().Service(serviceName, "", &capi.QueryOptions{Namespace: ns}) + if err != nil { + c.Log.Error("unable to get Consul service", "name", serviceName, "error", err) + return + } + for _, instance := range serviceInstances { + podName, hasPodMeta := instance.ServiceMeta[MetaKeyPodName] + k8sNamespace, hasNSMeta := instance.ServiceMeta[MetaKeyKubeNS] + if hasPodMeta && hasNSMeta { + + // Check if the instance matches a running pod. If not, deregister it. 
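+					// Instances registered against nodes that aren't in this
+					// cluster are skipped below, since another cluster (for
+					// example, when several Kubernetes clusters register with
+					// the same Consul datacenter) is responsible for them.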
+					if _, podExists := currentPods[currentPodsKey(podName, k8sNamespace)]; !podExists {
+						if !nodeInCluster(kubeNodes, instance.Node) {
+							c.Log.Debug("skipping deregistration because instance is from a node not in this cluster",
+								"pod", podName, "id", instance.ServiceID, "ns", ns, "node", instance.Node)
+							continue
+						}
+
+						c.Log.Info("found service instance from terminated pod still registered", "pod", podName, "id", instance.ServiceID, "ns", ns)
+						err := c.deregisterInstance(instance, instance.Address)
+						if err != nil {
+							c.Log.Error("unable to deregister service instance", "id", instance.ServiceID, "ns", ns, "error", err)
+							continue
+						}
+						c.Log.Info("service instance deregistered", "id", instance.ServiceID, "ns", ns)
+					}
+				}
+			}
+		}
+	}
+
+	c.Log.Debug("finished reconcile")
+	return
+}
+
+// Delete is called when a pod is deleted. It checks that the Consul service
+// instance for that pod is no longer registered with Consul.
+// If the instance is still registered, it deregisters it.
+func (c *CleanupResource) Delete(key string, obj interface{}) error {
+	pod, ok := obj.(*corev1.Pod)
+	if !ok {
+		return fmt.Errorf("expected pod, got: %#v", obj)
+	}
+	if pod == nil {
+		return fmt.Errorf("object for key %s was nil", key)
+	}
+	serviceName, ok := pod.ObjectMeta.Annotations[annotationService]
+	if !ok {
+		return fmt.Errorf("pod did not have %s annotation", annotationService)
+	}
+	kubeNS := pod.Namespace
+	podName := pod.Name
+	// NOTE: This will be an empty string with Consul OSS.
+	consulNS := pod.ObjectMeta.Annotations[annotationConsulNamespace]
+
+	// Look for both the service and its sidecar proxy.
+	consulServiceNames := []string{serviceName, fmt.Sprintf("%s-sidecar-proxy", serviceName)}
+
+	for _, consulServiceName := range consulServiceNames {
+		instances, _, err := c.ConsulClient.Catalog().Service(consulServiceName, "", &capi.QueryOptions{
+			Filter:    fmt.Sprintf(`ServiceMeta[%q] == %q and ServiceMeta[%q] == %q`, MetaKeyPodName, podName, MetaKeyKubeNS, kubeNS),
+			Namespace: consulNS,
+		})
+		if err != nil {
+			c.Log.Error("unable to get Consul services", "error", err)
+			return err
+		}
+		if len(instances) == 0 {
+			c.Log.Debug("terminated pod had no still-registered instances", "service", consulServiceName, "pod", podName, "ns", consulNS)
+			continue
+		}
+
+		// NOTE: We only expect a single instance because there's only one
+		// per pod, but we may as well range over all of them just to be safe.
+		for _, instance := range instances {
+			// NOTE: We don't need to check if this instance belongs to a kube
+			// node in this cluster (like we do in reconcile) because we only
+			// get the delete event if a pod in this cluster is deleted, so
+			// we know this is one of our instances.
+
+			c.Log.Info("found service instance from terminated pod still registered", "pod", podName, "id", instance.ServiceID, "ns", consulNS)
+			err := c.deregisterInstance(instance, pod.Status.HostIP)
+			if err != nil {
+				c.Log.Error("unable to deregister service instance", "id", instance.ServiceID, "error", err)
+				return err
+			}
+			c.Log.Info("service instance deregistered", "id", instance.ServiceID, "ns", consulNS)
+		}
+	}
+	return nil
+}
+
+// Upsert is a no-op because we're only interested in pods that are deleted.
+func (c *CleanupResource) Upsert(_ string, _ interface{}) error { + return nil +} + +func (c *CleanupResource) Informer() cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).List(c.Ctx, + metav1.ListOptions{LabelSelector: keyInjectStatus}) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return c.KubernetesClient.CoreV1().Pods(metav1.NamespaceAll).Watch(c.Ctx, + metav1.ListOptions{LabelSelector: keyInjectStatus}) + }, + }, + &corev1.Pod{}, + // Resync is 0 because we perform our own reconcile loop on our own timer. + 0, + cache.Indexers{}, + ) +} + +// deregisterInstance deregisters instance from Consul by calling the agent at +// hostIP's deregister service API. +func (c *CleanupResource) deregisterInstance(instance *capi.CatalogService, hostIP string) error { + fullAddr := fmt.Sprintf("%s://%s:%s", c.ConsulScheme, hostIP, c.ConsulPort) + localConfig := capi.DefaultConfig() + if instance.Namespace != "" { + localConfig.Namespace = instance.Namespace + } + localConfig.Address = fullAddr + client, err := consul.NewClient(localConfig) + if err != nil { + return fmt.Errorf("constructing client for address %q: %s", hostIP, err) + } + + return client.Agent().ServiceDeregister(instance.ServiceID) +} + +// currentPodsKey should be used to construct the key to access the +// currentPods map. +func currentPodsKey(podName, k8sNamespace string) string { + return fmt.Sprintf("%s/%s", k8sNamespace, podName) +} + +// nodeInCluster returns whether nodeName is the name of a node in nodes, i.e. +// it's the name of a node in this cluster. +func nodeInCluster(nodes *corev1.NodeList, nodeName string) bool { + for _, n := range nodes.Items { + if n.Name == nodeName { + return true + } + } + return false +} + +// keys returns the keys of m. +func keys(m map[string][]string) []string { + var ks []string + for k := range m { + ks = append(ks, k) + } + return ks +} diff --git a/connect-inject/cleanup_resource_ent_test.go b/connect-inject/cleanup_resource_ent_test.go new file mode 100644 index 000000000000..59613f74232f --- /dev/null +++ b/connect-inject/cleanup_resource_ent_test.go @@ -0,0 +1,324 @@ +// +build enterprise + +package connectinject + +import ( + "net/url" + "testing" + + capi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" +) + +func TestReconcile_ConsulNamespaces(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + ConsulServices []capi.AgentServiceRegistration + KubePods []runtime.Object + // ExpConsulServiceIDs maps from Consul namespace to + // list of expected service ids in that namespace. 
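+		// A nil slice means the namespace is expected to exist but to
+		// have no service instances registered in it.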
+ ExpConsulServiceIDs map[string][]string + }{ + "default namespace, pod deleted": { + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvcDefaultNS, + }, + KubePods: nil, + ExpConsulServiceIDs: map[string][]string{ + "default": {"consul"}, + }, + }, + "default namespace, pod not deleted": { + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvcDefaultNS, + }, + KubePods: []runtime.Object{consulFooPodDefaultNS}, + ExpConsulServiceIDs: map[string][]string{ + "default": {"consul", "foo-abc123-foo"}, + }, + }, + "foo namespace, pod deleted": { + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvcFooNS, + }, + KubePods: nil, + ExpConsulServiceIDs: map[string][]string{ + "default": {"consul"}, + "foo": nil, + }, + }, + "foo namespace, pod not deleted": { + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvcFooNS, + }, + KubePods: []runtime.Object{consulFooPodFooNS}, + ExpConsulServiceIDs: map[string][]string{ + "default": {"consul"}, + "foo": {"foo-abc123-foo"}, + }, + }, + "does not delete instances with same id in different namespaces": { + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvcFooNS, + consulFooSvcBarNS, + }, + KubePods: []runtime.Object{consulFooPodFooNS}, + ExpConsulServiceIDs: map[string][]string{ + "default": {"consul"}, + "foo": {"foo-abc123-foo"}, + "bar": nil, + }, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + require := require.New(t) + + // Start Consul server. + server, err := testutil.NewTestServerConfigT(t, nil) + defer server.Stop() + require.NoError(err) + server.WaitForSerfCheck(t) + consulClient, err := capi.NewClient(&capi.Config{Address: server.HTTPAddr}) + require.NoError(err) + + // Register Consul services. + for _, svc := range c.ConsulServices { + _, _, err := consulClient.Namespaces().Create(&capi.Namespace{ + Name: svc.Namespace, + }, nil) + require.NoError(err) + require.NoError(consulClient.Agent().ServiceRegister(&svc)) + } + + // Create the cleanup resource. + log := hclog.Default().Named("cleanupResource") + log.SetLevel(hclog.Debug) + consulURL, err := url.Parse("http://" + server.HTTPAddr) + require.NoError(err) + node := nodeName(t, consulClient) + // NOTE: we need to add the node because the reconciler checks if + // the node the service is registered with actually exists in this + // cluster. + kubeResources := append(c.KubePods, &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node, + }, + }) + cleanupResource := CleanupResource{ + Log: log, + KubernetesClient: fake.NewSimpleClientset(kubeResources...), + ConsulClient: consulClient, + ConsulScheme: consulURL.Scheme, + ConsulPort: consulURL.Port(), + EnableConsulNamespaces: true, + } + + // Run Reconcile. + cleanupResource.reconcile() + + // Test that the remaining services are what we expect. + for ns, expSvcs := range c.ExpConsulServiceIDs { + // Note: we need to use the catalog endpoints because + // Agent().Services() does not currently support namespaces + // (https://github.com/hashicorp/consul/issues/9710). 
+ services, _, err := consulClient.Catalog().Services(&capi.QueryOptions{Namespace: ns}) + require.NoError(err) + + var actualServiceIDs []string + for actSvcName := range services { + services, _, err := consulClient.Catalog().Service(actSvcName, "", &capi.QueryOptions{Namespace: ns}) + require.NoError(err) + for _, actSvc := range services { + actualServiceIDs = append(actualServiceIDs, actSvc.ServiceID) + } + } + require.ElementsMatch(actualServiceIDs, expSvcs, "ns=%s act=%v", ns, actualServiceIDs) + } + }) + } +} + +func TestDelete_ConsulNamespaces(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + Pod *corev1.Pod + ConsulServices []capi.AgentServiceRegistration + // ExpConsulServiceIDs maps from Consul namespace to + // list of expected service ids in that namespace. + ExpConsulServiceIDs map[string][]string + ExpErr string + }{ + "default namespace": { + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvcDefaultNS, + }, + Pod: consulFooPodDefaultNS, + ExpConsulServiceIDs: map[string][]string{ + "default": {"consul"}, + }, + }, + "foo namespace": { + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvcFooNS, + }, + Pod: consulFooPodFooNS, + ExpConsulServiceIDs: map[string][]string{ + "default": {"consul"}, + "foo": nil, + }, + }, + "does not delete instances with same id in different namespaces": { + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvcFooNS, + consulFooSvcBarNS, + }, + Pod: consulFooPodFooNS, + ExpConsulServiceIDs: map[string][]string{ + "default": {"consul"}, + "foo": nil, + "bar": {"foo-abc123-foo"}, + }, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + require := require.New(t) + + // Start Consul server. + server, err := testutil.NewTestServerConfigT(t, nil) + defer server.Stop() + require.NoError(err) + server.WaitForSerfCheck(t) + consulClient, err := capi.NewClient(&capi.Config{Address: server.HTTPAddr}) + require.NoError(err) + + // Register Consul services. + for _, svc := range c.ConsulServices { + _, _, err := consulClient.Namespaces().Create(&capi.Namespace{ + Name: svc.Namespace, + }, nil) + require.NoError(err) + require.NoError(consulClient.Agent().ServiceRegister(&svc)) + } + + // Create the cleanup resource. + log := hclog.Default().Named("cleanupResource") + log.SetLevel(hclog.Debug) + consulURL, err := url.Parse("http://" + server.HTTPAddr) + require.NoError(err) + cleanupResource := CleanupResource{ + Log: log, + KubernetesClient: fake.NewSimpleClientset(), + ConsulClient: consulClient, + ConsulScheme: consulURL.Scheme, + ConsulPort: consulURL.Port(), + EnableConsulNamespaces: true, + } + + // Run Delete. + err = cleanupResource.Delete("default/foo", c.Pod) + if c.ExpErr != "" { + require.EqualError(err, c.ExpErr) + } else { + require.NoError(err) + + // Test that the remaining services are what we expect. + for ns, expSvcs := range c.ExpConsulServiceIDs { + // Note: we need to use the catalog endpoints because + // Agent().Services() does not currently support namespaces + // (https://github.com/hashicorp/consul/issues/9710). 
+ services, _, err := consulClient.Catalog().Services(&capi.QueryOptions{Namespace: ns}) + require.NoError(err) + + var actualServiceIDs []string + for actSvcName := range services { + services, _, err := consulClient.Catalog().Service(actSvcName, "", &capi.QueryOptions{Namespace: ns}) + require.NoError(err) + for _, actSvc := range services { + actualServiceIDs = append(actualServiceIDs, actSvc.ServiceID) + } + } + require.ElementsMatch(actualServiceIDs, expSvcs, "ns=%s act=%v", ns, actualServiceIDs) + } + } + }) + } +} + +var ( + consulFooSvcDefaultNS = capi.AgentServiceRegistration{ + ID: "foo-abc123-foo", + Name: "foo", + Namespace: "default", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-abc123", + MetaKeyKubeNS: "default", + }, + } + consulFooSvcFooNS = capi.AgentServiceRegistration{ + ID: "foo-abc123-foo", + Name: "foo", + Namespace: "foo", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-abc123", + MetaKeyKubeNS: "default", + }, + } + consulFooSvcBarNS = capi.AgentServiceRegistration{ + ID: "foo-abc123-foo", + Name: "foo", + Namespace: "bar", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-abc123", + MetaKeyKubeNS: "bar", + }, + } + consulFooPodDefaultNS = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo-abc123", + Namespace: "default", + Labels: map[string]string{ + keyInjectStatus: injected, + }, + Annotations: map[string]string{ + keyInjectStatus: injected, + annotationService: "foo", + annotationConsulNamespace: "default", + }, + }, + Status: corev1.PodStatus{ + HostIP: "127.0.0.1", + }, + } + consulFooPodFooNS = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo-abc123", + Namespace: "default", + Labels: map[string]string{ + keyInjectStatus: injected, + }, + Annotations: map[string]string{ + keyInjectStatus: injected, + annotationService: "foo", + annotationConsulNamespace: "foo", + }, + }, + Status: corev1.PodStatus{ + HostIP: "127.0.0.1", + }, + } +) diff --git a/connect-inject/cleanup_resource_test.go b/connect-inject/cleanup_resource_test.go new file mode 100644 index 000000000000..b4a6face0970 --- /dev/null +++ b/connect-inject/cleanup_resource_test.go @@ -0,0 +1,338 @@ +package connectinject + +import ( + "net/url" + "testing" + + capi "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" +) + +func TestReconcile(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + ConsulServices []capi.AgentServiceRegistration + KubePods []runtime.Object + ExpConsulServiceIDs []string + // OutOfClusterNode controls whether the services are registered on a + // node that does not exist in this Kube cluster. 
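+		// reconcile is expected to skip such instances rather than
+		// deregister them, since they belong to another cluster.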
+ OutOfClusterNode bool + }{ + "no instances running": { + ConsulServices: nil, + KubePods: nil, + ExpConsulServiceIDs: nil, + }, + "instance does not have pod-name meta key": { + ConsulServices: []capi.AgentServiceRegistration{consulNoPodNameMetaSvc}, + ExpConsulServiceIDs: []string{"foo-abc123-foo"}, + }, + "instance does not have k8s-namespace meta key": { + ConsulServices: []capi.AgentServiceRegistration{consulNoK8sNSMetaSvc}, + ExpConsulServiceIDs: []string{"foo-abc123-foo"}, + }, + "out of cluster node": { + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, + ExpConsulServiceIDs: []string{"foo-abc123-foo", "foo-abc123-foo-sidecar-proxy"}, + OutOfClusterNode: true, + }, + "app and sidecar still running": { + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, + KubePods: []runtime.Object{fooPod}, + ExpConsulServiceIDs: []string{"foo-abc123-foo", "foo-abc123-foo-sidecar-proxy"}, + }, + "app and sidecar terminated": { + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, + KubePods: nil, + ExpConsulServiceIDs: nil, + }, + "only app is registered, no sidecar": { + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc}, + KubePods: nil, + ExpConsulServiceIDs: nil, + }, + "only sidecar is registered, no app": { + ConsulServices: []capi.AgentServiceRegistration{consulFooSvcSidecar}, + KubePods: nil, + ExpConsulServiceIDs: nil, + }, + "multiple instances of the same service": { + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvc, + consulFooSvcSidecar, + consulFooSvcPod2, + consulFooSvcSidecarPod2, + }, + KubePods: []runtime.Object{fooPod}, + ExpConsulServiceIDs: []string{"foo-abc123-foo", "foo-abc123-foo-sidecar-proxy"}, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + require := require.New(t) + + // Start Consul server. + server, err := testutil.NewTestServerConfigT(t, nil) + defer server.Stop() + require.NoError(err) + server.WaitForLeader(t) + consulClient, err := capi.NewClient(&capi.Config{Address: server.HTTPAddr}) + require.NoError(err) + + // Register Consul services. + for _, svc := range c.ConsulServices { + require.NoError(consulClient.Agent().ServiceRegister(&svc)) + } + + // Create the cleanup resource. + log := hclog.Default().Named("cleanupResource") + log.SetLevel(hclog.Debug) + consulURL, err := url.Parse("http://" + server.HTTPAddr) + require.NoError(err) + kubeResources := c.KubePods + if !c.OutOfClusterNode { + node := nodeName(t, consulClient) + // NOTE: we need to add the node because the reconciler checks if + // the node the service is registered with actually exists in this + // cluster. + kubeResources = append(kubeResources, &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node, + }, + }) + + } + cleanupResource := CleanupResource{ + Log: log, + KubernetesClient: fake.NewSimpleClientset(kubeResources...), + ConsulClient: consulClient, + ConsulScheme: consulURL.Scheme, + ConsulPort: consulURL.Port(), + } + + // Run Reconcile. + cleanupResource.reconcile() + + // Test that the remaining services are what we expect. 
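+			// Agent().Services() suffices here because, unlike the
+			// enterprise tests, these tests don't enable Consul namespaces.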
+ services, err := consulClient.Agent().Services() + require.NoError(err) + var actualServiceIDs []string + for id := range services { + actualServiceIDs = append(actualServiceIDs, id) + } + require.ElementsMatch(actualServiceIDs, c.ExpConsulServiceIDs) + }) + } +} + +func TestDelete(t *testing.T) { + t.Parallel() + + var nilPod *corev1.Pod + cases := map[string]struct { + Pod interface{} + ConsulServices []capi.AgentServiceRegistration + ExpConsulServiceIDs []string + ExpErr string + }{ + "pod is nil": { + Pod: nilPod, + ExpErr: "object for key default/foo was nil", + }, + "not expected type": { + Pod: &corev1.Service{}, + ExpErr: "expected pod, got: &v1.Service", + }, + "pod does not have service-name annotation": { + Pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo-abc123", + Namespace: "default", + }, + Status: corev1.PodStatus{ + HostIP: "127.0.0.1", + }, + }, + ExpErr: "pod did not have consul.hashicorp.com/connect-service annotation", + }, + "instance does not have pod-name meta": { + Pod: fooPod, + ConsulServices: []capi.AgentServiceRegistration{consulNoPodNameMetaSvc}, + ExpConsulServiceIDs: []string{"foo-abc123-foo"}, + }, + "instance does not have k8s-namespace meta": { + Pod: fooPod, + ConsulServices: []capi.AgentServiceRegistration{consulNoK8sNSMetaSvc}, + ExpConsulServiceIDs: []string{"foo-abc123-foo"}, + }, + "no instances still registered": { + Pod: fooPod, + ConsulServices: nil, + ExpConsulServiceIDs: nil, + }, + "app and sidecar terminated": { + Pod: fooPod, + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc, consulFooSvcSidecar}, + ExpConsulServiceIDs: nil, + }, + "only app is registered, no sidecar": { + Pod: fooPod, + ConsulServices: []capi.AgentServiceRegistration{consulFooSvc}, + ExpConsulServiceIDs: nil, + }, + "only sidecar is registered, no app": { + Pod: fooPod, + ConsulServices: []capi.AgentServiceRegistration{consulFooSvcSidecar}, + ExpConsulServiceIDs: nil, + }, + "multiple instances of the same service": { + Pod: fooPod, + ConsulServices: []capi.AgentServiceRegistration{ + consulFooSvc, + consulFooSvcSidecar, + consulFooSvcPod2, + consulFooSvcSidecarPod2, + }, + ExpConsulServiceIDs: []string{"foo-def456-foo", "foo-def456-foo-sidecar-proxy"}, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + require := require.New(t) + + // Start Consul server. + server, err := testutil.NewTestServerConfigT(t, nil) + defer server.Stop() + require.NoError(err) + server.WaitForLeader(t) + consulClient, err := capi.NewClient(&capi.Config{Address: server.HTTPAddr}) + require.NoError(err) + + // Register Consul services. + for _, svc := range c.ConsulServices { + require.NoError(consulClient.Agent().ServiceRegister(&svc)) + } + + // Create the cleanup resource. + log := hclog.Default().Named("cleanupResource") + log.SetLevel(hclog.Debug) + consulURL, err := url.Parse("http://" + server.HTTPAddr) + require.NoError(err) + cleanupResource := CleanupResource{ + Log: log, + KubernetesClient: fake.NewSimpleClientset(), + ConsulClient: consulClient, + ConsulScheme: consulURL.Scheme, + ConsulPort: consulURL.Port(), + } + + // Run Delete. + err = cleanupResource.Delete("default/foo", c.Pod) + if c.ExpErr != "" { + require.Error(err) + require.Contains(err.Error(), c.ExpErr) + } else { + require.NoError(err) + + // Test that the remaining services are what we expect. 
+ services, err := consulClient.Agent().Services() + require.NoError(err) + var actualServiceIDs []string + for id := range services { + actualServiceIDs = append(actualServiceIDs, id) + } + require.ElementsMatch(actualServiceIDs, c.ExpConsulServiceIDs) + } + }) + } +} + +// nodeName returns the Consul node name for the agent that client +// points at. +func nodeName(t *testing.T, client *capi.Client) string { + self, err := client.Agent().Self() + require.NoError(t, err) + require.Contains(t, self, "Config") + require.Contains(t, self["Config"], "NodeName") + return self["Config"]["NodeName"].(string) +} + +var ( + consulFooSvc = capi.AgentServiceRegistration{ + ID: "foo-abc123-foo", + Name: "foo", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-abc123", + MetaKeyKubeNS: "default", + }, + } + consulFooSvcSidecar = capi.AgentServiceRegistration{ + ID: "foo-abc123-foo-sidecar-proxy", + Name: "foo-sidecar-proxy", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-abc123", + MetaKeyKubeNS: "default", + }, + } + consulFooSvcPod2 = capi.AgentServiceRegistration{ + ID: "foo-def456-foo", + Name: "foo", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-def456", + MetaKeyKubeNS: "default", + }, + } + consulFooSvcSidecarPod2 = capi.AgentServiceRegistration{ + ID: "foo-def456-foo-sidecar-proxy", + Name: "foo-sidecar-proxy", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-def456", + MetaKeyKubeNS: "default", + }, + } + consulNoPodNameMetaSvc = capi.AgentServiceRegistration{ + ID: "foo-abc123-foo", + Name: "foo", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyKubeNS: "default", + }, + } + consulNoK8sNSMetaSvc = capi.AgentServiceRegistration{ + ID: "foo-abc123-foo", + Name: "foo", + Address: "127.0.0.1", + Meta: map[string]string{ + MetaKeyPodName: "foo-abc123", + }, + } + fooPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo-abc123", + Namespace: "default", + Labels: map[string]string{ + keyInjectStatus: injected, + }, + Annotations: map[string]string{ + keyInjectStatus: injected, + annotationService: "foo", + }, + }, + Status: corev1.PodStatus{ + HostIP: "127.0.0.1", + }, + } +) diff --git a/connect-inject/endpoints_controller.go b/connect-inject/endpoints_controller.go index 4c1dd996508f..f80cf7e6ab1e 100644 --- a/connect-inject/endpoints_controller.go +++ b/connect-inject/endpoints_controller.go @@ -614,7 +614,7 @@ func (r EndpointsController) requestsForRunningAgentPods(object client.Object) [ // hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected. 
func hasBeenInjected(pod corev1.Pod) bool { - if anno, ok := pod.Annotations[annotationStatus]; ok { + if anno, ok := pod.Annotations[keyInjectStatus]; ok { if anno == injected { return true } diff --git a/connect-inject/endpoints_controller_test.go b/connect-inject/endpoints_controller_test.go index 73c2aa02c1e8..f3bf5cf7c2c8 100644 --- a/connect-inject/endpoints_controller_test.go +++ b/connect-inject/endpoints_controller_test.go @@ -2503,8 +2503,8 @@ func createPod(name, ip string, inject bool) *corev1.Pod { }, } if inject { - pod.Labels[annotationStatus] = injected - pod.Annotations[annotationStatus] = injected + pod.Labels[keyInjectStatus] = injected + pod.Annotations[keyInjectStatus] = injected } return pod } diff --git a/connect-inject/handler.go b/connect-inject/handler.go index f82bf5fdae92..cc1480f62fe3 100644 --- a/connect-inject/handler.go +++ b/connect-inject/handler.go @@ -228,7 +228,7 @@ func (h *Handler) Handle(_ context.Context, req admission.Request) admission.Res // pod.Annotations has already been initialized by h.defaultAnnotations() // and does not need to be checked for being a nil value. - pod.Annotations[annotationStatus] = injected + pod.Annotations[keyInjectStatus] = injected // Add annotations for metrics. if err = h.prometheusAnnotations(&pod); err != nil { @@ -239,7 +239,7 @@ func (h *Handler) Handle(_ context.Context, req admission.Request) admission.Res if pod.Labels == nil { pod.Labels = make(map[string]string) } - pod.Labels[annotationStatus] = injected + pod.Labels[keyInjectStatus] = injected // Consul-ENT only: Add the Consul destination namespace as an annotation to the pod. if h.EnableNamespaces { @@ -294,7 +294,7 @@ func (h *Handler) shouldInject(pod corev1.Pod, namespace string) (bool, error) { } // If we already injected then don't inject again - if pod.Annotations[annotationStatus] != "" { + if pod.Annotations[keyInjectStatus] != "" { return false, nil } diff --git a/connect-inject/handler_test.go b/connect-inject/handler_test.go index 8b104b11aa6d..b4a181da6579 100644 --- a/connect-inject/handler_test.go +++ b/connect-inject/handler_test.go @@ -75,7 +75,7 @@ func TestHandlerHandle(t *testing.T) { Object: encodeRaw(t, &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - annotationStatus: injected, + keyInjectStatus: injected, }, }, Spec: basicSpec, @@ -155,7 +155,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationStatus), + Path: "/metadata/annotations/" + escapeJSONPointer(keyInjectStatus), }, { Operation: "add", @@ -240,7 +240,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationStatus), + Path: "/metadata/annotations/" + escapeJSONPointer(keyInjectStatus), }, { Operation: "add", @@ -285,7 +285,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationStatus), + Path: "/metadata/annotations/" + escapeJSONPointer(keyInjectStatus), }, { Operation: "add", @@ -334,7 +334,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/labels/" + escapeJSONPointer(annotationStatus), + Path: "/metadata/labels/" + escapeJSONPointer(keyInjectStatus), }, }, }, @@ -386,7 +386,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(annotationStatus), + Path: "/metadata/annotations/" + escapeJSONPointer(keyInjectStatus), }, { 
Operation: "add", @@ -406,7 +406,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/labels/" + escapeJSONPointer(annotationStatus), + Path: "/metadata/labels/" + escapeJSONPointer(keyInjectStatus), }, }, },