diff --git a/internal/aws/containerinsight/const.go b/internal/aws/containerinsight/const.go index 8d837aac96da..9497047fe8ae 100644 --- a/internal/aws/containerinsight/const.go +++ b/internal/aws/containerinsight/const.go @@ -93,6 +93,14 @@ const ( StatusConditionNetworkUnavailable = "status_condition_network_unavailable" StatusCapacityPods = "status_capacity_pods" StatusAllocatablePods = "status_allocatable_pods" + StatusNumberAvailable = "status_number_available" + StatusNumberUnavailable = "status_number_unavailable" + StatusDesiredNumberScheduled = "status_desired_number_scheduled" + StatusCurrentNumberScheduled = "status_current_number_scheduled" + StatusReplicasAvailable = "status_replicas_available" + StatusReplicasUnavailable = "status_replicas_unavailable" + StatusReplicas = "status_replicas" + SpecReplicas = "spec_replicas" StatusRunning = "status_running" StatusTerminated = "status_terminated" StatusWaiting = "status_waiting" @@ -115,23 +123,25 @@ const ( DiskIOTotal = "Total" // Define the metric types - TypeCluster = "Cluster" - TypeClusterService = "ClusterService" - TypeClusterNamespace = "ClusterNamespace" - TypeService = "Service" - TypeInstance = "Instance" // mean EC2 Instance in ECS - TypeNode = "Node" // mean EC2 Instance in EKS - TypeInstanceFS = "InstanceFS" - TypeNodeFS = "NodeFS" - TypeInstanceNet = "InstanceNet" - TypeNodeNet = "NodeNet" - TypeInstanceDiskIO = "InstanceDiskIO" - TypeNodeDiskIO = "NodeDiskIO" - TypePod = "Pod" - TypePodNet = "PodNet" - TypeContainer = "Container" - TypeContainerFS = "ContainerFS" - TypeContainerDiskIO = "ContainerDiskIO" + TypeCluster = "Cluster" + TypeClusterService = "ClusterService" + TypeClusterDeployment = "ClusterDeployment" + TypeClusterDaemonSet = "ClusterDaemonSet" + TypeClusterNamespace = "ClusterNamespace" + TypeService = "Service" + TypeInstance = "Instance" // mean EC2 Instance in ECS + TypeNode = "Node" // mean EC2 Instance in EKS + TypeInstanceFS = "InstanceFS" + TypeNodeFS = "NodeFS" + 
TypeInstanceNet = "InstanceNet" + TypeNodeNet = "NodeNet" + TypeInstanceDiskIO = "InstanceDiskIO" + TypeNodeDiskIO = "NodeDiskIO" + TypePod = "Pod" + TypePodNet = "PodNet" + TypeContainer = "Container" + TypeContainerFS = "ContainerFS" + TypeContainerDiskIO = "ContainerDiskIO" // Special type for pause container // because containerd does not set container name pause container name to POD like docker does. TypeInfraContainer = "InfraContainer" @@ -213,7 +223,7 @@ func init() { FSInodesfree: UnitCount, FSUtilization: UnitPercent, - // status metrics + // status & spec metrics StatusConditionReady: UnitCount, StatusConditionDiskPressure: UnitCount, StatusConditionMemoryPressure: UnitCount, @@ -221,6 +231,14 @@ func init() { StatusConditionNetworkUnavailable: UnitCount, StatusCapacityPods: UnitCount, StatusAllocatablePods: UnitCount, + StatusReplicas: UnitCount, + StatusReplicasAvailable: UnitCount, + StatusReplicasUnavailable: UnitCount, + StatusNumberAvailable: UnitCount, + StatusNumberUnavailable: UnitCount, + StatusDesiredNumberScheduled: UnitCount, + StatusCurrentNumberScheduled: UnitCount, + SpecReplicas: UnitCount, // kube-state-metrics equivalents StatusRunning: UnitCount, diff --git a/internal/aws/containerinsight/utils.go b/internal/aws/containerinsight/utils.go index 5cf90a014bb3..237d162fc21f 100644 --- a/internal/aws/containerinsight/utils.go +++ b/internal/aws/containerinsight/utils.go @@ -106,6 +106,8 @@ func getPrefixByMetricType(mType string) string { service := "service_" cluster := "cluster_" namespace := "namespace_" + deployment := "deployment_" + daemonSet := "daemonset_" switch mType { case TypeInstance: @@ -142,6 +144,10 @@ func getPrefixByMetricType(mType string) string { prefix = service case TypeClusterNamespace: prefix = namespace + case TypeClusterDeployment: + prefix = deployment + case TypeClusterDaemonSet: + prefix = daemonSet default: log.Printf("E! 
Unexpected MetricType: %s", mType) } diff --git a/internal/aws/k8s/k8sclient/clientset.go b/internal/aws/k8s/k8sclient/clientset.go index 66abc3f79fd6..dd857d099305 100644 --- a/internal/aws/k8s/k8sclient/clientset.go +++ b/internal/aws/k8s/k8sclient/clientset.go @@ -194,6 +194,16 @@ type replicaSetClientWithStopper interface { stopper } +type deploymentClientWithStopper interface { + DeploymentClient + stopper +} + +type daemonSetClientWithStopper interface { + DaemonSetClient + stopper +} + type K8sClient struct { kubeConfigPath string initSyncPollInterval time.Duration @@ -221,6 +231,12 @@ type K8sClient struct { rsMu sync.Mutex replicaSet replicaSetClientWithStopper + dMu sync.Mutex + deployment deploymentClientWithStopper + + dsMu sync.Mutex + daemonSet daemonSetClientWithStopper + logger *zap.Logger } @@ -264,6 +280,8 @@ func (c *K8sClient) init(logger *zap.Logger, options ...Option) error { c.node = nil c.job = nil c.replicaSet = nil + c.deployment = nil + c.daemonSet = nil return nil } @@ -357,6 +375,46 @@ func (c *K8sClient) ShutdownReplicaSetClient() { }) } +func (c *K8sClient) GetDeploymentClient() DeploymentClient { + var err error + c.dMu.Lock() + if c.deployment == nil || reflect.ValueOf(c.deployment).IsNil() { + c.deployment, err = newDeploymentClient(c.clientSet, c.logger, deploymentSyncCheckerOption(c.syncChecker)) + if err != nil { + c.logger.Error("use an no-op deployment client instead because of error", zap.Error(err)) + c.deployment = &noOpDeploymentClient{} + } + } + c.dMu.Unlock() + return c.deployment +} + +func (c *K8sClient) ShutdownDeploymentClient() { + shutdownClient(c.deployment, &c.dMu, func() { + c.deployment = nil + }) +} + +func (c *K8sClient) GetDaemonSetClient() DaemonSetClient { + var err error + c.dsMu.Lock() + if c.daemonSet == nil || reflect.ValueOf(c.daemonSet).IsNil() { + c.daemonSet, err = newDaemonSetClient(c.clientSet, c.logger, daemonSetSyncCheckerOption(c.syncChecker)) + if err != nil { + c.logger.Error("use an no-op 
daemonSet client instead because of error", zap.Error(err)) + c.daemonSet = &noOpDaemonSetClient{} + } + } + c.dsMu.Unlock() + return c.daemonSet +} + +func (c *K8sClient) ShutdownDaemonSetClient() { + shutdownClient(c.daemonSet, &c.dsMu, func() { + c.daemonSet = nil + }) +} + func (c *K8sClient) GetClientSet() kubernetes.Interface { return c.clientSet } @@ -371,6 +429,8 @@ func (c *K8sClient) Shutdown() { c.ShutdownNodeClient() c.ShutdownJobClient() c.ShutdownReplicaSetClient() + c.ShutdownDeploymentClient() + c.ShutdownDaemonSetClient() // remove the current instance of k8s client from map for key, val := range optionsToK8sClient { diff --git a/internal/aws/k8s/k8sclient/daemonset.go b/internal/aws/k8s/k8sclient/daemonset.go new file mode 100644 index 000000000000..06c79b0a04db --- /dev/null +++ b/internal/aws/k8s/k8sclient/daemonset.go @@ -0,0 +1,151 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" + +import ( + "context" + "fmt" + "go.uber.org/zap" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "sync" +) + +type DaemonSetClient interface { + // DaemonSetInfos contains the information about each daemon set in the cluster + DaemonSetInfos() []*DaemonSetInfo +} + +type noOpDaemonSetClient struct { +} + +func (nd *noOpDaemonSetClient) DaemonSetInfos() []*DaemonSetInfo { + return []*DaemonSetInfo{} +} + +func (nd *noOpDaemonSetClient) shutdown() { +} + +type daemonSetClientOption func(*daemonSetClient) + +func daemonSetSyncCheckerOption(checker initialSyncChecker) daemonSetClientOption { + return func(d *daemonSetClient) { + d.syncChecker = checker + } +} + +type daemonSetClient struct { + stopChan chan struct{} + stopped bool + + store *ObjStore + + syncChecker initialSyncChecker + + mu sync.RWMutex + daemonSetInfos []*DaemonSetInfo +} + +func (d *daemonSetClient) refresh() { + d.mu.Lock() + defer d.mu.Unlock() + + var daemonSetInfos []*DaemonSetInfo + objsList := d.store.List() + for _, obj := range objsList { + daemonSet, ok := obj.(*DaemonSetInfo) + if !ok { + continue + } + daemonSetInfos = append(daemonSetInfos, daemonSet) + } + + d.daemonSetInfos = daemonSetInfos +} + +func (d *daemonSetClient) DaemonSetInfos() []*DaemonSetInfo { + if d.store.GetResetRefreshStatus() { + d.refresh() + } + d.mu.RLock() + defer d.mu.RUnlock() + return d.daemonSetInfos +} + +func newDaemonSetClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...daemonSetClientOption) (*daemonSetClient, error) { + d := &daemonSetClient{ + stopChan: make(chan struct{}), + } + + for _, option := range options { + option(d) + } + + ctx := context.Background() + if _, err := 
clientSet.AppsV1().DaemonSets(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil { + return nil, fmt.Errorf("cannot list DaemonSets. err: %w", err) + } + + d.store = NewObjStore(transformFuncDaemonSet, logger) + lw := createDaemonSetListWatch(clientSet, metav1.NamespaceAll) + reflector := cache.NewReflector(lw, &appsv1.DaemonSet{}, d.store, 0) + + go reflector.Run(d.stopChan) + + if d.syncChecker != nil { + // check the init sync for potential connection issue + d.syncChecker.Check(reflector, "DaemonSet initial sync timeout") + } + + return d, nil +} + +func (d *daemonSetClient) shutdown() { + close(d.stopChan) + d.stopped = true +} + +func transformFuncDaemonSet(obj interface{}) (interface{}, error) { + daemonSet, ok := obj.(*appsv1.DaemonSet) + if !ok { + return nil, fmt.Errorf("input obj %v is not DaemonSet type", obj) + } + info := new(DaemonSetInfo) + info.Name = daemonSet.Name + info.Namespace = daemonSet.Namespace + info.Status = &DaemonSetStatus{ + NumberAvailable: uint32(daemonSet.Status.NumberAvailable), + NumberUnavailable: uint32(daemonSet.Status.NumberUnavailable), + DesiredNumberScheduled: uint32(daemonSet.Status.DesiredNumberScheduled), + CurrentNumberScheduled: uint32(daemonSet.Status.CurrentNumberScheduled), + } + return info, nil +} + +func createDaemonSetListWatch(client kubernetes.Interface, ns string) cache.ListerWatcher { + ctx := context.Background() + return &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + return client.AppsV1().DaemonSets(ns).List(ctx, opts) + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + return client.AppsV1().DaemonSets(ns).Watch(ctx, opts) + }, + } +} diff --git a/internal/aws/k8s/k8sclient/daemonset_info.go b/internal/aws/k8s/k8sclient/daemonset_info.go new file mode 100644 index 000000000000..ba31301a974f --- /dev/null +++ b/internal/aws/k8s/k8sclient/daemonset_info.go @@ -0,0 +1,28 @@ +// Copyright OpenTelemetry Authors +// +// Licensed 
under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient" + +type DaemonSetInfo struct { + Name string + Namespace string + Status *DaemonSetStatus +} + +type DaemonSetStatus struct { + NumberAvailable uint32 + NumberUnavailable uint32 + DesiredNumberScheduled uint32 + CurrentNumberScheduled uint32 +} diff --git a/internal/aws/k8s/k8sclient/daemonset_test.go b/internal/aws/k8s/k8sclient/daemonset_test.go new file mode 100644 index 000000000000..99c91f94c07e --- /dev/null +++ b/internal/aws/k8s/k8sclient/daemonset_test.go @@ -0,0 +1,94 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package k8sclient
+
+import (
+	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
+	appsv1 "k8s.io/api/apps/v1"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes/fake"
+	"testing"
+)
+
+var daemonSetObjects = []runtime.Object{
+	&appsv1.DaemonSet{
+		ObjectMeta: v1.ObjectMeta{
+			Name:      "test-daemonset-1",
+			Namespace: "test-namespace",
+			UID:       types.UID("test-daemonset-1-uid"),
+		},
+		Status: appsv1.DaemonSetStatus{
+			NumberAvailable:        5,
+			NumberUnavailable:      3,
+			DesiredNumberScheduled: 2,
+			CurrentNumberScheduled: 1,
+		},
+	},
+	&appsv1.DaemonSet{
+		ObjectMeta: v1.ObjectMeta{
+			Name:      "test-daemonset-2",
+			Namespace: "test-namespace",
+			UID:       types.UID("test-daemonset-2-uid"),
+		},
+		Status: appsv1.DaemonSetStatus{
+			NumberAvailable:        10,
+			NumberUnavailable:      4,
+			DesiredNumberScheduled: 7,
+			CurrentNumberScheduled: 7,
+		},
+	},
+}
+
+func TestDaemonSetClient(t *testing.T) {
+	options := daemonSetSyncCheckerOption(&mockReflectorSyncChecker{})
+
+	fakeClientSet := fake.NewSimpleClientset(daemonSetObjects...)
+	client, _ := newDaemonSetClient(fakeClientSet, zap.NewNop(), options)
+
+	daemonSets := make([]interface{}, len(daemonSetObjects))
+	for i := range daemonSetObjects {
+		daemonSets[i] = daemonSetObjects[i]
+	}
+	assert.NoError(t, client.store.Replace(daemonSets, ""))
+
+	expected := []*DaemonSetInfo{
+		{
+			Name:      "test-daemonset-1",
+			Namespace: "test-namespace",
+			Status: &DaemonSetStatus{
+				NumberAvailable:        5,
+				NumberUnavailable:      3,
+				DesiredNumberScheduled: 2,
+				CurrentNumberScheduled: 1,
+			},
+		},
+		{
+			Name:      "test-daemonset-2",
+			Namespace: "test-namespace",
+			Status: &DaemonSetStatus{
+				NumberAvailable:        10,
+				NumberUnavailable:      4,
+				DesiredNumberScheduled: 7,
+				CurrentNumberScheduled: 7,
+			},
+		},
+	}
+	actual := client.DaemonSetInfos()
+	assert.Equal(t, expected, actual)
+	client.shutdown()
+	assert.True(t, client.stopped)
+}
diff --git a/internal/aws/k8s/k8sclient/deployment.go b/internal/aws/k8s/k8sclient/deployment.go
new file mode 100644
index 000000000000..967ea35cfa0c
--- /dev/null
+++ b/internal/aws/k8s/k8sclient/deployment.go
@@ -0,0 +1,163 @@
+// Copyright OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"go.uber.org/zap"
+	appsv1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+)
+
+type DeploymentClient interface {
+	// DeploymentInfos contains the information about each deployment in the cluster
+	DeploymentInfos() []*DeploymentInfo
+}
+
+type noOpDeploymentClient struct {
+}
+
+func (nd *noOpDeploymentClient) DeploymentInfos() []*DeploymentInfo {
+	return []*DeploymentInfo{}
+}
+
+func (nd *noOpDeploymentClient) shutdown() {
+}
+
+type deploymentClientOption func(*deploymentClient)
+
+func deploymentSyncCheckerOption(checker initialSyncChecker) deploymentClientOption {
+	return func(d *deploymentClient) {
+		d.syncChecker = checker
+	}
+}
+
+type deploymentClient struct {
+	stopChan chan struct{}
+	stopped  bool
+
+	store *ObjStore
+
+	syncChecker initialSyncChecker
+
+	mu              sync.RWMutex
+	deploymentInfos []*DeploymentInfo
+}
+
+func (d *deploymentClient) refresh() {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	var deploymentInfos []*DeploymentInfo
+	objsList := d.store.List()
+	for _, obj := range objsList {
+		deployment, ok := obj.(*DeploymentInfo)
+		if !ok {
+			continue
+		}
+		deploymentInfos = append(deploymentInfos, deployment)
+	}
+
+	d.deploymentInfos = deploymentInfos
+}
+
+func (d *deploymentClient) DeploymentInfos() []*DeploymentInfo {
+	if d.store.GetResetRefreshStatus() {
+		d.refresh()
+	}
+	d.mu.RLock()
+	defer d.mu.RUnlock()
+	return d.deploymentInfos
+}
+
+func newDeploymentClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...deploymentClientOption) (*deploymentClient, error) {
+	d := &deploymentClient{
+		stopChan: make(chan struct{}),
+	}
+
+	for _, option := range options {
+		option(d)
+	}
+
+	ctx := context.Background()
+	if _, err := clientSet.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err != nil {
+		return nil, fmt.Errorf("cannot list Deployments. err: %w", err)
+	}
+
+	d.store = NewObjStore(transformFuncDeployment, logger)
+	lw := createDeploymentListWatch(clientSet, metav1.NamespaceAll)
+	reflector := cache.NewReflector(lw, &appsv1.Deployment{}, d.store, 0)
+
+	go reflector.Run(d.stopChan)
+
+	if d.syncChecker != nil {
+		// check the init sync for potential connection issue
+		d.syncChecker.Check(reflector, "Deployment initial sync timeout")
+	}
+
+	return d, nil
+}
+
+func (d *deploymentClient) shutdown() {
+	close(d.stopChan)
+	d.stopped = true
+}
+
+// transformFuncDeployment reduces an *appsv1.Deployment to the DeploymentInfo
+// kept in the store. It returns an error for any other input type.
+func transformFuncDeployment(obj interface{}) (interface{}, error) {
+	deployment, ok := obj.(*appsv1.Deployment)
+	if !ok {
+		return nil, fmt.Errorf("input obj %v is not Deployment type", obj)
+	}
+	// Spec.Replicas is an optional pointer; dereferencing it blindly panics for
+	// objects that never went through API-server defaulting. Kubernetes
+	// documents the default as 1, so that value is used when the field is unset.
+	specReplicas := uint32(1)
+	if deployment.Spec.Replicas != nil {
+		specReplicas = uint32(*deployment.Spec.Replicas)
+	}
+	info := new(DeploymentInfo)
+	info.Name = deployment.Name
+	info.Namespace = deployment.Namespace
+	info.Spec = &DeploymentSpec{
+		Replicas: specReplicas,
+	}
+	info.Status = &DeploymentStatus{
+		Replicas:            uint32(deployment.Status.Replicas),
+		AvailableReplicas:   uint32(deployment.Status.AvailableReplicas),
+		UnavailableReplicas: uint32(deployment.Status.UnavailableReplicas),
+	}
+	return info, nil
+}
+
+func createDeploymentListWatch(client kubernetes.Interface, ns string) cache.ListerWatcher {
+	ctx := context.Background()
+	return &cache.ListWatch{
+		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
+			return client.AppsV1().Deployments(ns).List(ctx, opts)
+		},
+		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
+			return client.AppsV1().Deployments(ns).Watch(ctx, opts)
+		},
+	}
+}
diff --git a/internal/aws/k8s/k8sclient/deployment_info.go b/internal/aws/k8s/k8sclient/deployment_info.go
new file mode 100644
index 000000000000..a9f2b0713bb4
--- /dev/null
+++ b/internal/aws/k8s/k8sclient/deployment_info.go
@@ -0,0 +1,32 @@
+// Copyright OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package k8sclient // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
+
+type DeploymentInfo struct {
+	Name      string
+	Namespace string
+	Spec      *DeploymentSpec
+	Status    *DeploymentStatus
+}
+
+type DeploymentSpec struct {
+	Replicas uint32
+}
+
+type DeploymentStatus struct {
+	Replicas            uint32
+	AvailableReplicas   uint32
+	UnavailableReplicas uint32
+}
diff --git a/internal/aws/k8s/k8sclient/deployment_test.go b/internal/aws/k8s/k8sclient/deployment_test.go
new file mode 100644
index 000000000000..72a2291a41a8
--- /dev/null
+++ b/internal/aws/k8s/k8sclient/deployment_test.go
@@ -0,0 +1,104 @@
+// Copyright OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package k8sclient + +import ( + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/fake" + "testing" +) + +var desired = int32(20) + +var deploymentObjects = []runtime.Object{ + &appsv1.Deployment{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-deployment-1", + Namespace: "test-namespace", + UID: types.UID("test-deployment-1-uid"), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &desired, + }, + Status: appsv1.DeploymentStatus{ + Replicas: 5, + AvailableReplicas: 5, + UnavailableReplicas: 1, + }, + }, + &appsv1.Deployment{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-deployment-2", + Namespace: "test-namespace", + UID: types.UID("test-deployment-2-uid"), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &desired, + }, + Status: appsv1.DeploymentStatus{ + Replicas: 15, + AvailableReplicas: 15, + UnavailableReplicas: 2, + }, + }, +} + +func TestDeploymentClient(t *testing.T) { + options := deploymentSyncCheckerOption(&mockReflectorSyncChecker{}) + + fakeClientSet := fake.NewSimpleClientset(deploymentObjects...) 
+ client, _ := newDeploymentClient(fakeClientSet, zap.NewNop(), options) + + deployments := make([]interface{}, len(deploymentObjects)) + for i := range deploymentObjects { + deployments[i] = deploymentObjects[i] + } + assert.NoError(t, client.store.Replace(deployments, "")) + + expected := []*DeploymentInfo{ + { + Name: "test-deployment-1", + Namespace: "test-namespace", + Spec: &DeploymentSpec{ + Replicas: 20, + }, + Status: &DeploymentStatus{ + Replicas: 5, + AvailableReplicas: 5, + UnavailableReplicas: 1, + }, + }, + { + Name: "test-deployment-2", + Namespace: "test-namespace", + Spec: &DeploymentSpec{ + Replicas: 20, + }, + Status: &DeploymentStatus{ + Replicas: 15, + AvailableReplicas: 15, + UnavailableReplicas: 2, + }, + }, + } + actual := client.DeploymentInfos() + assert.Equal(t, expected, actual) + client.shutdown() + assert.True(t, client.stopped) +} diff --git a/receiver/awscontainerinsightreceiver/README.md b/receiver/awscontainerinsightreceiver/README.md index ce1b1e93c797..e8cb9965c612 100644 --- a/receiver/awscontainerinsightreceiver/README.md +++ b/receiver/awscontainerinsightreceiver/README.md @@ -399,6 +399,58 @@ kubectl apply -f config.yaml



+### Cluster Deployment +| Metric | Unit | +|----------------------------------------|-------| +| deployment_spec_replicas | Count | +| deployment_status_replicas | Count | +| deployment_status_replicas_available | Count | +| deployment_status_replicas_unavailable | Count | + + +

+| Resource Attribute | +|--------------------| +| ClusterName | +| NodeName | +| Namespace | +| PodName | +| Type | +| Timestamp | +| Version | +| Sources | +| kubernetes | + + +

+

+ +### Cluster DaemonSet +| Metric | Unit | +|-------------------------------------------|-------| +| daemonset_status_number_available | Count | +| daemonset_status_number_unavailable | Count | +| daemonset_status_desired_number_scheduled | Count | +| daemonset_status_current_number_scheduled | Count | + + +

+| Resource Attribute | +|--------------------| +| ClusterName | +| NodeName | +| Namespace | +| PodName | +| Type | +| Timestamp | +| Version | +| Sources | +| kubernetes | + + +

+

+ ### Node | Metric | Unit | |-------------------------------------------|--------------| diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index 4ee9a8d81c83..ae1ba132ffc3 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -59,8 +59,12 @@ type K8sClient interface { GetEpClient() k8sclient.EpClient GetNodeClient() k8sclient.NodeClient GetPodClient() k8sclient.PodClient + GetDeploymentClient() k8sclient.DeploymentClient + GetDaemonSetClient() k8sclient.DaemonSetClient ShutdownNodeClient() ShutdownPodClient() + ShutdownDeploymentClient() + ShutdownDaemonSetClient() } // K8sAPIServer is a struct that produces metrics from kubernetes api server @@ -75,10 +79,12 @@ type K8sAPIServer struct { leaderLockName string leaderLockUsingConfigMapOnly bool - k8sClient K8sClient // *k8sclient.K8sClient - epClient k8sclient.EpClient - nodeClient k8sclient.NodeClient - podClient k8sclient.PodClient + k8sClient K8sClient // *k8sclient.K8sClient + epClient k8sclient.EpClient + nodeClient k8sclient.NodeClient + podClient k8sclient.PodClient + deploymentClient k8sclient.DeploymentClient + daemonSetClient k8sclient.DaemonSetClient // the following can be set to mocks in testing broadcaster eventBroadcaster @@ -149,6 +155,16 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics { k.logger.Info("collect data from K8s API Server...") timestampNs := strconv.FormatInt(time.Now().UnixNano(), 10) + result = append(result, k.getClusterMetrics(clusterName, timestampNs)) + result = append(result, k.getNamespaceMetrics(clusterName, timestampNs)...) + result = append(result, k.getDeploymentMetrics(clusterName, timestampNs)...) + result = append(result, k.getDaemonSetMetrics(clusterName, timestampNs)...) 
+ result = append(result, k.getServiceMetrics(clusterName, timestampNs)...) + + return result +} + +func (k *K8sAPIServer) getClusterMetrics(clusterName, timestampNs string) pmetric.Metrics { fields := map[string]interface{}{ "cluster_failed_node_count": k.nodeClient.ClusterFailedNodeCount(), "cluster_node_count": k.nodeClient.ClusterNodeCount(), @@ -163,52 +179,117 @@ func (k *K8sAPIServer) GetMetrics() []pmetric.Metrics { attributes["NodeName"] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) - result = append(result, md) + return ci.ConvertToOTLPMetrics(fields, attributes, k.logger) +} - for service, podNum := range k.epClient.ServiceToPodNum() { +func (k *K8sAPIServer) getNamespaceMetrics(clusterName, timestampNs string) []pmetric.Metrics { + var metrics []pmetric.Metrics + for namespace, podNum := range k.podClient.NamespaceToRunningPodNum() { fields := map[string]interface{}{ - "service_number_of_running_pods": podNum, + "namespace_number_of_running_pods": podNum, } attributes := map[string]string{ ci.ClusterNameKey: clusterName, - ci.MetricType: ci.TypeClusterService, + ci.MetricType: ci.TypeClusterNamespace, ci.Timestamp: timestampNs, - ci.TypeService: service.ServiceName, - ci.K8sNamespace: service.Namespace, + ci.K8sNamespace: namespace, ci.Version: "0", } if k.nodeName != "" { attributes["NodeName"] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"service_name\":\"%s\"}", - service.Namespace, service.ServiceName) + attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\"}", namespace) md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) - result = append(result, md) + metrics = append(metrics, md) } + return metrics +} - for namespace, podNum := range k.podClient.NamespaceToRunningPodNum() { +func (k *K8sAPIServer) getDeploymentMetrics(clusterName, timestampNs string) 
[]pmetric.Metrics { + var metrics []pmetric.Metrics + deployments := k.deploymentClient.DeploymentInfos() + for _, deployment := range deployments { fields := map[string]interface{}{ - "namespace_number_of_running_pods": podNum, + ci.MetricName(ci.TypeClusterDeployment, ci.SpecReplicas): deployment.Spec.Replicas, // deployment_spec_replicas + ci.MetricName(ci.TypeClusterDeployment, ci.StatusReplicas): deployment.Status.Replicas, // deployment_status_replicas + ci.MetricName(ci.TypeClusterDeployment, ci.StatusReplicasAvailable): deployment.Status.AvailableReplicas, // deployment_status_replicas_available + ci.MetricName(ci.TypeClusterDeployment, ci.StatusReplicasUnavailable): deployment.Status.UnavailableReplicas, // deployment_status_replicas_unavailable } attributes := map[string]string{ ci.ClusterNameKey: clusterName, - ci.MetricType: ci.TypeClusterNamespace, + ci.MetricType: ci.TypeClusterDeployment, ci.Timestamp: timestampNs, - ci.K8sNamespace: namespace, + ci.PodNameKey: deployment.Name, + ci.K8sNamespace: deployment.Namespace, ci.Version: "0", } if k.nodeName != "" { - attributes["NodeName"] = k.nodeName + attributes[ci.NodeNameKey] = k.nodeName } attributes[ci.SourcesKey] = "[\"apiserver\"]" - attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\"}", namespace) + //attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"deployment_name\":\"%s\"}", + // deployment.Namespace, deployment.Name) md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) - result = append(result, md) + metrics = append(metrics, md) } + return metrics +} - return result +func (k *K8sAPIServer) getDaemonSetMetrics(clusterName, timestampNs string) []pmetric.Metrics { + var metrics []pmetric.Metrics + daemonSets := k.daemonSetClient.DaemonSetInfos() + for _, daemonSet := range daemonSets { + fields := map[string]interface{}{ + ci.MetricName(ci.TypeClusterDaemonSet, ci.StatusNumberAvailable): daemonSet.Status.NumberAvailable, // 
daemonset_status_number_available + ci.MetricName(ci.TypeClusterDaemonSet, ci.StatusNumberUnavailable): daemonSet.Status.NumberUnavailable, // daemonset_status_number_unavailable + ci.MetricName(ci.TypeClusterDaemonSet, ci.StatusDesiredNumberScheduled): daemonSet.Status.DesiredNumberScheduled, // daemonset_status_desired_number_scheduled + ci.MetricName(ci.TypeClusterDaemonSet, ci.StatusCurrentNumberScheduled): daemonSet.Status.CurrentNumberScheduled, // daemonset_status_current_number_scheduled + } + attributes := map[string]string{ + ci.ClusterNameKey: clusterName, + ci.MetricType: ci.TypeClusterDaemonSet, + ci.Timestamp: timestampNs, + ci.PodNameKey: daemonSet.Name, + ci.K8sNamespace: daemonSet.Namespace, + ci.Version: "0", + } + if k.nodeName != "" { + attributes[ci.NodeNameKey] = k.nodeName + } + attributes[ci.SourcesKey] = "[\"apiserver\"]" + //attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"daemonset_name\":\"%s\"}", + // daemonSet.Namespace, daemonSet.Name) + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + metrics = append(metrics, md) + } + return metrics +} + +func (k *K8sAPIServer) getServiceMetrics(clusterName, timestampNs string) []pmetric.Metrics { + var metrics []pmetric.Metrics + for service, podNum := range k.epClient.ServiceToPodNum() { + fields := map[string]interface{}{ + "service_number_of_running_pods": podNum, + } + attributes := map[string]string{ + ci.ClusterNameKey: clusterName, + ci.MetricType: ci.TypeClusterService, + ci.Timestamp: timestampNs, + ci.TypeService: service.ServiceName, + ci.K8sNamespace: service.Namespace, + ci.Version: "0", + } + if k.nodeName != "" { + attributes["NodeName"] = k.nodeName + } + attributes[ci.SourcesKey] = "[\"apiserver\"]" + attributes[ci.Kubernetes] = fmt.Sprintf("{\"namespace_name\":\"%s\",\"service_name\":\"%s\"}", + service.Namespace, service.ServiceName) + md := ci.ConvertToOTLPMetrics(fields, attributes, k.logger) + metrics = append(metrics, md) + } + return 
metrics } func (k *K8sAPIServer) init() error { @@ -304,6 +385,8 @@ func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourceloc k.nodeClient = k.k8sClient.GetNodeClient() k.podClient = k.k8sClient.GetPodClient() k.epClient = k.k8sClient.GetEpClient() + k.deploymentClient = k.k8sClient.GetDeploymentClient() + k.daemonSetClient = k.k8sClient.GetDaemonSetClient() k.mu.Unlock() if k.isLeadingC != nil { @@ -333,9 +416,11 @@ func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourceloc k.mu.Lock() defer k.mu.Unlock() k.leading = false - // node and pod are only used for cluster level metrics, endpoint is used for decorator too. + // The following are only used for cluster level metrics, whereas endpoint is used for decorator too. k.k8sClient.ShutdownNodeClient() k.k8sClient.ShutdownPodClient() + k.k8sClient.ShutdownDeploymentClient() + k.k8sClient.ShutdownDaemonSetClient() }, OnNewLeader: func(identity string) { k.logger.Info(fmt.Sprintf("k8sapiserver Switch New Leader: %s", identity)) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index 5dad4bc93a3c..ac6a43f2d28b 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -60,6 +60,14 @@ func (m *mockK8sClient) GetPodClient() k8sclient.PodClient { return mockClient } +func (m *mockK8sClient) GetDeploymentClient() k8sclient.DeploymentClient { + return mockClient +} + +func (m *mockK8sClient) GetDaemonSetClient() k8sclient.DaemonSetClient { + return mockClient +} + func (m *mockK8sClient) ShutdownNodeClient() { } @@ -68,6 +76,14 @@ func (m *mockK8sClient) ShutdownPodClient() { } +func (m *mockK8sClient) ShutdownDeploymentClient() { + +} + +func (m *mockK8sClient) ShutdownDaemonSetClient() { + +} + type MockClient struct { 
k8sclient.PodClient k8sclient.NodeClient @@ -76,6 +92,18 @@ type MockClient struct { mock.Mock } +// k8sclient.DeploymentClient +func (client *MockClient) DeploymentInfos() []*k8sclient.DeploymentInfo { + args := client.Called() + return args.Get(0).([]*k8sclient.DeploymentInfo) +} + +// k8sclient.DaemonSetClient +func (client *MockClient) DaemonSetInfos() []*k8sclient.DaemonSetInfo { + args := client.Called() + return args.Get(0).([]*k8sclient.DaemonSetInfo) +} + // k8sclient.PodClient func (client *MockClient) NamespaceToRunningPodNum() map[string]int { args := client.Called() @@ -202,6 +230,32 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { NewService("service2", "kube-system"): 1, }, ) + mockClient.On("DeploymentInfos").Return([]*k8sclient.DeploymentInfo{ + { + Name: "deployment1", + Namespace: "kube-system", + Spec: &k8sclient.DeploymentSpec{ + Replicas: 10, + }, + Status: &k8sclient.DeploymentStatus{ + Replicas: 11, + AvailableReplicas: 9, + UnavailableReplicas: 2, + }, + }, + }) + mockClient.On("DaemonSetInfos").Return([]*k8sclient.DaemonSetInfo{ + { + Name: "daemonset1", + Namespace: "kube-system", + Status: &k8sclient.DaemonSetStatus{ + NumberAvailable: 10, + NumberUnavailable: 4, + DesiredNumberScheduled: 7, + CurrentNumberScheduled: 6, + }, + }, + }) <-k8sAPIServer.isLeadingC metrics := k8sAPIServer.GetMetrics() @@ -212,6 +266,8 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { tags: map[Service:service2 Timestamp:1557291396709 Type:ClusterService], fields: map[service_number_of_running_pods:1], tags: map[Service:service1 Timestamp:1557291396709 Type:ClusterService], fields: map[service_number_of_running_pods:1], tags: map[Namespace:default Timestamp:1557291396709 Type:ClusterNamespace], fields: map[namespace_number_of_running_pods:2], + tags: map[PodName:deployment1 Namespace:kube-system Timestamp:1557291396709 Type:ClusterDeployment], fields: map[deployment_spec_replicas:10 deployment_status_replicas:11 deployment_status_replicas_available:9 
deployment_status_replicas_unavailable:2], + tags: map[PodName:daemonset1 Namespace:kube-system Timestamp:1557291396709 Type:ClusterDaemonSet], fields: map[daemonset_status_number_available:10 daemonset_status_number_unavailable:4 daemonset_status_desired_number_scheduled:7 daemonset_status_current_number_scheduled:6], */ for _, metric := range metrics { assert.Equal(t, "cluster-name", getStringAttrVal(metric, ci.ClusterNameKey)) @@ -227,6 +283,22 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { case ci.TypeClusterNamespace: assertMetricValueEqual(t, metric, "namespace_number_of_running_pods", int64(2)) assert.Equal(t, "default", getStringAttrVal(metric, ci.K8sNamespace)) + case ci.TypeClusterDeployment: + assertMetricValueEqual(t, metric, "deployment_spec_replicas", int64(10)) + assertMetricValueEqual(t, metric, "deployment_status_replicas", int64(11)) + assertMetricValueEqual(t, metric, "deployment_status_replicas_available", int64(9)) + assertMetricValueEqual(t, metric, "deployment_status_replicas_unavailable", int64(2)) + assert.Equal(t, "kube-system", getStringAttrVal(metric, ci.K8sNamespace)) + assert.Equal(t, "deployment1", getStringAttrVal(metric, ci.PodNameKey)) + assert.Equal(t, "ClusterDeployment", getStringAttrVal(metric, ci.MetricType)) + case ci.TypeClusterDaemonSet: + assertMetricValueEqual(t, metric, "daemonset_status_number_available", int64(10)) + assertMetricValueEqual(t, metric, "daemonset_status_number_unavailable", int64(4)) + assertMetricValueEqual(t, metric, "daemonset_status_desired_number_scheduled", int64(7)) + assertMetricValueEqual(t, metric, "daemonset_status_current_number_scheduled", int64(6)) + assert.Equal(t, "kube-system", getStringAttrVal(metric, ci.K8sNamespace)) + assert.Equal(t, "daemonset1", getStringAttrVal(metric, ci.PodNameKey)) + assert.Equal(t, "ClusterDaemonSet", getStringAttrVal(metric, ci.MetricType)) default: assert.Fail(t, "Unexpected metric type: "+metricType) }