Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric Server - consolidate apiserver to one port #4982

Merged
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Governance**: KEDA transitioned to CNCF Graduated project ([#63](https://github.com/kedacore/governance/issues/63))

### Improvements

- **General**: Add apiserver prometheus metrics to keda metric server ([#4460](https://github.com/kedacore/keda/issues/4460))
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
- **General**: Add more events for user checking ([#796](https://github.com/kedacore/keda/issues/3764))
- **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))
- **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726))
Expand Down
64 changes: 62 additions & 2 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,21 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"
"time"

"github.com/prometheus/client_golang/prometheus/collectors"
appsv1 "k8s.io/api/apps/v1"
apimetrics "k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/client-go/kubernetes/scheme"
kubemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
basecmd "sigs.k8s.io/custom-metrics-apiserver/pkg/cmd"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
Expand Down Expand Up @@ -96,10 +103,9 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
cfg.Burst = adapterClientRequestBurst
cfg.DisableCompression = disableCompression

metricsBindAddress := fmt.Sprintf(":%v", metricsAPIServerPort)
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Metrics: server.Options{
BindAddress: metricsBindAddress,
BindAddress: "0", // disabled since we use our own server to serve metrics
},
Scheme: scheme,
Cache: ctrlcache.Options{
Expand Down Expand Up @@ -131,6 +137,57 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
return kedaprovider.NewProvider(ctx, logger, mgr.GetClient(), *grpcClient), stopCh, nil
}

// getMetricHandler returns a http handler that exposes metrics from controller-runtime and apiserver
func getMetricHandler() http.HandlerFunc {
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
// Register apiserver metrics in legacy registry
// this contains the apiserver_* metrics
apimetrics.Register()

// unregister duplicate collectors that are already handled by controller-runtime's registry
legacyregistry.Registerer().Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
legacyregistry.Registerer().Unregister(collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll)))

// Return handler that serves metrics from both legacy and controller-runtime registry
return func(w http.ResponseWriter, req *http.Request) {
legacyregistry.Handler().ServeHTTP(w, req)

kubemetrics.HandlerFor(ctrlmetrics.Registry, kubemetrics.HandlerOpts{}).ServeHTTP(w, req)
}
}

// RunMetricsServer runs a http listener and handles the /metrics endpoint
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
func RunMetricsServer(stopCh <-chan struct{}) {
h := getMetricHandler()
mux := http.NewServeMux()
mux.Handle("/metrics", h)
metricsBindAddress := fmt.Sprintf(":%v", metricsAPIServerPort)

server := &http.Server{
Addr: metricsBindAddress,
Handler: mux,
}

go func() {
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
logger.Info("starting /metrics server endpoint")
// nosemgrep: use-tls
err := server.ListenAndServe()
if err != nil {
panic(err)
}
}()

go func() {
<-stopCh
logger.Info("Shutting down the /metrics server gracefully...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()

if err := server.Shutdown(ctx); err != nil {
logger.Error(err, "http server shutdown error")
}
}()
}

// generateDefaultMetricsServiceAddr generates default Metrics Service gRPC Server address based on the current Namespace.
// By default the Metrics Service gRPC Server runs in the same namespace on the keda-operator pod.
func generateDefaultMetricsServiceAddr() string {
Expand Down Expand Up @@ -196,6 +253,9 @@ func main() {
cmd.WithExternalMetrics(kedaProvider)

logger.Info(cmd.Message)

RunMetricsServer(stopCh)

if err = cmd.Run(stopCh); err != nil {
return
}
Expand Down