Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce operational metrics in OpenTelemetry Collector #4860

Merged
merged 29 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
Here is an overview of all new **experimental** features:

- **General**: Add support for formula based evaluation of metric values ([#2440](https://github.com/kedacore/keda/issues/2440)|[#4998](https://github.com/kedacore/keda/pull/4998))
- **General**: Introduce pushing operational metrics to an OpenTelemetry Collector ([#3078](https://github.com/kedacore/keda/issues/3078))

### Improvements
- **General**: Add apiserver Prometheus metrics to KEDA Metric Server ([#4460](https://github.com/kedacore/keda/issues/4460))
Expand Down
18 changes: 9 additions & 9 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

prommetrics "github.com/kedacore/keda/v2/pkg/prommetrics/webhook"
metricscollector "github.com/kedacore/keda/v2/pkg/metricscollector/webhook"
)

var scaledobjectlog = logf.Log.WithName("scaledobject-validation-webhook")
Expand Down Expand Up @@ -95,7 +95,7 @@ func isRemovingFinalizer(so *ScaledObject, old runtime.Object) bool {
}

func validateWorkload(so *ScaledObject, action string) (admission.Warnings, error) {
prommetrics.RecordScaledObjectValidatingTotal(so.Namespace, action)
metricscollector.RecordScaledObjectValidatingTotal(so.Namespace, action)

verifyFunctions := []func(*ScaledObject, string) error{
verifyCPUMemoryScalers,
Expand All @@ -119,7 +119,7 @@ func verifyTriggers(incomingSo *ScaledObject, action string) error {
err := ValidateTriggers(incomingSo.Spec.Triggers)
if err != nil {
scaledobjectlog.WithValues("name", incomingSo.Name).Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "incorrect-triggers")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "incorrect-triggers")
}
return err
}
Expand Down Expand Up @@ -170,7 +170,7 @@ func verifyHpas(incomingSo *ScaledObject, action string) error {
} else {
err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the hpa '%s'", incomingSo.Spec.ScaleTargetRef.Name, incomingSoGckr.GVKString(), hpa.Name)
scaledobjectlog.Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-hpa")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-hpa")
return err
}
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string) error {
so.Spec.ScaleTargetRef.Name == incomingSo.Spec.ScaleTargetRef.Name {
err = fmt.Errorf("the workload '%s' of type '%s' is already managed by the ScaledObject '%s'", so.Spec.ScaleTargetRef.Name, incomingSoGckr.GVKString(), so.Name)
scaledobjectlog.Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "other-scaled-object")
return err
}
}
Expand All @@ -222,7 +222,7 @@ func verifyScaledObjects(incomingSo *ScaledObject, action string) error {
_, err = ValidateAndCompileScalingModifiers(incomingSo)
if err != nil {
scaledobjectlog.Error(err, "error validating ScalingModifiers")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "scaling-modifiers")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "scaling-modifiers")

return err
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string) error {
container.Resources.Requests.Cpu().AsApproximateFloat64() == 0 {
err := fmt.Errorf("the scaledobject has a cpu trigger but the container %s doesn't have the cpu request defined", container.Name)
scaledobjectlog.Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "missing-requests")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "missing-requests")
return err
}
} else if trigger.Type == memoryString {
Expand All @@ -284,7 +284,7 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string) error {
container.Resources.Requests.Memory().AsApproximateFloat64() == 0 {
err := fmt.Errorf("the scaledobject has a memory trigger but the container %s doesn't have the memory request defined", container.Name)
scaledobjectlog.Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "missing-requests")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "missing-requests")
return err
}
}
Expand All @@ -304,7 +304,7 @@ func verifyCPUMemoryScalers(incomingSo *ScaledObject, action string) error {
if (scaleToZeroErr && incomingSo.Spec.MinReplicaCount == nil) || (scaleToZeroErr && *incomingSo.Spec.MinReplicaCount == 0) {
err := fmt.Errorf("scaledobject has only cpu/memory triggers AND minReplica is 0 (scale to zero doesn't work in this case)")
scaledobjectlog.Error(err, "validation error")
prommetrics.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "scale-to-zero-requirements-not-met")
metricscollector.RecordScaledObjectValidatingErrors(incomingSo.Namespace, action, "scale-to-zero-requirements-not-met")
return err
}
}
Expand Down
12 changes: 11 additions & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
"github.com/kedacore/keda/v2/pkg/certificates"
"github.com/kedacore/keda/v2/pkg/k8s"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/metricsservice"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
Expand All @@ -61,6 +62,8 @@ func init() {
}

func main() {
var enablePrometheusMetrics bool
var enableOpenTelemetryMetrics bool
var metricsAddr string
var probeAddr string
var metricsServiceAddr string
Expand All @@ -75,7 +78,9 @@ func main() {
var webhooksServiceName string
var enableCertRotation bool
var validatingWebhookName string
pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
pflag.BoolVar(&enablePrometheusMetrics, "enable-prometheus-metrics", true, "Enable the prometheus metric of keda-operator.")
pflag.BoolVar(&enableOpenTelemetryMetrics, "enable-opentelemetry-metrics", false, "Enable the opentelemetry metric of keda-operator.")
pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the prometheus metric endpoint binds to.")
pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
pflag.StringVar(&metricsServiceAddr, "metrics-service-bind-address", ":9666", "The address the gRPRC Metrics Service endpoint binds to.")
pflag.BoolVar(&enableLeaderElection, "leader-elect", false,
Expand Down Expand Up @@ -127,6 +132,11 @@ func main() {
cfg.Burst = adapterClientRequestBurst
cfg.DisableCompression = disableCompression

if !enablePrometheusMetrics {
metricsAddr = "0"
}
metricscollector.NewMetricsCollectors(enablePrometheusMetrics, enableOpenTelemetryMetrics)

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
Metrics: server.Options{
Expand Down
16 changes: 16 additions & 0 deletions config/e2e/patch_operator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,19 @@
- op: replace
path: /spec/template/spec/containers/0/resources/requests/cpu
value: 500m

- op: add
path: /spec/template/spec/containers/0/args/-
value: --enable-opentelemetry-metrics=true

- op: add
path: /spec/template/spec/containers/0/env/-
value:
name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "http://opentelemetry-collector.default.svc.cluster.local:4318"

- op: add
path: /spec/template/spec/containers/0/env/-
value:
name: OTEL_METRIC_EXPORT_INTERVAL
value: "3000"
8 changes: 4 additions & 4 deletions controllers/keda/clustertriggerauthentication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/metricscollector"
)

// ClusterTriggerAuthenticationReconciler reconciles a ClusterTriggerAuthentication object
Expand Down Expand Up @@ -97,10 +97,10 @@ func (r *ClusterTriggerAuthenticationReconciler) updatePromMetrics(clusterTrigge
defer clusterTriggerAuthPromMetricsLock.Unlock()

if metricsData, ok := clusterTriggerAuthPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.ClusterTriggerAuthenticationResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ClusterTriggerAuthenticationResource, metricsData.namespace)
}

prommetrics.IncrementCRDTotal(prommetrics.ClusterTriggerAuthenticationResource, clusterTriggerAuth.Namespace)
metricscollector.IncrementCRDTotal(metricscollector.ClusterTriggerAuthenticationResource, clusterTriggerAuth.Namespace)
clusterTriggerAuthPromMetricsMap[namespacedName] = clusterTriggerAuthMetricsData{namespace: clusterTriggerAuth.Namespace}
}

Expand All @@ -110,7 +110,7 @@ func (r *ClusterTriggerAuthenticationReconciler) UpdatePromMetricsOnDelete(names
defer clusterTriggerAuthPromMetricsLock.Unlock()

if metricsData, ok := clusterTriggerAuthPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.ClusterTriggerAuthenticationResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ClusterTriggerAuthenticationResource, metricsData.namespace)
}

delete(clusterTriggerAuthPromMetricsMap, namespacedName)
Expand Down
14 changes: 7 additions & 7 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/scaling"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)
Expand Down Expand Up @@ -327,18 +327,18 @@ func (r *ScaledJobReconciler) updatePromMetrics(scaledJob *kedav1alpha1.ScaledJo
metricsData, ok := scaledJobPromMetricsMap[namespacedName]

if ok {
prommetrics.DecrementCRDTotal(prommetrics.ScaledJobResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ScaledJobResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
prommetrics.DecrementTriggerTotal(triggerType)
metricscollector.DecrementTriggerTotal(triggerType)
}
}

prommetrics.IncrementCRDTotal(prommetrics.ScaledJobResource, scaledJob.Namespace)
metricscollector.IncrementCRDTotal(metricscollector.ScaledJobResource, scaledJob.Namespace)
metricsData.namespace = scaledJob.Namespace

triggerTypes := make([]string, len(scaledJob.Spec.Triggers))
for _, trigger := range scaledJob.Spec.Triggers {
prommetrics.IncrementTriggerTotal(trigger.Type)
metricscollector.IncrementTriggerTotal(trigger.Type)
triggerTypes = append(triggerTypes, trigger.Type)
}
metricsData.triggerTypes = triggerTypes
Expand All @@ -351,9 +351,9 @@ func (r *ScaledJobReconciler) updatePromMetricsOnDelete(namespacedName string) {
defer scaledJobPromMetricsLock.Unlock()

if metricsData, ok := scaledJobPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.ScaledJobResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ScaledJobResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
prommetrics.DecrementTriggerTotal(triggerType)
metricscollector.DecrementTriggerTotal(triggerType)
}
}

Expand Down
14 changes: 7 additions & 7 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/common/message"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/scaling"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)
Expand Down Expand Up @@ -564,18 +564,18 @@ func (r *ScaledObjectReconciler) updatePromMetrics(scaledObject *kedav1alpha1.Sc
metricsData, ok := scaledObjectPromMetricsMap[namespacedName]

if ok {
prommetrics.DecrementCRDTotal(prommetrics.ScaledObjectResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ScaledObjectResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
prommetrics.DecrementTriggerTotal(triggerType)
metricscollector.DecrementTriggerTotal(triggerType)
}
}

prommetrics.IncrementCRDTotal(prommetrics.ScaledObjectResource, scaledObject.Namespace)
metricscollector.IncrementCRDTotal(metricscollector.ScaledObjectResource, scaledObject.Namespace)
metricsData.namespace = scaledObject.Namespace

triggerTypes := make([]string, len(scaledObject.Spec.Triggers))
for _, trigger := range scaledObject.Spec.Triggers {
prommetrics.IncrementTriggerTotal(trigger.Type)
metricscollector.IncrementTriggerTotal(trigger.Type)
triggerTypes = append(triggerTypes, trigger.Type)
}
metricsData.triggerTypes = triggerTypes
Expand All @@ -588,9 +588,9 @@ func (r *ScaledObjectReconciler) updatePromMetricsOnDelete(namespacedName string
defer scaledObjectPromMetricsLock.Unlock()

if metricsData, ok := scaledObjectPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.ScaledObjectResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.ScaledObjectResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
prommetrics.DecrementTriggerTotal(triggerType)
metricscollector.DecrementTriggerTotal(triggerType)
}
}

Expand Down
8 changes: 4 additions & 4 deletions controllers/keda/triggerauthentication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/prommetrics"
"github.com/kedacore/keda/v2/pkg/metricscollector"
)

// TriggerAuthenticationReconciler reconciles a TriggerAuthentication object
Expand Down Expand Up @@ -98,10 +98,10 @@ func (r *TriggerAuthenticationReconciler) updatePromMetrics(triggerAuth *kedav1a
defer triggerAuthPromMetricsLock.Unlock()

if metricsData, ok := triggerAuthPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.TriggerAuthenticationResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.TriggerAuthenticationResource, metricsData.namespace)
}

prommetrics.IncrementCRDTotal(prommetrics.TriggerAuthenticationResource, triggerAuth.Namespace)
metricscollector.IncrementCRDTotal(metricscollector.TriggerAuthenticationResource, triggerAuth.Namespace)
triggerAuthPromMetricsMap[namespacedName] = triggerAuthMetricsData{namespace: triggerAuth.Namespace}
}

Expand All @@ -111,7 +111,7 @@ func (r *TriggerAuthenticationReconciler) UpdatePromMetricsOnDelete(namespacedNa
defer triggerAuthPromMetricsLock.Unlock()

if metricsData, ok := triggerAuthPromMetricsMap[namespacedName]; ok {
prommetrics.DecrementCRDTotal(prommetrics.TriggerAuthenticationResource, metricsData.namespace)
metricscollector.DecrementCRDTotal(metricscollector.TriggerAuthenticationResource, metricsData.namespace)
}

delete(triggerAuthPromMetricsMap, namespacedName)
Expand Down
20 changes: 12 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ require (
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a
go.etcd.io/etcd/client/v3 v3.5.9
go.mongodb.org/mongo-driver v1.12.1
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0
go.opentelemetry.io/otel/metric v1.16.0
golang.org/x/oauth2 v0.12.0
golang.org/x/sync v0.3.0
google.golang.org/api v0.142.0
Expand All @@ -104,6 +107,8 @@ require (
sigs.k8s.io/kustomize/kustomize/v5 v5.1.1
)

require go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.39.0 // indirect

replace (
// we need a version with license
github.com/chzyer/logex => github.com/chzyer/logex v1.2.1
Expand Down Expand Up @@ -287,15 +292,14 @@ require (
go.etcd.io/etcd/api/v3 v3.5.9 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.40.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.40.0 // indirect
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.39.0
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
go.uber.org/atomic v1.11.0 // indirect
Expand Down
Loading