diff --git a/go.mod b/go.mod index b74bf299d..e639aadeb 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,7 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 84f8f9874..57c1165d7 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -376,7 +376,7 @@ func (sa *Application) timeoutStateTimer(expectedState string, event application sa.notifyRMAllocationReleased(sa.rmID, toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete") sa.clearStateTimer() } else { - //nolint: errcheck + // nolint: errcheck _ = sa.HandleApplicationEvent(event) } } diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 04d97376f..1b76d63bf 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -651,6 +651,7 @@ func (sq *Queue) incPendingResource(delta *resources.Resource) { sq.Lock() defer sq.Unlock() sq.pending = resources.Add(sq.pending, delta) + sq.updatePendingResourceMetrics() } // decPendingResource decrements pending resource of this queue and its parents. @@ -671,6 +672,8 @@ func (sq *Queue) decPendingResource(delta *resources.Resource) { log.Log(log.SchedQueue).Warn("Pending resources went negative", zap.String("queueName", sq.QueuePath), zap.Error(err)) + } else { + sq.updatePendingResourceMetrics() } } @@ -1017,7 +1020,7 @@ func (sq *Queue) IncAllocatedResource(alloc *resources.Resource, nodeReported bo } // all OK update this queue sq.allocatedResource = newAllocated - sq.updateAllocatedAndPendingResourceMetrics() + sq.updateAllocatedResourceMetrics() return nil } @@ -1053,7 +1056,7 @@ func (sq *Queue) DecAllocatedResource(alloc *resources.Resource) error { } // all OK update the queue sq.allocatedResource = resources.Sub(sq.allocatedResource, alloc) - sq.updateAllocatedAndPendingResourceMetrics() + sq.updateAllocatedResourceMetrics() return nil } @@ -1066,7 +1069,7 @@ func (sq *Queue) IncPreemptingResource(alloc *resources.Resource) { defer sq.Unlock() sq.parent.IncPreemptingResource(alloc) sq.preemptingResource = resources.Add(sq.preemptingResource, alloc) - sq.updateAllocatedAndPendingResourceMetrics() + sq.updatePreemptingResourceMetrics() } // DecPreemptingResource decrements the preempting resources for this queue (recursively). @@ -1078,7 +1081,7 @@ func (sq *Queue) DecPreemptingResource(alloc *resources.Resource) { defer sq.Unlock() sq.parent.DecPreemptingResource(alloc) sq.preemptingResource = resources.Sub(sq.preemptingResource, alloc) - sq.updateAllocatedAndPendingResourceMetrics() + sq.updatePreemptingResourceMetrics() } func (sq *Queue) IsPrioritySortEnabled() bool { @@ -1553,13 +1556,29 @@ func (sq *Queue) updateMaxResourceMetrics() { } // updateAllocatedAndPendingResourceMetrics updates allocated and pending resource metrics for all queue types. +// Deprecated: use specific metric update function for efficiency. func (sq *Queue) updateAllocatedAndPendingResourceMetrics() { + sq.updateAllocatedResourceMetrics() + sq.updatePendingResourceMetrics() + sq.updatePreemptingResourceMetrics() +} + +// updateAllocatedResourceMetrics updates allocated resource metrics for all queue types. +func (sq *Queue) updateAllocatedResourceMetrics() { for k, v := range sq.allocatedResource.Resources { metrics.GetQueueMetrics(sq.QueuePath).SetQueueAllocatedResourceMetrics(k, float64(v)) } +} + +// updatePendingResourceMetrics updates pending resource metrics for all queue types. +func (sq *Queue) updatePendingResourceMetrics() { for k, v := range sq.pending.Resources { metrics.GetQueueMetrics(sq.QueuePath).SetQueuePendingResourceMetrics(k, float64(v)) } +} + +// updatePendingResourceMetrics updates preempting resource metrics for all queue types. +func (sq *Queue) updatePreemptingResourceMetrics() { for k, v := range sq.preemptingResource.Resources { metrics.GetQueueMetrics(sq.QueuePath).SetQueuePreemptingResourceMetrics(k, float64(v)) } diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index 66c5e6dae..0024c60f5 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -23,15 +23,19 @@ import ( "reflect" "sort" "strconv" + "strings" "testing" "time" + "github.com/prometheus/client_golang/prometheus" + promtu "github.com/prometheus/client_golang/prometheus/testutil" "gotest.tools/v3/assert" "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-core/pkg/common/resources" "github.com/apache/yunikorn-core/pkg/events" + "github.com/apache/yunikorn-core/pkg/metrics" "github.com/apache/yunikorn-core/pkg/scheduler/objects/template" "github.com/apache/yunikorn-core/pkg/scheduler/policies" siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" @@ -266,6 +270,11 @@ func TestPriorityCalc(t *testing.T) { } func TestPendingCalc(t *testing.T) { + // Reset existing metric storage; otherwise this unit test would get metrics populated by other UTs. + // In long run, to make the metrics code more testable, we should pass instantiable Metrics obj to Queue + // instead of using a global Metrics obj at pkg/metrics/init.go. + metrics.Reset() + // create the root root, err := createRootQueue(nil) assert.NilError(t, err, "queue create failed") @@ -284,6 +293,16 @@ func TestPendingCalc(t *testing.T) { if !resources.Equals(leaf.pending, allocRes) { t.Errorf("leaf queue pending allocation failed to increment expected %v, got %v", allocRes, leaf.pending) } + metrics := []string{"yunikorn_root_queue_resource", "yunikorn_root_leaf_queue_resource"} + want := concatQueueResourceMetric(metrics, []string{` +yunikorn_root_queue_resource{resource="memory",state="pending"} 100 +yunikorn_root_queue_resource{resource="vcores",state="pending"} 10 +`, ` +yunikorn_root_leaf_queue_resource{resource="memory",state="pending"} 100 +yunikorn_root_leaf_queue_resource{resource="vcores",state="pending"} 10 +`}, + ) + assert.NilError(t, promtu.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(want), metrics...), "unexpected metrics") leaf.decPendingResource(allocRes) if !resources.IsZero(root.pending) { t.Errorf("root queue pending allocation failed to decrement expected 0, got %v", root.pending) @@ -291,6 +310,15 @@ func TestPendingCalc(t *testing.T) { if !resources.IsZero(leaf.pending) { t.Errorf("leaf queue pending allocation failed to decrement expected 0, got %v", leaf.pending) } + want = concatQueueResourceMetric(metrics, []string{` +yunikorn_root_queue_resource{resource="memory",state="pending"} 0 +yunikorn_root_queue_resource{resource="vcores",state="pending"} 0 +`, ` +yunikorn_root_leaf_queue_resource{resource="memory",state="pending"} 0 +yunikorn_root_leaf_queue_resource{resource="vcores",state="pending"} 0 +`}, + ) + assert.NilError(t, promtu.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(want), metrics...), "unexpected metrics") // Not allowed to go negative: both will be zero after this newRes := resources.Multiply(allocRes, 2) root.pending = newRes @@ -302,6 +330,30 @@ func TestPendingCalc(t *testing.T) { if !resources.IsZero(leaf.GetPendingResource()) { t.Errorf("leaf queue pending allocation should have failed to decrement expected zero, got %v", leaf.pending) } + want = concatQueueResourceMetric(metrics, []string{` +yunikorn_root_queue_resource{resource="memory",state="pending"} 0 +yunikorn_root_queue_resource{resource="vcores",state="pending"} 0 +`, ` +yunikorn_root_leaf_queue_resource{resource="memory",state="pending"} 0 +yunikorn_root_leaf_queue_resource{resource="vcores",state="pending"} 0 +`}, + ) + assert.NilError(t, promtu.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(want), metrics...), "unexpected metrics") +} + +const ( + QueueResourceMetricHelp = "# HELP %v Queue resource metrics. State of the resource includes `guaranteed`, `max`, `allocated`, `pending`, `preempting`." + QueueResourceMetricType = "# TYPE %v gauge" +) + +func concatQueueResourceMetric(metricNames, metricVals []string) string { + var out string + for i, metricName := range metricNames { + out = out + fmt.Sprintf(QueueResourceMetricHelp, metricName) + "\n" + out = out + fmt.Sprintf(QueueResourceMetricType, metricName) + "\n" + out += strings.TrimLeft(metricVals[i], "\n") + } + return out } func TestGetChildQueueInfo(t *testing.T) {