Skip to content

Commit

Permalink
enhance metrics trace and add more test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyulin0719 committed Oct 19, 2023
1 parent 95a6287 commit d2ac35d
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 94 deletions.
7 changes: 7 additions & 0 deletions pkg/metrics/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,16 @@ type Metrics struct {

type CoreQueueMetrics interface {
IncQueueApplicationsAccepted()
GetQueueApplicationsAccepted() (int, error)
IncQueueApplicationsRejected()
GetQueueApplicationsRejected() (int, error)
IncQueueApplicationsRunning()
DecQueueApplicationsRunning()
GetQueueApplicationsRunning() (int, error)
IncQueueApplicationsFailed()
GetQueueApplicationsFailed() (int, error)
IncQueueApplicationsCompleted()
GetQueueApplicationsCompleted() (int, error)
IncAllocatedContainer()
IncReleasedContainer()
SetQueueGuaranteedResourceMetrics(resourceName string, value float64)
Expand Down Expand Up @@ -101,6 +106,7 @@ type CoreSchedulerMetrics interface {
// Metrics Ops related to TotalApplicationsRejected
IncTotalApplicationsRejected()
AddTotalApplicationsRejected(value int)
GetTotalApplicationsRejected() (int, error)

// Metrics Ops related to TotalApplicationsRunning
IncTotalApplicationsRunning()
Expand All @@ -119,6 +125,7 @@ type CoreSchedulerMetrics interface {
DecTotalApplicationsCompleted()
SubTotalApplicationsCompleted(value int)
SetTotalApplicationsCompleted(value int)
GetTotalApplicationsCompleted() (int, error)

// Metrics Ops related to ActiveNodes
IncActiveNodes()
Expand Down
46 changes: 46 additions & 0 deletions pkg/metrics/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package metrics

import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/log"
Expand Down Expand Up @@ -84,22 +85,67 @@ func (m *QueueMetrics) DecQueueApplicationsRunning() {
m.appMetrics.With(prometheus.Labels{"state": "running"}).Dec()
}

func (m *QueueMetrics) GetQueueApplicationsRunning() (int, error) {
metricDto := &dto.Metric{}
err := m.appMetrics.With(prometheus.Labels{"state": "running"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
return -1, err
}

func (m *QueueMetrics) IncQueueApplicationsAccepted() {
m.appMetrics.With(prometheus.Labels{"state": "accepted"}).Inc()
}

func (m *QueueMetrics) GetQueueApplicationsAccepted() (int, error) {
metricDto := &dto.Metric{}
err := m.appMetrics.With(prometheus.Labels{"state": "accepted"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
return -1, err
}

func (m *QueueMetrics) IncQueueApplicationsRejected() {
m.appMetrics.With(prometheus.Labels{"state": "rejected"}).Inc()
}

func (m *QueueMetrics) GetQueueApplicationsRejected() (int, error) {
metricDto := &dto.Metric{}
err := m.appMetrics.With(prometheus.Labels{"state": "rejected"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
return -1, err
}

func (m *QueueMetrics) IncQueueApplicationsFailed() {
m.appMetrics.With(prometheus.Labels{"state": "failed"}).Inc()
}

func (m *QueueMetrics) GetQueueApplicationsFailed() (int, error) {
metricDto := &dto.Metric{}
err := m.appMetrics.With(prometheus.Labels{"state": "failed"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
return -1, err
}

func (m *QueueMetrics) IncQueueApplicationsCompleted() {
m.appMetrics.With(prometheus.Labels{"state": "completed"}).Inc()
}

func (m *QueueMetrics) GetQueueApplicationsCompleted() (int, error) {
metricDto := &dto.Metric{}
err := m.appMetrics.With(prometheus.Labels{"state": "completed"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
return -1, err
}

func (m *QueueMetrics) IncAllocatedContainer() {
m.appMetrics.With(prometheus.Labels{"state": "allocated"}).Inc()
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/metrics/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,15 @@ func (m *SchedulerMetrics) AddTotalApplicationsRejected(value int) {
m.applicationSubmission.With(prometheus.Labels{"result": "rejected"}).Add(float64(value))
}

func (m *SchedulerMetrics) GetTotalApplicationsRejected() (int, error) {
metricDto := &dto.Metric{}
err := m.applicationSubmission.With(prometheus.Labels{"result": "rejected"}).Write(metricDto)
if err == nil {
return int(*metricDto.Counter.Value), nil
}
return -1, err
}

func (m *SchedulerMetrics) IncTotalApplicationsRunning() {
m.application.With(prometheus.Labels{"state": "running"}).Inc()
}
Expand Down Expand Up @@ -317,6 +326,15 @@ func (m *SchedulerMetrics) SetTotalApplicationsCompleted(value int) {
m.application.With(prometheus.Labels{"state": "completed"}).Set(float64(value))
}

func (m *SchedulerMetrics) GetTotalApplicationsCompleted() (int, error) {
metricDto := &dto.Metric{}
err := m.application.With(prometheus.Labels{"state": "completed"}).Write(metricDto)
if err == nil {
return int(*metricDto.Gauge.Value), nil
}
return -1, err
}

func (m *SchedulerMetrics) IncActiveNodes() {
m.node.With(prometheus.Labels{"state": "active"}).Inc()
}
Expand Down
34 changes: 14 additions & 20 deletions pkg/scheduler/objects/application_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,24 +169,29 @@ func NewAppState() *fsm.FSM {
fmt.Sprintf("enter_%s", Starting.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.startTime = time.Now()
app.queue.incRunningApps(app.ApplicationID)
app.setStateTimer(app.startTimeout, app.stateMachine.Current(), RunApplication)
app.queue.incRunningApps(app.ApplicationID)
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
},
fmt.Sprintf("enter_%s", Completing.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
if event.Src == Starting.String() {
fmt.Sprintf("leave_%s", Starting.String()): func(_ context.Context, event *fsm.Event) {
if event.Dst != Running.String() {
app := event.Args[0].(*Application) //nolint:errcheck
app.queue.decRunningApps()
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
}
},
fmt.Sprintf("enter_%s", Completing.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication)
},
fmt.Sprintf("leave_%s", New.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
if event.Dst == Resuming.String() || event.Dst == Accepted.String() {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
}
},
fmt.Sprintf("enter_%s", Rejected.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
Expand All @@ -200,14 +205,6 @@ func NewAppState() *fsm.FSM {
app.rejectedMessage = event.Args[1].(string) //nolint:errcheck
}
},
fmt.Sprintf("enter_%s", Running.String()): func(_ context.Context, event *fsm.Event) {
if event.Src == Completing.String() {
app := event.Args[0].(*Application) //nolint:errcheck
app.queue.incRunningApps(app.ApplicationID)
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
}
},
fmt.Sprintf("leave_%s", Running.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.queue.decRunningApps()
Expand All @@ -219,6 +216,8 @@ func NewAppState() *fsm.FSM {
// account for going back into running state
if event.Dst == Running.String() {
app.queue.incRunningApps(app.ApplicationID)
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
}
},
fmt.Sprintf("enter_%s", Completed.String()): func(_ context.Context, event *fsm.Event) {
Expand All @@ -234,11 +233,6 @@ func NewAppState() *fsm.FSM {
app := event.Args[0].(*Application) //nolint:errcheck
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed()
metrics.GetSchedulerMetrics().IncTotalApplicationsFailed()
if event.Src == Starting.String() {
app.queue.decRunningApps()
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
}
},
fmt.Sprintf("enter_%s", Failed.String()): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
Expand Down
Loading

0 comments on commit d2ac35d

Please sign in to comment.