Fix: set inElecting to false when handling follow hub, and add metrics #1145

Merged
1 change: 1 addition & 0 deletions charts/openyurt/templates/pool-coordinator.yaml
@@ -141,6 +141,7 @@ spec:
- --client-cert-auth=true
- --max-txn-ops=1000
- --data-dir=/var/lib/etcd
- --max-request-bytes=100000000
- --key-file=/etc/kubernetes/pki/etcd-server.key
- --listen-metrics-urls=http://0.0.0.0:{{ .Values.poolCoordinator.etcdMetricPort }}
- --snapshot-count=10000
6 changes: 4 additions & 2 deletions charts/openyurt/templates/yurt-controller-manager.yaml
@@ -185,10 +185,12 @@ spec:
spec:
serviceAccountName: yurt-controller-manager
hostNetwork: true
{{- if .Values.imagePullSecrets }}
imagePullSecrets:
- name: regsecret
{{ toYaml .Values.imagePullSecrets | nindent 8 }}
{{- end }}
tolerations:
- operator: "Exists"
- operator: "Exists"
affinity:
nodeAffinity:
# we prefer allocating ecm on cloud node
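The template change above replaces the hard-coded `regsecret` pull secret with a list rendered from `.Values.imagePullSecrets`, guarded so the block is omitted when no secrets are configured. A minimal, illustrative values.yaml snippet that renders to the same output as before (the key path comes from the template; the secret name is only an example):

```yaml
# values.yaml (illustrative)
imagePullSecrets:
  - name: regsecret   # any pre-created docker-registry secret in the target namespace
```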
26 changes: 12 additions & 14 deletions cmd/yurthub/app/start.go
@@ -192,12 +192,11 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
// coordinatorRun will register secret informer into sharedInformerFactory, and start a new goroutine to periodically check
// if certs has been got from cloud APIServer. It will close the coordinatorInformerRegistryChan if the secret channel has
// been registered into informer factory.
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter, err = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
if err != nil {
return fmt.Errorf("failed to wait for coordinator to run, %v", err)
}
coordinatorHealthCheckerGetter, coordinatorTransportManagerGetter, coordinatorGetter = coordinatorRun(ctx, cfg, restConfigMgr, cloudHealthChecker, coordinatorInformerRegistryChan)
// wait for coordinator informer registry
klog.Infof("waiting for coordinator informer registry")
<-coordinatorInformerRegistryChan
klog.Infof("coordinator informer registry finished")
}

// Start the informer factory if all informers have been registered
@@ -267,23 +266,23 @@ func coordinatorRun(ctx context.Context,
cfg *config.YurtHubConfiguration,
restConfigMgr *hubrest.RestConfigManager,
cloudHealthChecker healthchecker.MultipleBackendsHealthChecker,
coordinatorInformerRegistryChan chan struct{}) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator, error) {
coordinatorInformerRegistryChan chan struct{}) (func() healthchecker.HealthChecker, func() transport.Interface, func() poolcoordinator.Coordinator) {
var coordinatorHealthChecker healthchecker.HealthChecker
var coordinatorTransportMgr transport.Interface
var coordinator poolcoordinator.Coordinator
var returnErr error

go func() {
coorCertManager, err := coordinatorcertmgr.NewCertManager(cfg.CoordinatorPKIDir, cfg.YurtClient, cfg.SharedFactory)
close(coordinatorInformerRegistryChan) // notify the coordinator secret informer registry event
if err != nil {
returnErr = fmt.Errorf("failed to create coordinator cert manager, %v", err)
klog.Errorf("coordinator failed to create coordinator cert manager, %v", err)
return
}
klog.Infof("coordinator new certManager success")

coorTransportMgr, err := poolCoordinatorTransportMgrGetter(cfg.HeartbeatTimeoutSeconds, cfg.CoordinatorServerURL, coorCertManager, ctx.Done())
if err != nil {
returnErr = fmt.Errorf("failed to create coordinator transport manager, %v", err)
klog.Errorf("coordinator failed to create coordinator transport manager, %v", err)
return
}

@@ -293,35 +292,34 @@ func coordinatorRun(ctx context.Context,
Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds) * time.Second,
})
if err != nil {
returnErr = fmt.Errorf("failed to get coordinator client for pool coordinator, %v", err)
klog.Errorf("coordinator failed to get coordinator client for pool coordinator, %v", err)
return
}

coorHealthChecker, err := healthchecker.NewCoordinatorHealthChecker(cfg, coordinatorClient, cloudHealthChecker, ctx.Done())
if err != nil {
returnErr = fmt.Errorf("failed to create coordinator health checker, %v", err)
klog.Errorf("coordinator failed to create coordinator health checker, %v", err)
return
}

var elector *poolcoordinator.HubElector
elector, err = poolcoordinator.NewHubElector(cfg, coordinatorClient, coorHealthChecker, cloudHealthChecker, ctx.Done())
if err != nil {
returnErr = fmt.Errorf("failed to create hub elector, %v", err)
klog.Errorf("coordinator failed to create hub elector, %v", err)
return
}
go elector.Run(ctx.Done())

coor, err := poolcoordinator.NewCoordinator(ctx, cfg, restConfigMgr, coorCertManager, coorTransportMgr, elector)
if err != nil {
returnErr = fmt.Errorf("failed to create coordinator, %v", err)
klog.Errorf("coordinator failed to create coordinator, %v", err)
return
}
go coor.Run()

coordinatorTransportMgr = coorTransportMgr
coordinatorHealthChecker = coorHealthChecker
coordinator = coor
returnErr = nil
}()

return func() healthchecker.HealthChecker {
@@ -330,7 +328,7 @@ func coordinatorRun(ctx context.Context,
return coordinatorTransportMgr
}, func() poolcoordinator.Coordinator {
return coordinator
}, returnErr
}
}

func poolCoordinatorTransportMgrGetter(heartbeatTimeoutSeconds int, coordinatorServer *url.URL, coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
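For readers skimming the refactor: coordinatorRun no longer returns an error. Setup failures are now only logged inside the goroutine, and the function hands back lazy getters that callers invoke once the informer registry channel has been closed. A stripped-down sketch of that pattern (illustrative names, not the yurthub code):

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// setupAsync starts initialization in the background and returns a getter.
// Errors are logged rather than returned, mirroring the coordinatorRun change.
func setupAsync(registered chan struct{}) func() string {
	var (
		mu    sync.Mutex
		value string
	)
	go func() {
		close(registered) // signal the "informer registered" event first
		v, err := doSetup()
		if err != nil {
			log.Printf("setup failed: %v", err) // logged, not propagated to the caller
			return
		}
		mu.Lock()
		value = v
		mu.Unlock()
	}()
	return func() string { // lazy getter; returns "" until setup has finished
		mu.Lock()
		defer mu.Unlock()
		return value
	}
}

func doSetup() (string, error) { return "ready", nil }

func main() {
	registered := make(chan struct{})
	get := setupAsync(registered)
	<-registered // like waiting on coordinatorInformerRegistryChan in Run()
	fmt.Println(get())
}
```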
73 changes: 59 additions & 14 deletions pkg/yurthub/metrics/metrics.go
@@ -44,13 +44,16 @@
)

type HubMetrics struct {
serversHealthyCollector *prometheus.GaugeVec
inFlightRequestsCollector *prometheus.GaugeVec
inFlightRequestsGauge prometheus.Gauge
rejectedRequestsCounter prometheus.Counter
closableConnsCollector *prometheus.GaugeVec
proxyTrafficCollector *prometheus.CounterVec
proxyLatencyCollector *prometheus.GaugeVec
serversHealthyCollector *prometheus.GaugeVec
inFlightRequestsCollector *prometheus.GaugeVec
inFlightRequestsGauge prometheus.Gauge
rejectedRequestsCounter prometheus.Counter
closableConnsCollector *prometheus.GaugeVec
proxyTrafficCollector *prometheus.CounterVec
proxyLatencyCollector *prometheus.GaugeVec
poolCoordinatorYurthubRoleCollector *prometheus.GaugeVec
poolCoordinatorHealthyStatusCollector *prometheus.GaugeVec
poolCoordinatorReadyStatusCollector *prometheus.GaugeVec
}

func newHubMetrics() *HubMetrics {
@@ -108,21 +111,51 @@ func newHubMetrics() *HubMetrics {
Help: "collector of proxy latency of incoming requests(unit: ms)",
},
[]string{"client", "verb", "resource", "subresources", "type"})
poolCoordinatorYurthubRoleCollector := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "pool_coordinator_yurthub_role",
Help: "pool coordinator status of yurthub. 1: LeaderHub, 2: FollowerHub 3: Pending",
},
[]string{})
poolCoordinatorHealthyStatusCollector := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "pool_coordinator_healthy_status",
Help: "pool coordinator heahty status 1: healthy, 0: unhealthy",
},
[]string{})
poolCoordinatorReadyStatusCollector := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "pool_coordinator_ready_status",
Help: "pool coordinator ready status 1: ready, 0: notReady",
},
[]string{})
prometheus.MustRegister(serversHealthyCollector)
prometheus.MustRegister(inFlightRequestsCollector)
prometheus.MustRegister(inFlightRequestsGauge)
prometheus.MustRegister(rejectedRequestsCounter)
prometheus.MustRegister(closableConnsCollector)
prometheus.MustRegister(proxyTrafficCollector)
prometheus.MustRegister(proxyLatencyCollector)
prometheus.MustRegister(poolCoordinatorYurthubRoleCollector)
prometheus.MustRegister(poolCoordinatorHealthyStatusCollector)
prometheus.MustRegister(poolCoordinatorReadyStatusCollector)
return &HubMetrics{
serversHealthyCollector: serversHealthyCollector,
inFlightRequestsCollector: inFlightRequestsCollector,
inFlightRequestsGauge: inFlightRequestsGauge,
rejectedRequestsCounter: rejectedRequestsCounter,
closableConnsCollector: closableConnsCollector,
proxyTrafficCollector: proxyTrafficCollector,
proxyLatencyCollector: proxyLatencyCollector,
serversHealthyCollector: serversHealthyCollector,
inFlightRequestsCollector: inFlightRequestsCollector,
inFlightRequestsGauge: inFlightRequestsGauge,
rejectedRequestsCounter: rejectedRequestsCounter,
closableConnsCollector: closableConnsCollector,
proxyTrafficCollector: proxyTrafficCollector,
proxyLatencyCollector: proxyLatencyCollector,
poolCoordinatorHealthyStatusCollector: poolCoordinatorHealthyStatusCollector,
poolCoordinatorReadyStatusCollector: poolCoordinatorReadyStatusCollector,
poolCoordinatorYurthubRoleCollector: poolCoordinatorYurthubRoleCollector,
}
}

@@ -139,6 +172,18 @@ func (hm *HubMetrics) ObserveServerHealthy(server string, status int) {
hm.serversHealthyCollector.WithLabelValues(server).Set(float64(status))
}

func (hm *HubMetrics) ObservePoolCoordinatorYurthubRole(status int32) {
hm.poolCoordinatorYurthubRoleCollector.WithLabelValues().Set(float64(status))
}

func (hm *HubMetrics) ObservePoolCoordinatorReadyStatus(status int32) {
hm.poolCoordinatorReadyStatusCollector.WithLabelValues().Set(float64(status))
}

func (hm *HubMetrics) ObservePoolCoordinatorHealthyStatus(status int32) {
hm.poolCoordinatorHealthyStatusCollector.WithLabelValues().Set(float64(status))
}

func (hm *HubMetrics) IncInFlightRequests(verb, resource, subresource, client string) {
hm.inFlightRequestsCollector.WithLabelValues(verb, resource, subresource, client).Inc()
hm.inFlightRequestsGauge.Inc()
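The three new collectors are label-less GaugeVecs, so each exposes a single series addressed with an empty WithLabelValues() call, exactly as the Observe* helpers above do. A self-contained sketch of that usage (the namespace and help text here are illustrative; the real prefix comes from the package constants):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// A GaugeVec declared with no label names exposes exactly one time series.
	role := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "yurthub", // illustrative; the real value comes from the metrics package constants
		Name:      "pool_coordinator_yurthub_role",
		Help:      "1: LeaderHub, 2: FollowerHub, 3: Pending",
	}, []string{})
	prometheus.MustRegister(role)

	role.WithLabelValues().Set(1) // e.g. this hub was elected LeaderHub

	// Read the value back, just to show what a scrape would report.
	var m dto.Metric
	if err := role.WithLabelValues().Write(&m); err == nil {
		fmt.Println(m.GetGauge().GetValue()) // 1
	}
}
```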
9 changes: 9 additions & 0 deletions pkg/yurthub/poolcoordinator/coordinator.go
@@ -44,6 +44,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
yurtrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/metrics"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/certmanager"
"github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/constants"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
@@ -204,6 +205,7 @@ func (coordinator *coordinator) Run() {
if !ok {
return
}
metrics.Metrics.ObservePoolCoordinatorYurthubRole(electorStatus)

switch electorStatus {
case PendingHub:
@@ -227,10 +229,12 @@
klog.Errorf("cloud not get cloud lease client when becoming leader yurthub, %v", err)
continue
}
klog.Infof("coordinator newCloudLeaseClient success.")
if err := coordinator.poolCacheSyncManager.EnsureStart(); err != nil {
klog.Errorf("failed to sync pool-scoped resource, %v", err)
continue
}
klog.Infof("coordinator poolCacheSyncManager has ensure started")
coordinator.delegateNodeLeaseManager.EnsureStartWithHandler(cache.FilteringResourceEventHandler{
FilterFunc: ifDelegateHeartBeat,
Handler: cache.ResourceEventHandlerFuncs{
@@ -300,8 +304,10 @@ func (coordinator *coordinator) IsReady() (cachemanager.CacheManager, bool) {
coordinator.Lock()
defer coordinator.Unlock()
if coordinator.electStatus != PendingHub && coordinator.isPoolCacheSynced && !coordinator.needUploadLocalCache {
metrics.Metrics.ObservePoolCoordinatorReadyStatus(1)
return coordinator.poolCacheManager, true
}
metrics.Metrics.ObservePoolCoordinatorReadyStatus(0)
return nil, false
}

@@ -311,8 +317,10 @@ func (coordinator *coordinator) IsHealthy() (cachemanager.CacheManager, bool) {
coordinator.Lock()
defer coordinator.Unlock()
if coordinator.electStatus != PendingHub {
metrics.Metrics.ObservePoolCoordinatorHealthyStatus(1)
return coordinator.poolCacheManager, true
}
metrics.Metrics.ObservePoolCoordinatorHealthyStatus(0)
return nil, false
}

@@ -420,6 +428,7 @@ func (p *poolScopedCacheSyncManager) EnsureStart() error {
hasInformersSynced := []cache.InformerSynced{}
informerFactory := informers.NewSharedInformerFactory(p.proxiedClient, 0)
for gvr := range constants.PoolScopedResources {
klog.Infof("coordinator informer with resources gvr %+v registered", gvr)
informer, err := informerFactory.ForResource(gvr)
if err != nil {
cancel()
3 changes: 2 additions & 1 deletion pkg/yurthub/poolcoordinator/leader_election.go
@@ -80,7 +80,8 @@ func NewHubElector(
},
OnStoppedLeading: func() {
klog.Infof("yurthub of %s is no more a leader", cfg.NodeName)
he.electorStatus <- PendingHub
he.electorStatus <- FollowerHub
he.inElecting = false
},
},
})
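The change above is the core of the fix: when client-go's leader election reports OnStoppedLeading, the hub now publishes FollowerHub instead of PendingHub and resets inElecting, so it can re-enter the election after losing leadership. For context, a hedged sketch of how these callbacks are wired with k8s.io/client-go (the lock and durations are placeholders, not the values yurthub uses):

```go
package main

import (
	"context"
	"time"

	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runElection is an illustrative wiring of the client-go callbacks that the
// patch above touches; it is not the yurthub implementation.
func runElection(ctx context.Context, lock resourcelock.Interface) error {
	elector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          lock,             // e.g. a Lease-based resource lock
		LeaseDuration: 15 * time.Second, // placeholder durations
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// elected: the hub would report LeaderHub here
			},
			OnStoppedLeading: func() {
				// lost leadership: report FollowerHub and clear inElecting,
				// which is what the diff above now does
			},
		},
	})
	if err != nil {
		return err
	}
	elector.Run(ctx) // blocks until ctx is cancelled
	return nil
}
```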