Emit User/System permanent error failures for Node (flyteorg#119)
* Emit User/System error failures for workflows
anandswaminathan authored Apr 23, 2020
1 parent 0da8fa2 commit 7c4d636
Showing 1 changed file with 50 additions and 33 deletions.
83 changes: 50 additions & 33 deletions flytepropeller/pkg/controller/nodes/executor.go
@@ -34,15 +34,18 @@ import (
 )
 
 type nodeMetrics struct {
-	Scope                promutils.Scope
-	FailureDuration      labeled.StopWatch
-	SuccessDuration      labeled.StopWatch
-	UserErrorDuration    labeled.StopWatch
-	SystemErrorDuration  labeled.StopWatch
-	UnknownErrorDuration labeled.StopWatch
-	ResolutionFailure    labeled.Counter
-	InputsWriteFailure   labeled.Counter
-	TimedOutFailure      labeled.Counter
+	Scope                         promutils.Scope
+	FailureDuration               labeled.StopWatch
+	SuccessDuration               labeled.StopWatch
+	UserErrorDuration             labeled.StopWatch
+	SystemErrorDuration           labeled.StopWatch
+	UnknownErrorDuration          labeled.StopWatch
+	PermanentUserErrorDuration    labeled.StopWatch
+	PermanentSystemErrorDuration  labeled.StopWatch
+	PermanentUnknownErrorDuration labeled.StopWatch
+	ResolutionFailure             labeled.Counter
+	InputsWriteFailure            labeled.Counter
+	TimedOutFailure               labeled.Counter
 
 	InterruptedThresholdHit labeled.Counter
 
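For context, labeled.StopWatch (from flytestdlib's promutils/labeled package) records the elapsed time between two timestamps against labels carried in the context. Below is a minimal, runnable sketch of the register-then-observe pattern these new fields follow, using import paths from this commit's era; the metric-key and timing choices are illustrative, not part of the commit.

    // Sketch of the labeled.StopWatch pattern the new struct fields rely on.
    package main

    import (
    	"context"
    	"time"

    	"github.com/lyft/flytestdlib/contextutils"
    	"github.com/lyft/flytestdlib/promutils"
    	"github.com/lyft/flytestdlib/promutils/labeled"
    )

    func main() {
    	// Labeled metrics need their label keys configured exactly once, up front.
    	labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey)

    	nodeScope := promutils.NewTestScope().NewSubScope("node")

    	// Same call shape as the registrations added further down in this diff.
    	sw := labeled.NewStopWatch("perma_user_error_duration",
    		"Indicates the total execution time before non recoverable user error",
    		time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric)

    	start := time.Now().Add(-250 * time.Millisecond) // pretend the attempt started 250ms ago
    	sw.Observe(context.TODO(), start, time.Now())    // records end minus start, in milliseconds
    }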
@@ -334,6 +337,15 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node
 		return executors.NodeStatusUndefined, err
 	}
 
+	if p.GetPhase() == handler.EPhaseUndefined {
+		return executors.NodeStatusUndefined, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), "received undefined phase.")
+	}
+
+	np, err := ToNodePhase(p.GetPhase())
+	if err != nil {
+		return executors.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "failed to move from queued")
+	}
+
 	// execErr in phase-info 'p' is only available if node has failed to execute, and the current phase at that time
 	// will be v1alpha1.NodePhaseRunning
 	execErr := p.GetErr()
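These two guards are hoisted above the error-handling block because the new permanent-failure accounting in the next hunk reads np while inspecting execErr. A simplified sketch of that ordering dependency follows; the types, phase values, and toNodePhase body here are hypothetical stand-ins, not flytepropeller's real definitions.

    // Hypothetical, simplified sketch: the phase conversion must succeed before
    // the error-metrics block can ask whether the failure is terminal.
    package main

    import "fmt"

    type HandlerPhase int
    type NodePhase int

    const (
    	EPhaseUndefined HandlerPhase = iota
    	EPhaseFailing
    	EPhaseSuccess
    )

    const (
    	NodePhaseRunning NodePhase = iota
    	NodePhaseFailing
    	NodePhaseSucceeding
    )

    func toNodePhase(p HandlerPhase) (NodePhase, error) {
    	switch p {
    	case EPhaseFailing:
    		return NodePhaseFailing, nil
    	case EPhaseSuccess:
    		return NodePhaseSucceeding, nil
    	default:
    		return NodePhaseRunning, fmt.Errorf("unhandled handler phase %v", p)
    	}
    }

    func main() {
    	np, err := toNodePhase(EPhaseFailing)
    	if err != nil {
    		panic(err)
    	}
    	// Only now can the error-metrics block key off the terminal phase.
    	fmt.Println(np == NodePhaseFailing) // true
    }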
@@ -353,17 +365,19 @@
 		} else {
 			c.metrics.UnknownErrorDuration.Observe(ctx, startTime, endTime)
 		}
+		// When a node fails, we fail the workflow. Independent of number of nodes succeeding/failing, whenever a first node fails,
+		// the entire workflow is failed.
+		if np == v1alpha1.NodePhaseFailing {
+			if execErr.GetKind() == core.ExecutionError_SYSTEM {
+				nodeStatus.IncrementSystemFailures()
+				c.metrics.PermanentSystemErrorDuration.Observe(ctx, startTime, endTime)
+			} else if execErr.GetKind() == core.ExecutionError_USER {
+				c.metrics.PermanentUserErrorDuration.Observe(ctx, startTime, endTime)
+			} else {
+				c.metrics.PermanentUnknownErrorDuration.Observe(ctx, startTime, endTime)
+			}
+		}
 	}
-
-	if p.GetPhase() == handler.EPhaseUndefined {
-		return executors.NodeStatusUndefined, errors.Errorf(errors.IllegalStateError, nCtx.NodeID(), "received undefined phase.")
-	}
-
-	np, err := ToNodePhase(p.GetPhase())
-	if err != nil {
-		return executors.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "failed to move from queued")
-	}
-
 	finalStatus := executors.NodeStatusRunning
 	if np == v1alpha1.NodePhaseFailing && !h.FinalizeRequired() {
 		logger.Infof(ctx, "Finalize not required, moving node to Failed")
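The added branch separates transient attempt failures (UserErrorDuration and friends) from the failure that drives the node, and with it the workflow, into a terminal failing phase. Factored out, the selection could read as the sketch below; errorDurationMetric is a hypothetical helper assuming executor.go's existing imports (core from flyteidl, labeled from flytestdlib), not code from this commit.

    // Hypothetical helper, not part of this commit: choose which duration
    // metric to observe for a failed attempt, given the error kind and
    // whether the node is entering a terminal (failing) phase.
    func errorDurationMetric(m *nodeMetrics, kind core.ExecutionError_ErrorKind, terminal bool) labeled.StopWatch {
    	switch {
    	case kind == core.ExecutionError_SYSTEM && terminal:
    		return m.PermanentSystemErrorDuration
    	case kind == core.ExecutionError_SYSTEM:
    		return m.SystemErrorDuration
    	case kind == core.ExecutionError_USER && terminal:
    		return m.PermanentUserErrorDuration
    	case kind == core.ExecutionError_USER:
    		return m.UserErrorDuration
    	case terminal:
    		return m.PermanentUnknownErrorDuration
    	default:
    		return m.UnknownErrorDuration
    	}
    }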
@@ -775,20 +789,23 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora
 		taskRecorder:        events.NewTaskEventRecorder(eventSink, scope.NewSubScope("task")),
 		maxDatasetSizeBytes: maxDatasetSize,
 		metrics: &nodeMetrics{
-			Scope:                   nodeScope,
-			FailureDuration:         labeled.NewStopWatch("failure_duration", "Indicates the total execution time of a failed workflow.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
-			SuccessDuration:         labeled.NewStopWatch("success_duration", "Indicates the total execution time of a successful workflow.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
-			UserErrorDuration:       labeled.NewStopWatch("user_error_duration", "Indicates the total execution time before user error", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
-			SystemErrorDuration:     labeled.NewStopWatch("system_error_duration", "Indicates the total execution time before system error", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
-			UnknownErrorDuration:    labeled.NewStopWatch("unknown_error_duration", "Indicates the total execution time before unknown error", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
-			InputsWriteFailure:      labeled.NewCounter("inputs_write_fail", "Indicates failure in writing node inputs to metastore", nodeScope),
-			TimedOutFailure:         labeled.NewCounter("timeout_fail", "Indicates failure due to timeout", nodeScope),
-			InterruptedThresholdHit: labeled.NewCounter("interrupted_threshold", "Indicates the node interruptible disabled because it hit max failure count", nodeScope),
-			ResolutionFailure:       labeled.NewCounter("input_resolve_fail", "Indicates failure in resolving node inputs", nodeScope),
-			TransitionLatency:       labeled.NewStopWatch("transition_latency", "Measures the latency between the last parent node stoppedAt time and current node's queued time.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
-			QueuingLatency:          labeled.NewStopWatch("queueing_latency", "Measures the latency between the time a node's been queued to the time the handler reported the executable moved to running state", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
-			NodeExecutionTime:       labeled.NewStopWatch("node_exec_latency", "Measures the time taken to execute one node, a node can be complex so it may encompass sub-node latency.", time.Microsecond, nodeScope, labeled.EmitUnlabeledMetric),
-			NodeInputGatherLatency:  labeled.NewStopWatch("node_input_latency", "Measures the latency to aggregate inputs and check readiness of a node", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			Scope:                         nodeScope,
+			FailureDuration:               labeled.NewStopWatch("failure_duration", "Indicates the total execution time of a failed workflow.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			SuccessDuration:               labeled.NewStopWatch("success_duration", "Indicates the total execution time of a successful workflow.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			UserErrorDuration:             labeled.NewStopWatch("user_error_duration", "Indicates the total execution time before user error", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			SystemErrorDuration:           labeled.NewStopWatch("system_error_duration", "Indicates the total execution time before system error", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			UnknownErrorDuration:          labeled.NewStopWatch("unknown_error_duration", "Indicates the total execution time before unknown error", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			PermanentUserErrorDuration:    labeled.NewStopWatch("perma_user_error_duration", "Indicates the total execution time before non recoverable user error", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			PermanentSystemErrorDuration:  labeled.NewStopWatch("perma_system_error_duration", "Indicates the total execution time before non recoverable system error", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			PermanentUnknownErrorDuration: labeled.NewStopWatch("perma_unknown_error_duration", "Indicates the total execution time before non recoverable unknown error", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			InputsWriteFailure:            labeled.NewCounter("inputs_write_fail", "Indicates failure in writing node inputs to metastore", nodeScope),
+			TimedOutFailure:               labeled.NewCounter("timeout_fail", "Indicates failure due to timeout", nodeScope),
+			InterruptedThresholdHit:       labeled.NewCounter("interrupted_threshold", "Indicates the node interruptible disabled because it hit max failure count", nodeScope),
+			ResolutionFailure:             labeled.NewCounter("input_resolve_fail", "Indicates failure in resolving node inputs", nodeScope),
+			TransitionLatency:             labeled.NewStopWatch("transition_latency", "Measures the latency between the last parent node stoppedAt time and current node's queued time.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			QueuingLatency:                labeled.NewStopWatch("queueing_latency", "Measures the latency between the time a node's been queued to the time the handler reported the executable moved to running state", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			NodeExecutionTime:             labeled.NewStopWatch("node_exec_latency", "Measures the time taken to execute one node, a node can be complex so it may encompass sub-node latency.", time.Microsecond, nodeScope, labeled.EmitUnlabeledMetric),
+			NodeInputGatherLatency:        labeled.NewStopWatch("node_input_latency", "Measures the latency to aggregate inputs and check readiness of a node", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
 		},
 		outputResolver:           NewRemoteFileOutputResolver(store),
 		defaultExecutionDeadline: nodeConfig.DefaultDeadlines.DefaultNodeExecutionDeadline.Duration,
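The registrations above mirror the transient-error stopwatches, with perma_-prefixed names. Assuming flytestdlib's convention of suffixing stopwatch metric names with their time unit, the new series should surface roughly as sketched below; the scope prefix shown is hypothetical and depends on deployment configuration.

    // Sketch (not from the commit): expected series names under the node scope,
    // assuming flytestdlib appends the stopwatch's time unit as a suffix.
    package main

    import "fmt"

    func main() {
    	prefix := "flyte:propeller:all:node:" // hypothetical scope prefix
    	for _, name := range []string{
    		"perma_user_error_duration",
    		"perma_system_error_duration",
    		"perma_unknown_error_duration",
    	} {
    		fmt.Println(prefix + name + "_ms")
    	}
    }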
