Skip to content

Commit

Permalink
[Wf-Diagnostics] Include failure issues identification and rootcause …
Browse files Browse the repository at this point in the history
…in diagnostics (cadence-workflow#6370)

* [Wf-Diagnostics] Include failure issues identification and rootcause in diagnostics

* lint update
  • Loading branch information
sankari165 authored Oct 17, 2024
1 parent bb53e91 commit 1ba0d71
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 48 deletions.
47 changes: 41 additions & 6 deletions service/worker/diagnostics/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/analytics"
"github.com/uber/cadence/service/worker/diagnostics/invariant"
"github.com/uber/cadence/service/worker/diagnostics/invariant/failure"
"github.com/uber/cadence/service/worker/diagnostics/invariant/timeout"
)

Expand All @@ -51,33 +52,67 @@ func (w *dw) retrieveExecutionHistory(ctx context.Context, info retrieveExecutio
})
}

type identifyTimeoutsInputParams struct {
// identifyIssuesParams is the input of the identifyIssues activity.
type identifyIssuesParams struct {
// History is the workflow execution history handed to every invariant as
// its WorkflowExecutionHistory.
History *types.GetWorkflowExecutionHistoryResponse
// Domain is forwarded unchanged to every invariant as its Domain parameter.
Domain string
}

func (w *dw) identifyTimeouts(ctx context.Context, info identifyTimeoutsInputParams) ([]invariant.InvariantCheckResult, error) {
// identifyIssues runs the timeout invariant check followed by the failure
// invariant check over the same workflow history and returns the combined
// findings: timeout issues first, then failure issues.
//
// If either check returns an error, that error is returned unchanged together
// with a nil slice; results gathered so far are discarded. When no issue is
// found the returned slice is empty but non-nil.
func (w *dw) identifyIssues(ctx context.Context, info identifyIssuesParams) ([]invariant.InvariantCheckResult, error) {
// Start from an empty, non-nil slice so callers never receive nil on success.
result := make([]invariant.InvariantCheckResult, 0)

// The timeout invariant is additionally given the worker's client bean.
timeoutInvariant := timeout.NewInvariant(timeout.NewTimeoutParams{
WorkflowExecutionHistory: info.History,
Domain: info.Domain,
ClientBean: w.clientBean,
})
return timeoutInvariant.Check(ctx)
timeoutIssues, err := timeoutInvariant.Check(ctx)
if err != nil {
return nil, err
}
result = append(result, timeoutIssues...)

// The failure invariant is built from the history and domain only.
failureInvariant := failure.NewInvariant(failure.Params{
WorkflowExecutionHistory: info.History,
Domain: info.Domain,
})
failureIssues, err := failureInvariant.Check(ctx)
if err != nil {
return nil, err
}
result = append(result, failureIssues...)

return result, nil
}

type rootCauseTimeoutsParams struct {
// rootCauseIssuesParams is the input of the rootCauseIssues activity.
type rootCauseIssuesParams struct {
// History is the workflow execution history handed to every invariant as
// its WorkflowExecutionHistory.
History *types.GetWorkflowExecutionHistoryResponse
// Domain is forwarded unchanged to every invariant as its Domain parameter.
Domain string
// Issues is the list of check results to root-cause; the complete list is
// passed to each invariant's RootCause call.
Issues []invariant.InvariantCheckResult
}

func (w *dw) rootCauseTimeouts(ctx context.Context, info rootCauseTimeoutsParams) ([]invariant.InvariantRootCauseResult, error) {
// rootCauseIssues asks the timeout invariant and then the failure invariant
// for the root causes of the supplied issues and returns the combined
// findings: timeout root causes first, then failure root causes.
//
// If either RootCause call returns an error, that error is returned unchanged
// together with a nil slice. When nothing is found the returned slice is empty
// but non-nil.
func (w *dw) rootCauseIssues(ctx context.Context, info rootCauseIssuesParams) ([]invariant.InvariantRootCauseResult, error) {
// Start from an empty, non-nil slice so callers never receive nil on success.
result := make([]invariant.InvariantRootCauseResult, 0)
timeoutInvariant := timeout.NewInvariant(timeout.NewTimeoutParams{
WorkflowExecutionHistory: info.History,
ClientBean: w.clientBean,
Domain: info.Domain,
})
return timeoutInvariant.RootCause(ctx, info.Issues)
// Both invariants receive the full, unfiltered issue list.
// NOTE(review): this relies on each invariant ignoring issue types it does
// not own — confirm for the failure invariant, whose RootCause is not shown here.
timeoutRC, err := timeoutInvariant.RootCause(ctx, info.Issues)
if err != nil {
return nil, err
}
result = append(result, timeoutRC...)
failureInvariant := failure.NewInvariant(failure.Params{
WorkflowExecutionHistory: info.History,
Domain: info.Domain,
})
failureRC, err := failureInvariant.RootCause(ctx, info.Issues)
if err != nil {
return nil, err
}
result = append(result, failureRC...)

return result, nil
}

func (w *dw) emitUsageLogs(ctx context.Context, info analytics.WfDiagnosticsUsageData) error {
Expand Down
86 changes: 78 additions & 8 deletions service/worker/diagnostics/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/worker/diagnostics/analytics"
"github.com/uber/cadence/service/worker/diagnostics/invariant"
"github.com/uber/cadence/service/worker/diagnostics/invariant/failure"
"github.com/uber/cadence/service/worker/diagnostics/invariant/timeout"
)

Expand All @@ -60,34 +61,56 @@ func Test__retrieveExecutionHistory(t *testing.T) {
require.Equal(t, testWorkflowExecutionHistoryResponse(), result)
}

func Test__identifyTimeouts(t *testing.T) {
// Test__identifyIssues checks that identifyIssues, run against the shared test
// history, reports exactly one execution-timeout issue followed by one
// activity-failure issue.
func Test__identifyIssues(t *testing.T) {
dwtest := testDiagnosticWorkflow(t)
// Expected metadata of the execution-timeout issue.
workflowTimeoutData := timeout.ExecutionTimeoutMetadata{
ExecutionTime: 110 * time.Second,
ConfiguredTimeout: 110 * time.Second,
LastOngoingEvent: &types.HistoryEvent{
ID: 1,
ID: 4,
Timestamp: common.Int64Ptr(testTimeStamp),
WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(workflowTimeoutSecond),
ActivityTaskFailedEventAttributes: &types.ActivityTaskFailedEventAttributes{
Reason: common.StringPtr("cadenceInternal:Generic"),
Details: []byte("test-activity-failure"),
Identity: "localhost",
ScheduledEventID: 2,
StartedEventID: 3,
},
},
}
workflowTimeoutDataInBytes, err := json.Marshal(workflowTimeoutData)
require.NoError(t, err)
// Expected metadata of the activity-failure issue.
actMetadata := failure.FailureMetadata{
Identity: "localhost",
ActivityScheduled: &types.ActivityTaskScheduledEventAttributes{
ActivityID: "101",
ActivityType: &types.ActivityType{Name: "test-activity"},
},
ActivityStarted: &types.ActivityTaskStartedEventAttributes{
Identity: "localhost",
Attempt: 0,
},
}
actMetadataInBytes, err := json.Marshal(actMetadata)
require.NoError(t, err)
// Order matters: require.Equal below compares the slices element by element.
expectedResult := []invariant.InvariantCheckResult{
{
InvariantType: timeout.TimeoutTypeExecution.String(),
Reason: "START_TO_CLOSE",
Metadata: workflowTimeoutDataInBytes,
},
{
InvariantType: failure.ActivityFailed.String(),
Reason: failure.GenericError.String(),
Metadata: actMetadataInBytes,
},
}
result, err := dwtest.identifyTimeouts(context.Background(), identifyTimeoutsInputParams{History: testWorkflowExecutionHistoryResponse()})
// Only History is supplied; Domain is left at its zero value.
result, err := dwtest.identifyIssues(context.Background(), identifyIssuesParams{History: testWorkflowExecutionHistoryResponse()})
require.NoError(t, err)
require.Equal(t, expectedResult, result)
}

func Test__rootCauseTimeouts(t *testing.T) {
func Test__rootCauseIssues(t *testing.T) {
dwtest := testDiagnosticWorkflow(t)
workflowTimeoutData := timeout.ExecutionTimeoutMetadata{
ExecutionTime: 110 * time.Second,
Expand All @@ -106,12 +129,30 @@ func Test__rootCauseTimeouts(t *testing.T) {
}
workflowTimeoutDataInBytes, err := json.Marshal(workflowTimeoutData)
require.NoError(t, err)
actMetadata := failure.FailureMetadata{
Identity: "localhost",
ActivityScheduled: &types.ActivityTaskScheduledEventAttributes{
ActivityID: "101",
ActivityType: &types.ActivityType{Name: "test-activity"},
},
ActivityStarted: &types.ActivityTaskStartedEventAttributes{
Identity: "localhost",
Attempt: 0,
},
}
actMetadataInBytes, err := json.Marshal(actMetadata)
require.NoError(t, err)
issues := []invariant.InvariantCheckResult{
{
InvariantType: timeout.TimeoutTypeExecution.String(),
Reason: "START_TO_CLOSE",
Metadata: workflowTimeoutDataInBytes,
},
{
InvariantType: failure.ActivityFailed.String(),
Reason: failure.CustomError.String(),
Metadata: actMetadataInBytes,
},
}
taskListBacklog := int64(10)
taskListBacklogInBytes, err := json.Marshal(timeout.PollersMetadata{TaskListBacklog: taskListBacklog})
Expand All @@ -121,8 +162,12 @@ func Test__rootCauseTimeouts(t *testing.T) {
RootCause: invariant.RootCauseTypePollersStatus,
Metadata: taskListBacklogInBytes,
},
{
RootCause: invariant.RootCauseTypeServiceSideIssue,
Metadata: actMetadataInBytes,
},
}
result, err := dwtest.rootCauseTimeouts(context.Background(), rootCauseTimeoutsParams{History: testWorkflowExecutionHistoryResponse(), Domain: "test-domain", Issues: issues})
result, err := dwtest.rootCauseIssues(context.Background(), rootCauseIssuesParams{History: testWorkflowExecutionHistoryResponse(), Domain: "test-domain", Issues: issues})
require.NoError(t, err)
require.Equal(t, expectedRootCause, result)
}
Expand Down Expand Up @@ -170,7 +215,32 @@ func testWorkflowExecutionHistoryResponse() *types.GetWorkflowExecutionHistoryRe
},
},
{
ID: 2,
ID: 2,
ActivityTaskScheduledEventAttributes: &types.ActivityTaskScheduledEventAttributes{
ActivityID: "101",
ActivityType: &types.ActivityType{Name: "test-activity"},
},
},
{
ID: 3,
ActivityTaskStartedEventAttributes: &types.ActivityTaskStartedEventAttributes{
Identity: "localhost",
Attempt: 0,
},
},
{
ID: 4,
Timestamp: common.Int64Ptr(testTimeStamp),
ActivityTaskFailedEventAttributes: &types.ActivityTaskFailedEventAttributes{
Reason: common.StringPtr("cadenceInternal:Generic"),
Details: []byte("test-activity-failure"),
Identity: "localhost",
ScheduledEventID: 2,
StartedEventID: 3,
},
},
{
ID: 5,
Timestamp: common.Int64Ptr(testTimeStamp + int64(workflowTimeoutSecond)*timeUnit.Nanoseconds()),
WorkflowExecutionTimedOutEventAttributes: &types.WorkflowExecutionTimedOutEventAttributes{TimeoutType: types.TimeoutTypeStartToClose.Ptr()},
},
Expand Down
4 changes: 2 additions & 2 deletions service/worker/diagnostics/invariant/failure/failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (f *failure) Check(context.Context) ([]invariant.InvariantCheckResult, erro
result = append(result, invariant.InvariantCheckResult{
InvariantType: WorkflowFailed.String(),
Reason: errorTypeFromReason(*reason).String(),
Metadata: invariant.MarshalData(failureMetadata{Identity: identity}),
Metadata: invariant.MarshalData(FailureMetadata{Identity: identity}),
})
}
if event.GetActivityTaskFailedEventAttributes() != nil && event.ActivityTaskFailedEventAttributes.Reason != nil {
Expand All @@ -72,7 +72,7 @@ func (f *failure) Check(context.Context) ([]invariant.InvariantCheckResult, erro
result = append(result, invariant.InvariantCheckResult{
InvariantType: ActivityFailed.String(),
Reason: errorTypeFromReason(*reason).String(),
Metadata: invariant.MarshalData(failureMetadata{
Metadata: invariant.MarshalData(FailureMetadata{
Identity: attr.Identity,
ActivityScheduled: scheduled,
ActivityStarted: started,
Expand Down
6 changes: 3 additions & 3 deletions service/worker/diagnostics/invariant/failure/failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ const (
)

func Test__Check(t *testing.T) {
metadata := failureMetadata{
metadata := FailureMetadata{
Identity: "localhost",
}
metadataInBytes, err := json.Marshal(metadata)
require.NoError(t, err)
actMetadata := failureMetadata{
actMetadata := FailureMetadata{
Identity: "localhost",
ActivityScheduled: &types.ActivityTaskScheduledEventAttributes{
ActivityID: "101",
Expand Down Expand Up @@ -167,7 +167,7 @@ func failedWfHistory() *types.GetWorkflowExecutionHistoryResponse {
}

func Test__RootCause(t *testing.T) {
metadata := failureMetadata{
metadata := FailureMetadata{
Identity: "localhost",
}
metadataInBytes, err := json.Marshal(metadata)
Expand Down
2 changes: 1 addition & 1 deletion service/worker/diagnostics/invariant/failure/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (f FailureType) String() string {
return string(f)
}

type failureMetadata struct {
type FailureMetadata struct {
Identity string
ActivityScheduled *types.ActivityTaskScheduledEventAttributes
ActivityStarted *types.ActivityTaskStartedEventAttributes
Expand Down
11 changes: 6 additions & 5 deletions service/worker/diagnostics/invariant/timeout/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,20 @@ func (t *timeout) Check(context.Context) ([]invariant.InvariantCheckResult, erro
func (t *timeout) RootCause(ctx context.Context, issues []invariant.InvariantCheckResult) ([]invariant.InvariantRootCauseResult, error) {
result := make([]invariant.InvariantRootCauseResult, 0)
for _, issue := range issues {
pollerStatus, err := t.checkTasklist(ctx, issue)
if err != nil {
return nil, err
if issue.InvariantType == TimeoutTypeActivity.String() || issue.InvariantType == TimeoutTypeExecution.String() {
pollerStatus, err := t.checkTasklist(ctx, issue)
if err != nil {
return nil, err
}
result = append(result, pollerStatus)
}
result = append(result, pollerStatus)

if issue.InvariantType == TimeoutTypeActivity.String() {
heartbeatStatus, err := checkHeartbeatStatus(issue)
if err != nil {
return nil, err
}
result = append(result, heartbeatStatus...)

}
}
return result, nil
Expand Down
4 changes: 2 additions & 2 deletions service/worker/diagnostics/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func (w *dw) Start() error {
newWorker.RegisterWorkflowWithOptions(w.DiagnosticsWorkflow, workflow.RegisterOptions{Name: diagnosticsWorkflow})
newWorker.RegisterWorkflowWithOptions(w.DiagnosticsStarterWorkflow, workflow.RegisterOptions{Name: diagnosticsStarterWorkflow})
newWorker.RegisterActivityWithOptions(w.retrieveExecutionHistory, activity.RegisterOptions{Name: retrieveWfExecutionHistoryActivity})
newWorker.RegisterActivityWithOptions(w.identifyTimeouts, activity.RegisterOptions{Name: identifyTimeoutsActivity})
newWorker.RegisterActivityWithOptions(w.rootCauseTimeouts, activity.RegisterOptions{Name: rootCauseTimeoutsActivity})
newWorker.RegisterActivityWithOptions(w.identifyIssues, activity.RegisterOptions{Name: identifyIssuesActivity})
newWorker.RegisterActivityWithOptions(w.rootCauseIssues, activity.RegisterOptions{Name: rootCauseIssuesActivity})
newWorker.RegisterActivityWithOptions(w.emitUsageLogs, activity.RegisterOptions{Name: emitUsageLogsActivity})
w.worker = newWorker
return newWorker.Start()
Expand Down
Loading

0 comments on commit 1ba0d71

Please sign in to comment.