Skip to content

Commit

Permalink
Label *executions_terminated metrics with phase (flyteorg#386)
Browse files: browse the repository at this point in the history
Signed-off-by: Haytham Abuelfutuh <email address redacted>
  • Loading branch information
EngHabu authored Apr 5, 2022
1 parent fede037 commit 3204f7c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
10 changes: 6 additions & 4 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"
"time"

"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/flyteorg/flyteadmin/plugins"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
Expand Down Expand Up @@ -59,7 +61,7 @@ type executionSystemMetrics struct {
Scope promutils.Scope
ActiveExecutions prometheus.Gauge
ExecutionsCreated prometheus.Counter
ExecutionsTerminated prometheus.Counter
ExecutionsTerminated labeled.Counter
ExecutionEventsCreated prometheus.Counter
PropellerFailures prometheus.Counter
PublishNotificationError prometheus.Counter
Expand Down Expand Up @@ -1331,7 +1333,7 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
}
} else if common.IsExecutionTerminal(request.Event.Phase) {
m.systemMetrics.ActiveExecutions.Dec()
m.systemMetrics.ExecutionsTerminated.Inc()
m.systemMetrics.ExecutionsTerminated.Inc(contextutils.WithPhase(ctx, request.Event.Phase.String()))
go m.emitOverallWorkflowExecutionTime(executionModel, request.Event.OccurredAt)
if request.Event.GetOutputData() != nil {
m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData())))
Expand Down Expand Up @@ -1646,8 +1648,8 @@ func newExecutionSystemMetrics(scope promutils.Scope) executionSystemMetrics {
"overall count of active workflow executions"),
ExecutionsCreated: scope.MustNewCounter("executions_created",
"overall count of successfully completed CreateExecutionRequests"),
ExecutionsTerminated: scope.MustNewCounter("executions_terminated",
"overall count of terminated workflow executions"),
ExecutionsTerminated: labeled.NewCounter("executions_terminated",
"overall count of terminated workflow executions", scope),
ExecutionEventsCreated: scope.MustNewCounter("execution_events_created",
"overall count of successfully completed WorkflowExecutionEventRequest"),
PropellerFailures: scope.MustNewCounter("propeller_failures",
Expand Down
10 changes: 6 additions & 4 deletions pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"strconv"

"github.com/flyteorg/flytestdlib/promutils/labeled"

eventWriter "github.com/flyteorg/flyteadmin/pkg/async/events/interfaces"

notificationInterfaces "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
Expand Down Expand Up @@ -41,7 +43,7 @@ type nodeExecutionMetrics struct {
Scope promutils.Scope
ActiveNodeExecutions prometheus.Gauge
NodeExecutionsCreated prometheus.Counter
NodeExecutionsTerminated prometheus.Counter
NodeExecutionsTerminated labeled.Counter
NodeExecutionEventsCreated prometheus.Counter
MissingWorkflowExecution prometheus.Counter
ClosureSizeBytes prometheus.Summary
Expand Down Expand Up @@ -279,7 +281,7 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi
m.metrics.ActiveNodeExecutions.Inc()
} else if common.IsNodeExecutionTerminal(request.Event.Phase) {
m.metrics.ActiveNodeExecutions.Dec()
m.metrics.NodeExecutionsTerminated.Inc()
m.metrics.NodeExecutionsTerminated.Inc(contextutils.WithPhase(ctx, request.Event.Phase.String()))
if request.Event.GetOutputData() != nil {
m.metrics.NodeExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData())))
}
Expand Down Expand Up @@ -551,8 +553,8 @@ func NewNodeExecutionManager(db repoInterfaces.Repository, config runtimeInterfa
"overall count of active node executions"),
NodeExecutionsCreated: scope.MustNewCounter("node_executions_created",
"overall count of node executions created"),
NodeExecutionsTerminated: scope.MustNewCounter("node_executions_terminated",
"overall count of terminated node executions"),
NodeExecutionsTerminated: labeled.NewCounter("node_executions_terminated",
"overall count of terminated node executions", scope),
NodeExecutionEventsCreated: scope.MustNewCounter("node_execution_events_created",
"overall count of successfully completed NodeExecutionEventRequest"),
MissingWorkflowExecution: scope.MustNewCounter("missing_workflow_execution",
Expand Down
10 changes: 6 additions & 4 deletions pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"strconv"

"github.com/flyteorg/flytestdlib/promutils/labeled"

notificationInterfaces "github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
"github.com/golang/protobuf/proto"

Expand Down Expand Up @@ -35,7 +37,7 @@ type taskExecutionMetrics struct {
Scope promutils.Scope
ActiveTaskExecutions prometheus.Gauge
TaskExecutionsCreated prometheus.Counter
TaskExecutionsTerminated prometheus.Counter
TaskExecutionsTerminated labeled.Counter
TaskExecutionEventsCreated prometheus.Counter
MissingTaskExecution prometheus.Counter
MissingTaskDefinition prometheus.Counter
Expand Down Expand Up @@ -192,7 +194,7 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
m.metrics.ActiveTaskExecutions.Inc()
} else if common.IsTaskExecutionTerminal(request.Event.Phase) && request.Event.PhaseVersion == 0 {
m.metrics.ActiveTaskExecutions.Dec()
m.metrics.TaskExecutionsTerminated.Inc()
m.metrics.TaskExecutionsTerminated.Inc(contextutils.WithPhase(ctx, request.Event.Phase.String()))
if request.Event.GetOutputData() != nil {
m.metrics.TaskExecutionOutputBytes.Observe(float64(proto.Size(request.Event.GetOutputData())))
}
Expand Down Expand Up @@ -338,8 +340,8 @@ func NewTaskExecutionManager(db repoInterfaces.Repository, config runtimeInterfa
"overall count of task execution events received that are missing a parent node execution"),
TaskExecutionsCreated: scope.MustNewCounter("task_executions_created",
"overall count of successfully completed CreateExecutionRequests"),
TaskExecutionsTerminated: scope.MustNewCounter("task_executions_terminated",
"overall count of terminated workflow executions"),
TaskExecutionsTerminated: labeled.NewCounter("task_executions_terminated",
"overall count of terminated workflow executions", scope),
TaskExecutionEventsCreated: scope.MustNewCounter("task_execution_events_created",
"overall count of successfully completed WorkflowExecutionEventRequest"),
MissingTaskDefinition: scope.MustNewCounter("missing_task_definition",
Expand Down

0 comments on commit 3204f7c

Please sign in to comment.