Add Resiliency metrics #224

Closed
26 changes: 25 additions & 1 deletion internal/aws/containerinsight/const.go
@@ -146,6 +146,13 @@
GpuRequest = "gpu_request"
GpuReservedCapacity = "gpu_reserved_capacity"

UnschedulablePendingReplacementMetric = "unschedulable_pending_replacement"
UnschedulablePendingRebootMetric = "unschedulable_pending_reboot"
SchedulableMetric = "schedulable"
SchedulablePreferredMetric = "schedulable_preferred"
UnschedulableMetric = "unschedulable"
Unknown = "unknown"

// Define the metric types
TypeCluster = "Cluster"
TypeClusterService = "ClusterService"
@@ -179,6 +186,7 @@
TypeContainerEFA = "ContainerEFA"
TypePodEFA = "PodEFA"
TypeNodeEFA = "NodeEFA"
TypeHyperPodNode = "HyperPodNode"

// unit
UnitBytes = "Bytes"
@@ -202,6 +210,15 @@
"StartError": StatusContainerWaitingReasonStartError,
}

var ConditionToMetricName = map[string]string{
"UnschedulablePendingReplacement": UnschedulablePendingReplacementMetric,
"UnschedulablePendingReboot": UnschedulablePendingRebootMetric,
"Schedulable": SchedulableMetric,
"SchedulablePreferred": SchedulablePreferredMetric,
"Unschedulable": UnschedulableMetric,
"Unknown": Unknown,
}

var metricToUnitMap map[string]string

func init() {
@@ -327,8 +344,15 @@
EfaTxBytes: UnitBytesPerSec,

GpuLimit: UnitCount,
GpuUsageTotal: UnitCount,
GpuTotal: UnitCount,

Check failure on line 347 in internal/aws/containerinsight/const.go (GitHub Actions: govulncheck internal, govulncheck receiver-0, unittest-matrix 1.22.5 and 1.21.11): undefined: GpuTotal
GpuRequest: UnitCount,
GpuReservedCapacity: UnitPercent,

UnschedulablePendingReplacementMetric: UnitCount,
UnschedulablePendingRebootMetric: UnitCount,
SchedulableMetric: UnitCount,
SchedulablePreferredMetric: UnitCount,
UnschedulableMetric: UnitCount,
Unknown: UnitCount,
}
}
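
The new ConditionToMetricName entries wire a HyperPod condition string to a Container Insights metric name, and init() registers each of those metrics with UnitCount. A minimal, self-contained sketch of that lookup; the map literal is copied from the diff, while the surrounding program is purely illustrative:

package main

import "fmt"

// Illustrative copy of the new ConditionToMetricName entries in const.go.
var conditionToMetricName = map[string]string{
	"UnschedulablePendingReplacement": "unschedulable_pending_replacement",
	"UnschedulablePendingReboot":      "unschedulable_pending_reboot",
	"Schedulable":                     "schedulable",
	"SchedulablePreferred":            "schedulable_preferred",
	"Unschedulable":                   "unschedulable",
	"Unknown":                         "unknown",
}

func main() {
	// A node reporting "Schedulable" resolves to the "schedulable" metric,
	// which init() in const.go registers with UnitCount.
	fmt.Println(conditionToMetricName["Schedulable"]) // schedulable
}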
3 changes: 3 additions & 0 deletions internal/aws/containerinsight/utils.go
@@ -107,6 +107,7 @@ func getPrefixByMetricType(mType string) string {
instanceNetPrefix := "instance_interface_"
nodeNetPrefix := "node_interface_"
nodeEfaPrefix := "node_efa_"
hyperPodNodeHealthStatusPrefix := "hyper_pod_node_health_status_"
podPrefix := "pod_"
podNetPrefix := "pod_interface_"
podEfaPrefix := "pod_efa_"
@@ -169,6 +170,8 @@
prefix = statefulSet
case TypeClusterReplicaSet:
prefix = replicaSet
case TypeHyperPodNode:
prefix = hyperPodNodeHealthStatusPrefix
default:
log.Printf("E! Unexpected MetricType: %s", mType)
}
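
With TypeHyperPodNode mapped to the hyper_pod_node_health_status_ prefix, a condition metric such as schedulable would surface as hyper_pod_node_health_status_schedulable. A sketch of that composition, assuming the same prefix-plus-metric-name convention the other Container Insights types follow:

package main

import "fmt"

// prefixFor mirrors the new TypeHyperPodNode branch in getPrefixByMetricType.
func prefixFor(mType string) string {
	if mType == "HyperPodNode" { // TypeHyperPodNode
		return "hyper_pod_node_health_status_"
	}
	return ""
}

func main() {
	fmt.Println(prefixFor("HyperPodNode") + "schedulable")
	// Output: hyper_pod_node_health_status_schedulable
}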
14 changes: 13 additions & 1 deletion internal/aws/k8s/k8sclient/clientset.go
@@ -138,6 +138,17 @@ func CaptureNodeLevelInfo(captureNodeLevelInfo bool) Option {
}
}

// CaptureOnlyNodeLabelsInfo allows one to specify whether only the node labels
// should be captured and retained in memory
func CaptureOnlyNodeLabelsInfo(captureOnlyNodeLabelInfo bool) Option {
return Option{
name: "captureOnlyNodeLabelInfo:" + strconv.FormatBool(captureOnlyNodeLabelInfo),
set: func(kc *K8sClient) {
kc.captureOnlyNodeLabelInfo = captureOnlyNodeLabelInfo
},
}
}

func getStringifiedOptions(options ...Option) string {
opts := make([]string, len(options))
for i, option := range options {
@@ -227,6 +238,7 @@ type K8sClient struct {

nodeSelector fields.Selector
captureNodeLevelInfo bool
captureOnlyNodeLabelInfo bool

jobMu sync.Mutex
job jobClientWithStopper
@@ -326,7 +338,7 @@ func (c *K8sClient) ShutdownPodClient() {
func (c *K8sClient) GetNodeClient() NodeClient {
c.nodeMu.Lock()
if c.node == nil {
opts := []nodeClientOption{nodeSyncCheckerOption(c.syncChecker), captureNodeLevelInfoOption(c.captureNodeLevelInfo)}
opts := []nodeClientOption{nodeSyncCheckerOption(c.syncChecker), captureNodeLevelInfoOption(c.captureNodeLevelInfo), captureOnlyNodeLabelInfoOption(c.captureOnlyNodeLabelInfo)}
if c.nodeSelector != nil {
opts = append(opts, nodeSelectorOption(c.nodeSelector))
}
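
A sketch of how a caller would opt in to the new label-only capture when obtaining the shared client. k8sclient.Get is assumed to be this package's existing accessor, the nop logger is illustrative, and because k8sclient is an internal package the snippet only compiles inside the repo:

package main

import (
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sclient"
)

func main() {
	// Capture only the SageMaker node-health-status label; the capacity,
	// allocatable, and condition maps stay empty unless
	// CaptureNodeLevelInfo(true) is also passed.
	client := k8sclient.Get(zap.NewNop(),
		k8sclient.CaptureOnlyNodeLabelsInfo(true),
	)
	_ = client.GetNodeClient().NodeToLabelsMap()
}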
46 changes: 44 additions & 2 deletions internal/aws/k8s/k8sclient/node.go
@@ -8,6 +8,7 @@
"fmt"
"sync"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,6 +35,7 @@
NodeToCapacityMap() map[string]v1.ResourceList
NodeToAllocatableMap() map[string]v1.ResourceList
NodeToConditionsMap() map[string]map[v1.NodeConditionType]v1.ConditionStatus
NodeToLabelsMap() map[string]map[Label]int8
}

type nodeClientOption func(*nodeClient)
@@ -56,6 +58,12 @@
}
}

func captureOnlyNodeLabelInfoOption(captureOnlyNodeLabelInfo bool) nodeClientOption {
return func(n *nodeClient) {
n.captureOnlyNodeLabelInfo = captureOnlyNodeLabelInfo
}
}

type nodeClient struct {
stopChan chan struct{}
store *ObjStore
@@ -69,14 +77,16 @@
// The node client can be used in several places, including code paths that execute on both leader and non-leader nodes.
// But for logic on the leader node (for example in k8sapiserver.go), there is no need to obtain node level info since only cluster
// level info is needed there. Hence, this optimization allows us to save on memory by not capturing node level info when not needed.
captureNodeLevelInfo bool
captureNodeLevelInfo bool
captureOnlyNodeLabelInfo bool

mu sync.RWMutex
clusterFailedNodeCount int
clusterNodeCount int
nodeToCapacityMap map[string]v1.ResourceList
nodeToAllocatableMap map[string]v1.ResourceList
nodeToConditionsMap map[string]map[v1.NodeConditionType]v1.ConditionStatus
nodeToLabelsMap map[string]map[Label]int8
}

func (c *nodeClient) ClusterFailedNodeCount() int {
@@ -133,7 +143,20 @@
return c.nodeToConditionsMap
}

func (c *nodeClient) NodeToLabelsMap() map[string]map[Label]int8 {
if !c.captureOnlyNodeLabelInfo {
c.logger.Warn("trying to access node label info when captureOnlyNodeLabelInfo is not set, will return empty data")
}
if c.store.GetResetRefreshStatus() {
c.refresh()
}
c.mu.RLock()
defer c.mu.RUnlock()
return c.nodeToLabelsMap
}

func (c *nodeClient) refresh() {

c.mu.Lock()
defer c.mu.Unlock()

@@ -144,9 +167,10 @@
nodeToCapacityMap := make(map[string]v1.ResourceList)
nodeToAllocatableMap := make(map[string]v1.ResourceList)
nodeToConditionsMap := make(map[string]map[v1.NodeConditionType]v1.ConditionStatus)
nodeToLabelsMap := make(map[string]map[Label]int8)

for _, obj := range objsList {
node := obj.(*nodeInfo)
node := obj.(*NodeInfo)

Check failure on line 173 in internal/aws/k8s/k8sclient/node.go (GitHub Actions: govulncheck receiver-0, unittest-matrix 1.22.5 and 1.21.11): undefined: NodeInfo

if c.captureNodeLevelInfo {
nodeToCapacityMap[node.name] = node.capacity
@@ -157,6 +181,13 @@
}
nodeToConditionsMap[node.name] = conditionsMap
}
if c.captureOnlyNodeLabelInfo {
labelsMap := make(map[Label]int8)
if hyperPodLabel, ok := node.labels[SageMakerNodeHealthStatus]; ok {
labelsMap[SageMakerNodeHealthStatus] = hyperPodLabel
nodeToLabelsMap[node.name] = labelsMap
}
}
clusterNodeCountNew++

failed := false
@@ -183,6 +214,7 @@
c.nodeToCapacityMap = nodeToCapacityMap
c.nodeToAllocatableMap = nodeToAllocatableMap
c.nodeToConditionsMap = nodeToConditionsMap
c.nodeToLabelsMap = nodeToLabelsMap
}

func newNodeClient(clientSet kubernetes.Interface, logger *zap.Logger, options ...nodeClientOption) *nodeClient {
@@ -227,6 +259,16 @@
info.capacity = node.Status.Capacity
info.allocatable = node.Status.Allocatable
info.conditions = []*NodeCondition{}

if sageMakerHealthStatus, ok := node.Labels[SageMakerNodeHealthStatus.String()]; ok {
info.labels = make(map[Label]int8)
if condition, ok := k8sutil.ParseString(sageMakerHealthStatus); ok {
info.labels[SageMakerNodeHealthStatus] = condition
} else {
info.labels[SageMakerNodeHealthStatus] = int8(k8sutil.Unknown)
}
}

for _, condition := range node.Status.Conditions {
info.conditions = append(info.conditions, &NodeCondition{
Type: condition.Type,
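
Downstream, NodeToLabelsMap() hands each node's parsed health status to whatever builds the resiliency metrics; folding the map into per-condition node counts is one plausible consumption pattern. A self-contained sketch in which the Label alias and condition indices mirror the diff but countByCondition itself is hypothetical:

package main

import "fmt"

// Label mirrors the k8sclient.Label enum; SageMakerNodeHealthStatus (0) is
// its only value so far.
type Label int8

const SageMakerNodeHealthStatus Label = 0

// countByCondition folds NodeToLabelsMap-style data into counts per
// HyperPod condition index (0 = Schedulable, 4 = Unschedulable, ...).
func countByCondition(nodeToLabels map[string]map[Label]int8) map[int8]int {
	counts := make(map[int8]int)
	for _, labels := range nodeToLabels {
		if status, ok := labels[SageMakerNodeHealthStatus]; ok {
			counts[status]++
		}
	}
	return counts
}

func main() {
	m := map[string]map[Label]int8{
		"node-1": {SageMakerNodeHealthStatus: 0}, // Schedulable
		"node-2": {SageMakerNodeHealthStatus: 4}, // Unschedulable
	}
	fmt.Println(countByCondition(m)) // map[0:1 4:1]
}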
21 changes: 17 additions & 4 deletions internal/aws/k8s/k8sclient/node_info.go
@@ -7,11 +7,24 @@ import (
v1 "k8s.io/api/core/v1"
)

type Label int8

const (
SageMakerNodeHealthStatus Label = iota
)

func (ct Label) String() string {
return [...]string{"sagemaker.amazonaws.com/node-health-status"}[ct]
}

type nodeInfo struct {
name string
conditions []*NodeCondition
capacity v1.ResourceList
allocatable v1.ResourceList
name string
conditions []*NodeCondition
capacity v1.ResourceList
allocatable v1.ResourceList
providerID string
instanceType string
labels map[Label]int8
}

type NodeCondition struct {
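
The Label enum doubles as an index into a parallel string array, so SageMakerNodeHealthStatus.String() yields exactly the Kubernetes label key that the node transform func looks up. A compact illustration that copies the definitions above into a standalone program:

package main

import "fmt"

type Label int8

const SageMakerNodeHealthStatus Label = iota

func (ct Label) String() string {
	return [...]string{"sagemaker.amazonaws.com/node-health-status"}[ct]
}

func main() {
	nodeLabels := map[string]string{
		"sagemaker.amazonaws.com/node-health-status": "Schedulable",
	}
	// String() supplies the map key, as in node.go's transform func.
	fmt.Println(nodeLabels[SageMakerNodeHealthStatus.String()]) // Schedulable
}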
42 changes: 28 additions & 14 deletions internal/aws/k8s/k8sclient/node_test.go
@@ -4,6 +4,7 @@
package k8sclient

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil"
"testing"
"time"

@@ -30,12 +31,14 @@ var nodeArray = []any{
Time: time.Now(),
},
Labels: map[string]string{
"kubernetes.io/arch": "amd64",
"beta.kubernetes.io/instance-type": "t3.medium",
"kubernetes.io/os": "linux",
"failure-domain.beta.kubernetes.io/region": "eu-west-1",
"failure-domain.beta.kubernetes.io/zone": "eu-west-1c",
"kubernetes.io/hostname": "ip-192-168-200-63.eu-west-1.compute.internal",
"kubernetes.io/arch": "amd64",
"beta.kubernetes.io/instance-type": "t3.medium",
"kubernetes.io/os": "linux",
"failure-domain.beta.kubernetes.io/region": "eu-west-1",
"failure-domain.beta.kubernetes.io/zone": "eu-west-1c",
"kubernetes.io/hostname": "ip-192-168-200-63.eu-west-1.compute.internal",
"node.kubernetes.io/instance-type": "t3.medium",
"sagemaker.amazonaws.com/node-health-status": "Schedulable",
},
Annotations: map[string]string{
"node.alpha.kubernetes.io/ttl": "0",
@@ -126,13 +129,14 @@
Time: time.Now(),
},
Labels: map[string]string{
"kubernetes.io/os": "linux",
"failure-domain.beta.kubernetes.io/region": "eu-west-1",
"failure-domain.beta.kubernetes.io/zone": "eu-west-1a",
"kubernetes.io/hostname": "ip-192-168-76-61.eu-west-1.compute.internal",
"kubernetes.io/arch": "amd64",
"beta.kubernetes.io/instance-type": "t3.medium",
"node.kubernetes.io/instance-type": "t3.medium",
"kubernetes.io/os": "linux",
"failure-domain.beta.kubernetes.io/region": "eu-west-1",
"failure-domain.beta.kubernetes.io/zone": "eu-west-1a",
"kubernetes.io/hostname": "ip-192-168-76-61.eu-west-1.compute.internal",
"kubernetes.io/arch": "amd64",
"beta.kubernetes.io/instance-type": "t3.medium",
"node.kubernetes.io/instance-type": "t3.medium",
"sagemaker.amazonaws.com/node-health-status": "SchedulablePreferred",
},
Annotations: map[string]string{
"node.alpha.kubernetes.io/ttl": "0",
@@ -323,12 +327,14 @@ func TestNodeClient(t *testing.T) {
"nodeToCapacityMap": map[string]v1.ResourceList{}, // Node level info is not captured by default
"nodeToAllocatableMap": map[string]v1.ResourceList{}, // Node level info is not captured by default
"nodeToConditionsMap": map[string]map[v1.NodeConditionType]v1.ConditionStatus{}, // Node level info is not captured by default
"NodeToLabelsMap": map[string]map[Label]int8{},
},
},
"CaptureNodeLevelInfo": {
options: []nodeClientOption{
nodeSyncCheckerOption(&mockReflectorSyncChecker{}),
captureNodeLevelInfoOption(true),
captureOnlyNodeLabelInfoOption(true),
},
want: map[string]any{
"clusterNodeCount": 3,
@@ -375,6 +381,14 @@
"Ready": "False",
},
},
"NodeToLabelsMap": map[string]map[Label]int8{
"ip-192-168-200-63.eu-west-1.compute.internal": {
SageMakerNodeHealthStatus: int8(k8sutil.Schedulable),
},
"ip-192-168-76-61.eu-west-1.compute.internal": {
SageMakerNodeHealthStatus: int8(k8sutil.SchedulablePreferred),
},
},
},
},
}
@@ -389,7 +403,7 @@ func TestNodeClient(t *testing.T) {
require.Equal(t, testCase.want["nodeToCapacityMap"], client.NodeToCapacityMap())
require.Equal(t, testCase.want["nodeToAllocatableMap"], client.NodeToAllocatableMap())
require.Equal(t, testCase.want["nodeToConditionsMap"], client.NodeToConditionsMap())

require.Equal(t, testCase.want["NodeToLabelsMap"], client.NodeToLabelsMap())
client.shutdown()
assert.True(t, client.stopped)
})
34 changes: 34 additions & 0 deletions internal/aws/k8s/k8sutil/util.go
@@ -22,3 +22,37 @@ func CreateContainerKey(namespace, podName, containerName string) string {
}
return fmt.Sprintf("namespace:%s,podName:%s,containerName:%s", namespace, podName, containerName)
}

type HyperPodConditionType int8

const (
Schedulable HyperPodConditionType = iota
SchedulablePreferred
UnschedulablePendingReplacement
UnschedulablePendingReboot
Unschedulable
Unknown
)

func (ct HyperPodConditionType) String() string {
return [...]string{"Schedulable", "SchedulablePreferred", "UnschedulablePendingReplacement", "UnschedulablePendingReboot", "Unschedulable", "Unknown"}[ct]
}

func (ct HyperPodConditionType) EnumIndex() int {
return int(ct)
}

var (
HyperPodConditionTypeMap = map[string]HyperPodConditionType{
"Schedulable": Schedulable,
"SchedulablePreferred": SchedulablePreferred,
"UnschedulablePendingReplacement": UnschedulablePendingReplacement,
"UnschedulablePendingReboot": UnschedulablePendingReboot,
"Unschedulable": Unschedulable,
}
)

func ParseString(str string) (int8, bool) {
c, ok := HyperPodConditionTypeMap[str]
return int8(c), ok
}
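
ParseString resolves a label value to its enum index and reports whether the value was recognized; "Unknown" is deliberately absent from HyperPodConditionTypeMap, so callers such as node.go fall back to k8sutil.Unknown explicitly. A short usage sketch; k8sutil is an internal package, so this only compiles inside the repo:

package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/k8s/k8sutil"
)

func main() {
	if c, ok := k8sutil.ParseString("Schedulable"); ok {
		fmt.Println(c, k8sutil.HyperPodConditionType(c)) // 0 Schedulable
	}
	if _, ok := k8sutil.ParseString("NotACondition"); !ok {
		fmt.Println("falling back to", k8sutil.Unknown) // falling back to Unknown
	}
}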