From eb3b02d82eaa83a8c81278eb475eaec89df3084e Mon Sep 17 00:00:00 2001 From: David Porter Date: Wed, 6 Mar 2024 15:30:25 -0800 Subject: [PATCH 01/12] Adds MSB tests --- .../execution/mutable_state_builder_test.go | 275 ++++++ service/history/shard/context.go | 2 + service/history/shard/interface_mock.go | 844 ++++++++++++++++++ 3 files changed, 1121 insertions(+) create mode 100644 service/history/shard/interface_mock.go diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 1990c955cf4..98e276dfcc9 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -21,6 +21,8 @@ package execution import ( + "context" + "errors" "testing" "time" @@ -37,12 +39,14 @@ import ( "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/constants" "github.com/uber/cadence/service/history/events" "github.com/uber/cadence/service/history/shard" + shardCtx "github.com/uber/cadence/service/history/shard" ) type ( @@ -990,3 +994,274 @@ func (s *mutableStateSuite) buildWorkflowMutableState() *persistence.WorkflowMut VersionHistories: versionHistories, } } + +func TestNewMutableStateBuilderWithEventV2(t *testing.T) { + + ctrl := gomock.NewController(t) + mockShard := shard.NewTestContext( + t, + ctrl, + &persistence.ShardInfo{ + ShardID: 0, + RangeID: 1, + TransferAckLevel: 0, + }, + config.NewForTest(), + ) + domainCache := cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{Name: "mutableStateTest"}, + &persistence.DomainConfig{}, + true, + &persistence.DomainReplicationConfig{}, + 1, + nil, + ) + + NewMutableStateBuilderWithEventV2(mockShard, log.NewNoop(), "A82146B5-7A5C-4660-9195-E80E5161EC56", domainCache) +} + +var ( + domainID = "A6338800-D143-4FEF-8A49-9BBB31386C5F" + wfID = "879A361B-B435-491D-8A3B-ACF3BAD30F4B" + runID = "81DFCB6B-ACD4-46D1-89C2-804388203880" + ts1 = int64(1234) + shardID = 123 +) + +// Guiding real data example: ie: +// `select execution from executions where run_id = ALLOW FILTERING;` +// +// executions.execution { +// domainID: "A6338800-D143-4FEF-8A49-9BBB31386C5F", +// wfID: "879A361B-B435-491D-8A3B-ACF3BAD30F4B", +// runID: "81DFCB6B-ACD4-46D1-89C2-804388203880", +// initiated_id: -7, +// completion_event: null, +// state: 2, +// close_status: 1, +// next_event_id: 12, +// last_processed_event: 9, +// decision_schedule_id: -23, +// decision_started_id: -23, +// last_first_event_id: 10, +// decision_version: -24, +// completion_event_batch_id: 10, +// last_event_task_id: 4194328, +// } +var exampleMutableStateForClosedWF = &mutableStateBuilder{ + executionInfo: &persistence.WorkflowExecutionInfo{ + WorkflowID: wfID, + DomainID: domainID, + RunID: runID, + NextEventID: 12, + State: persistence.WorkflowStateCompleted, + CompletionEventBatchID: 10, + BranchToken: []byte("branch-token"), + }, +} + +var exampleCompletionEvent = &types.HistoryEvent{ + ID: 11, + TaskID: 4194328, + Version: 1, + Timestamp: &ts1, + WorkflowExecutionCompletedEventAttributes: &types.WorkflowExecutionCompletedEventAttributes{ + Result: []byte("some random workflow completion result"), + DecisionTaskCompletedEventID: 10, + }, +} + +var exampleStartEvent = &types.HistoryEvent{ + ID: 1, + TaskID: 4194328, + Version: 1, + Timestamp: &ts1, + WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &types.WorkflowType{Name: "workflow-type"}, + TaskList: &types.TaskList{Name: "tasklist"}, + Input: []byte("some random workflow input"), + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(60), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10), + OriginalExecutionRunID: runID, + Identity: "123@some-hostname@@uuid", + }, +} + +func TestGetCompletionEvent(t *testing.T) { + + tests := map[string]struct { + currentState *mutableStateBuilder + + historyManagerAffordance func(historyManager *persistence.MockHistoryManager) + + expectedResult *types.HistoryEvent + expectedErr error + }{ + "Getting a completed event from a normal, completed workflow - taken from a real example": { + currentState: exampleMutableStateForClosedWF, + historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { + + historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), + &persistence.ReadHistoryBranchRequest{ + BranchToken: []byte("branch-token"), + MinEventID: 10, + MaxEventID: 12, // nextEventID +1 + PageSize: 1, + NextPageToken: nil, + ShardID: common.IntPtr(shardID), + DomainName: "domain", + }).Return(&persistence.ReadHistoryBranchResponse{ + HistoryEvents: []*types.HistoryEvent{ + exampleCompletionEvent, + }, + }, nil) + + }, + + expectedResult: exampleCompletionEvent, + }, + "An unexpected error while fetchhing history, such as not found err": { + currentState: &mutableStateBuilder{ + executionInfo: &persistence.WorkflowExecutionInfo{ + WorkflowID: wfID, + DomainID: domainID, + RunID: runID, + NextEventID: 12, + State: persistence.WorkflowStateCompleted, + CompletionEventBatchID: 10, + BranchToken: []byte("branch-token"), + }, + }, + historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { + historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(nil, errors.New("a transient random error")) + }, + + expectedResult: nil, + expectedErr: &types.InternalServiceError{Message: "unable to get workflow completion event"}, + }, + "A 'transient' internal service error, this should be returned to the caller": { + currentState: &mutableStateBuilder{ + executionInfo: &persistence.WorkflowExecutionInfo{ + WorkflowID: wfID, + DomainID: domainID, + RunID: runID, + NextEventID: 12, + State: persistence.WorkflowStateCompleted, + CompletionEventBatchID: 10, + BranchToken: []byte("branch-token"), + }, + }, + historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { + historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()). + Return(nil, &types.InternalServiceError{Message: "an err"}) + }, + + expectedResult: nil, + expectedErr: &types.InternalServiceError{Message: "an err"}, + }, + "initial validation: An invalid starting mutable state should return an error": { + currentState: &mutableStateBuilder{ + executionInfo: &persistence.WorkflowExecutionInfo{}, + }, + historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) {}, + expectedResult: nil, + expectedErr: &types.InternalServiceError{Message: "unable to get workflow completion event"}, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shardCtx.NewMockContext(ctrl) + shardContext.EXPECT().GetShardID().Return(123).AnyTimes() // this isn't called on a few of the validation failures + historyManager := persistence.NewMockHistoryManager(ctrl) + td.historyManagerAffordance(historyManager) + + domainCache := cache.NewMockDomainCache(ctrl) + domainCache.EXPECT().GetDomainName(gomock.Any()).Return("domain", nil).AnyTimes() // this isn't called on validation + + td.currentState.eventsCache = events.NewCache(shardID, historyManager, config.NewForTest(), log.NewNoop(), metrics.NewNoopMetricsClient(), domainCache) + td.currentState.shard = shardContext + + res, err := td.currentState.GetCompletionEvent(context.Background()) + + assert.Equal(t, td.expectedResult, res) + assert.Equal(t, td.expectedErr, err) + }) + } +} + +func TestGetStartEvent(t *testing.T) { + + tests := map[string]struct { + currentState *mutableStateBuilder + + historyManagerAffordance func(historyManager *persistence.MockHistoryManager) + + expectedResult *types.HistoryEvent + expectedErr error + }{ + "Getting a start event from a normal, completed workflow - taken from a real example": { + currentState: exampleMutableStateForClosedWF, + historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { + + historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), + &persistence.ReadHistoryBranchRequest{ + BranchToken: []byte("branch-token"), + MinEventID: 1, + MaxEventID: 2, + PageSize: 1, + NextPageToken: nil, + ShardID: common.IntPtr(shardID), + DomainName: "domain", + }).Return(&persistence.ReadHistoryBranchResponse{ + HistoryEvents: []*types.HistoryEvent{ + exampleStartEvent, + }, + }, nil) + + }, + + expectedResult: exampleStartEvent, + }, + "Getting a start event but hitting an error when reaching into history": { + currentState: exampleMutableStateForClosedWF, + historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { + historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(nil, errors.New("an error")) + }, + expectedErr: &types.InternalServiceError{Message: "unable to get workflow start event"}, + }, + "Getting a start event but hitting a 'transient' error when reaching into history. This should be passed back up the call stack": { + currentState: exampleMutableStateForClosedWF, + historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { + historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(nil, &types.InternalServiceError{Message: "an error"}) + }, + expectedErr: &types.InternalServiceError{Message: "an error"}, + }, + } + + for name, td := range tests { + t.Run(name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + + shardContext := shardCtx.NewMockContext(ctrl) + shardContext.EXPECT().GetShardID().Return(123).AnyTimes() // this isn't called on a few of the validation failures + historyManager := persistence.NewMockHistoryManager(ctrl) + td.historyManagerAffordance(historyManager) + + domainCache := cache.NewMockDomainCache(ctrl) + domainCache.EXPECT().GetDomainName(gomock.Any()).Return("domain", nil).AnyTimes() // this isn't called on validation + + td.currentState.eventsCache = events.NewCache(shardID, historyManager, config.NewForTest(), log.NewNoop(), metrics.NewNoopMetricsClient(), domainCache) + td.currentState.shard = shardContext + + res, err := td.currentState.GetStartEvent(context.Background()) + + assert.Equal(t, td.expectedResult, res) + assert.Equal(t, td.expectedErr, err) + }) + } +} diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 037a71de136..f5859446053 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go -package shard github.com/uber/cadence/history/shard/context Context + package shard import ( diff --git a/service/history/shard/interface_mock.go b/service/history/shard/interface_mock.go new file mode 100644 index 00000000000..5d870a90125 --- /dev/null +++ b/service/history/shard/interface_mock.go @@ -0,0 +1,844 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: context.go + +// Package shard is a generated GoMock package. +package shard + +import ( + context "context" + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" + + cache "github.com/uber/cadence/common/cache" + clock "github.com/uber/cadence/common/clock" + cluster "github.com/uber/cadence/common/cluster" + log "github.com/uber/cadence/common/log" + metrics "github.com/uber/cadence/common/metrics" + persistence "github.com/uber/cadence/common/persistence" + types "github.com/uber/cadence/common/types" + config "github.com/uber/cadence/service/history/config" + engine "github.com/uber/cadence/service/history/engine" + events "github.com/uber/cadence/service/history/events" + resource "github.com/uber/cadence/service/history/resource" +) + +// MockContext is a mock of Context interface. +type MockContext struct { + ctrl *gomock.Controller + recorder *MockContextMockRecorder +} + +// MockContextMockRecorder is the mock recorder for MockContext. +type MockContextMockRecorder struct { + mock *MockContext +} + +// NewMockContext creates a new mock instance. +func NewMockContext(ctrl *gomock.Controller) *MockContext { + mock := &MockContext{ctrl: ctrl} + mock.recorder = &MockContextMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockContext) EXPECT() *MockContextMockRecorder { + return m.recorder +} + +// AddingPendingFailoverMarker mocks base method. +func (m *MockContext) AddingPendingFailoverMarker(arg0 *types.FailoverMarkerAttributes) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddingPendingFailoverMarker", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddingPendingFailoverMarker indicates an expected call of AddingPendingFailoverMarker. +func (mr *MockContextMockRecorder) AddingPendingFailoverMarker(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddingPendingFailoverMarker", reflect.TypeOf((*MockContext)(nil).AddingPendingFailoverMarker), arg0) +} + +// AppendHistoryV2Events mocks base method. +func (m *MockContext) AppendHistoryV2Events(ctx context.Context, request *persistence.AppendHistoryNodesRequest, domainID string, execution types.WorkflowExecution) (*persistence.AppendHistoryNodesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AppendHistoryV2Events", ctx, request, domainID, execution) + ret0, _ := ret[0].(*persistence.AppendHistoryNodesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AppendHistoryV2Events indicates an expected call of AppendHistoryV2Events. +func (mr *MockContextMockRecorder) AppendHistoryV2Events(ctx, request, domainID, execution interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppendHistoryV2Events", reflect.TypeOf((*MockContext)(nil).AppendHistoryV2Events), ctx, request, domainID, execution) +} + +// ConflictResolveWorkflowExecution mocks base method. +func (m *MockContext) ConflictResolveWorkflowExecution(ctx context.Context, request *persistence.ConflictResolveWorkflowExecutionRequest) (*persistence.ConflictResolveWorkflowExecutionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ConflictResolveWorkflowExecution", ctx, request) + ret0, _ := ret[0].(*persistence.ConflictResolveWorkflowExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ConflictResolveWorkflowExecution indicates an expected call of ConflictResolveWorkflowExecution. +func (mr *MockContextMockRecorder) ConflictResolveWorkflowExecution(ctx, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConflictResolveWorkflowExecution", reflect.TypeOf((*MockContext)(nil).ConflictResolveWorkflowExecution), ctx, request) +} + +// CreateWorkflowExecution mocks base method. +func (m *MockContext) CreateWorkflowExecution(ctx context.Context, request *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateWorkflowExecution", ctx, request) + ret0, _ := ret[0].(*persistence.CreateWorkflowExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateWorkflowExecution indicates an expected call of CreateWorkflowExecution. +func (mr *MockContextMockRecorder) CreateWorkflowExecution(ctx, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockContext)(nil).CreateWorkflowExecution), ctx, request) +} + +// DeleteTimerFailoverLevel mocks base method. +func (m *MockContext) DeleteTimerFailoverLevel(failoverID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteTimerFailoverLevel", failoverID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTimerFailoverLevel indicates an expected call of DeleteTimerFailoverLevel. +func (mr *MockContextMockRecorder) DeleteTimerFailoverLevel(failoverID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTimerFailoverLevel", reflect.TypeOf((*MockContext)(nil).DeleteTimerFailoverLevel), failoverID) +} + +// DeleteTransferFailoverLevel mocks base method. +func (m *MockContext) DeleteTransferFailoverLevel(failoverID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteTransferFailoverLevel", failoverID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTransferFailoverLevel indicates an expected call of DeleteTransferFailoverLevel. +func (mr *MockContextMockRecorder) DeleteTransferFailoverLevel(failoverID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTransferFailoverLevel", reflect.TypeOf((*MockContext)(nil).DeleteTransferFailoverLevel), failoverID) +} + +// GenerateTransferTaskID mocks base method. +func (m *MockContext) GenerateTransferTaskID() (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GenerateTransferTaskID") + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GenerateTransferTaskID indicates an expected call of GenerateTransferTaskID. +func (mr *MockContextMockRecorder) GenerateTransferTaskID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateTransferTaskID", reflect.TypeOf((*MockContext)(nil).GenerateTransferTaskID)) +} + +// GenerateTransferTaskIDs mocks base method. +func (m *MockContext) GenerateTransferTaskIDs(number int) ([]int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GenerateTransferTaskIDs", number) + ret0, _ := ret[0].([]int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GenerateTransferTaskIDs indicates an expected call of GenerateTransferTaskIDs. +func (mr *MockContextMockRecorder) GenerateTransferTaskIDs(number interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateTransferTaskIDs", reflect.TypeOf((*MockContext)(nil).GenerateTransferTaskIDs), number) +} + +// GetAllTimerFailoverLevels mocks base method. +func (m *MockContext) GetAllTimerFailoverLevels() map[string]TimerFailoverLevel { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllTimerFailoverLevels") + ret0, _ := ret[0].(map[string]TimerFailoverLevel) + return ret0 +} + +// GetAllTimerFailoverLevels indicates an expected call of GetAllTimerFailoverLevels. +func (mr *MockContextMockRecorder) GetAllTimerFailoverLevels() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllTimerFailoverLevels", reflect.TypeOf((*MockContext)(nil).GetAllTimerFailoverLevels)) +} + +// GetAllTransferFailoverLevels mocks base method. +func (m *MockContext) GetAllTransferFailoverLevels() map[string]TransferFailoverLevel { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllTransferFailoverLevels") + ret0, _ := ret[0].(map[string]TransferFailoverLevel) + return ret0 +} + +// GetAllTransferFailoverLevels indicates an expected call of GetAllTransferFailoverLevels. +func (mr *MockContextMockRecorder) GetAllTransferFailoverLevels() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllTransferFailoverLevels", reflect.TypeOf((*MockContext)(nil).GetAllTransferFailoverLevels)) +} + +// GetClusterMetadata mocks base method. +func (m *MockContext) GetClusterMetadata() cluster.Metadata { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetClusterMetadata") + ret0, _ := ret[0].(cluster.Metadata) + return ret0 +} + +// GetClusterMetadata indicates an expected call of GetClusterMetadata. +func (mr *MockContextMockRecorder) GetClusterMetadata() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetClusterMetadata", reflect.TypeOf((*MockContext)(nil).GetClusterMetadata)) +} + +// GetClusterReplicationLevel mocks base method. +func (m *MockContext) GetClusterReplicationLevel(cluster string) int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetClusterReplicationLevel", cluster) + ret0, _ := ret[0].(int64) + return ret0 +} + +// GetClusterReplicationLevel indicates an expected call of GetClusterReplicationLevel. +func (mr *MockContextMockRecorder) GetClusterReplicationLevel(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetClusterReplicationLevel", reflect.TypeOf((*MockContext)(nil).GetClusterReplicationLevel), cluster) +} + +// GetConfig mocks base method. +func (m *MockContext) GetConfig() *config.Config { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetConfig") + ret0, _ := ret[0].(*config.Config) + return ret0 +} + +// GetConfig indicates an expected call of GetConfig. +func (mr *MockContextMockRecorder) GetConfig() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfig", reflect.TypeOf((*MockContext)(nil).GetConfig)) +} + +// GetCrossClusterProcessingQueueStates mocks base method. +func (m *MockContext) GetCrossClusterProcessingQueueStates(cluster string) []*types.ProcessingQueueState { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCrossClusterProcessingQueueStates", cluster) + ret0, _ := ret[0].([]*types.ProcessingQueueState) + return ret0 +} + +// GetCrossClusterProcessingQueueStates indicates an expected call of GetCrossClusterProcessingQueueStates. +func (mr *MockContextMockRecorder) GetCrossClusterProcessingQueueStates(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCrossClusterProcessingQueueStates", reflect.TypeOf((*MockContext)(nil).GetCrossClusterProcessingQueueStates), cluster) +} + +// GetCurrentTime mocks base method. +func (m *MockContext) GetCurrentTime(cluster string) time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCurrentTime", cluster) + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// GetCurrentTime indicates an expected call of GetCurrentTime. +func (mr *MockContextMockRecorder) GetCurrentTime(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCurrentTime", reflect.TypeOf((*MockContext)(nil).GetCurrentTime), cluster) +} + +// GetDomainCache mocks base method. +func (m *MockContext) GetDomainCache() cache.DomainCache { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDomainCache") + ret0, _ := ret[0].(cache.DomainCache) + return ret0 +} + +// GetDomainCache indicates an expected call of GetDomainCache. +func (mr *MockContextMockRecorder) GetDomainCache() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDomainCache", reflect.TypeOf((*MockContext)(nil).GetDomainCache)) +} + +// GetDomainNotificationVersion mocks base method. +func (m *MockContext) GetDomainNotificationVersion() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDomainNotificationVersion") + ret0, _ := ret[0].(int64) + return ret0 +} + +// GetDomainNotificationVersion indicates an expected call of GetDomainNotificationVersion. +func (mr *MockContextMockRecorder) GetDomainNotificationVersion() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDomainNotificationVersion", reflect.TypeOf((*MockContext)(nil).GetDomainNotificationVersion)) +} + +// GetEngine mocks base method. +func (m *MockContext) GetEngine() engine.Engine { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetEngine") + ret0, _ := ret[0].(engine.Engine) + return ret0 +} + +// GetEngine indicates an expected call of GetEngine. +func (mr *MockContextMockRecorder) GetEngine() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEngine", reflect.TypeOf((*MockContext)(nil).GetEngine)) +} + +// GetEventsCache mocks base method. +func (m *MockContext) GetEventsCache() events.Cache { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetEventsCache") + ret0, _ := ret[0].(events.Cache) + return ret0 +} + +// GetEventsCache indicates an expected call of GetEventsCache. +func (mr *MockContextMockRecorder) GetEventsCache() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEventsCache", reflect.TypeOf((*MockContext)(nil).GetEventsCache)) +} + +// GetExecutionManager mocks base method. +func (m *MockContext) GetExecutionManager() persistence.ExecutionManager { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetExecutionManager") + ret0, _ := ret[0].(persistence.ExecutionManager) + return ret0 +} + +// GetExecutionManager indicates an expected call of GetExecutionManager. +func (mr *MockContextMockRecorder) GetExecutionManager() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetExecutionManager", reflect.TypeOf((*MockContext)(nil).GetExecutionManager)) +} + +// GetHistoryManager mocks base method. +func (m *MockContext) GetHistoryManager() persistence.HistoryManager { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHistoryManager") + ret0, _ := ret[0].(persistence.HistoryManager) + return ret0 +} + +// GetHistoryManager indicates an expected call of GetHistoryManager. +func (mr *MockContextMockRecorder) GetHistoryManager() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHistoryManager", reflect.TypeOf((*MockContext)(nil).GetHistoryManager)) +} + +// GetLastUpdatedTime mocks base method. +func (m *MockContext) GetLastUpdatedTime() time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLastUpdatedTime") + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// GetLastUpdatedTime indicates an expected call of GetLastUpdatedTime. +func (mr *MockContextMockRecorder) GetLastUpdatedTime() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLastUpdatedTime", reflect.TypeOf((*MockContext)(nil).GetLastUpdatedTime)) +} + +// GetLogger mocks base method. +func (m *MockContext) GetLogger() log.Logger { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLogger") + ret0, _ := ret[0].(log.Logger) + return ret0 +} + +// GetLogger indicates an expected call of GetLogger. +func (mr *MockContextMockRecorder) GetLogger() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLogger", reflect.TypeOf((*MockContext)(nil).GetLogger)) +} + +// GetMetricsClient mocks base method. +func (m *MockContext) GetMetricsClient() metrics.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsClient") + ret0, _ := ret[0].(metrics.Client) + return ret0 +} + +// GetMetricsClient indicates an expected call of GetMetricsClient. +func (mr *MockContextMockRecorder) GetMetricsClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockContext)(nil).GetMetricsClient)) +} + +// GetService mocks base method. +func (m *MockContext) GetService() resource.Resource { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetService") + ret0, _ := ret[0].(resource.Resource) + return ret0 +} + +// GetService indicates an expected call of GetService. +func (mr *MockContextMockRecorder) GetService() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetService", reflect.TypeOf((*MockContext)(nil).GetService)) +} + +// GetShardID mocks base method. +func (m *MockContext) GetShardID() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetShardID") + ret0, _ := ret[0].(int) + return ret0 +} + +// GetShardID indicates an expected call of GetShardID. +func (mr *MockContextMockRecorder) GetShardID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShardID", reflect.TypeOf((*MockContext)(nil).GetShardID)) +} + +// GetThrottledLogger mocks base method. +func (m *MockContext) GetThrottledLogger() log.Logger { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetThrottledLogger") + ret0, _ := ret[0].(log.Logger) + return ret0 +} + +// GetThrottledLogger indicates an expected call of GetThrottledLogger. +func (mr *MockContextMockRecorder) GetThrottledLogger() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetThrottledLogger", reflect.TypeOf((*MockContext)(nil).GetThrottledLogger)) +} + +// GetTimeSource mocks base method. +func (m *MockContext) GetTimeSource() clock.TimeSource { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTimeSource") + ret0, _ := ret[0].(clock.TimeSource) + return ret0 +} + +// GetTimeSource indicates an expected call of GetTimeSource. +func (mr *MockContextMockRecorder) GetTimeSource() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTimeSource", reflect.TypeOf((*MockContext)(nil).GetTimeSource)) +} + +// GetTimerAckLevel mocks base method. +func (m *MockContext) GetTimerAckLevel() time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTimerAckLevel") + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// GetTimerAckLevel indicates an expected call of GetTimerAckLevel. +func (mr *MockContextMockRecorder) GetTimerAckLevel() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTimerAckLevel", reflect.TypeOf((*MockContext)(nil).GetTimerAckLevel)) +} + +// GetTimerClusterAckLevel mocks base method. +func (m *MockContext) GetTimerClusterAckLevel(cluster string) time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTimerClusterAckLevel", cluster) + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// GetTimerClusterAckLevel indicates an expected call of GetTimerClusterAckLevel. +func (mr *MockContextMockRecorder) GetTimerClusterAckLevel(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTimerClusterAckLevel", reflect.TypeOf((*MockContext)(nil).GetTimerClusterAckLevel), cluster) +} + +// GetTimerMaxReadLevel mocks base method. +func (m *MockContext) GetTimerMaxReadLevel(cluster string) time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTimerMaxReadLevel", cluster) + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// GetTimerMaxReadLevel indicates an expected call of GetTimerMaxReadLevel. +func (mr *MockContextMockRecorder) GetTimerMaxReadLevel(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTimerMaxReadLevel", reflect.TypeOf((*MockContext)(nil).GetTimerMaxReadLevel), cluster) +} + +// GetTimerProcessingQueueStates mocks base method. +func (m *MockContext) GetTimerProcessingQueueStates(cluster string) []*types.ProcessingQueueState { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTimerProcessingQueueStates", cluster) + ret0, _ := ret[0].([]*types.ProcessingQueueState) + return ret0 +} + +// GetTimerProcessingQueueStates indicates an expected call of GetTimerProcessingQueueStates. +func (mr *MockContextMockRecorder) GetTimerProcessingQueueStates(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTimerProcessingQueueStates", reflect.TypeOf((*MockContext)(nil).GetTimerProcessingQueueStates), cluster) +} + +// GetTransferAckLevel mocks base method. +func (m *MockContext) GetTransferAckLevel() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTransferAckLevel") + ret0, _ := ret[0].(int64) + return ret0 +} + +// GetTransferAckLevel indicates an expected call of GetTransferAckLevel. +func (mr *MockContextMockRecorder) GetTransferAckLevel() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransferAckLevel", reflect.TypeOf((*MockContext)(nil).GetTransferAckLevel)) +} + +// GetTransferClusterAckLevel mocks base method. +func (m *MockContext) GetTransferClusterAckLevel(cluster string) int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTransferClusterAckLevel", cluster) + ret0, _ := ret[0].(int64) + return ret0 +} + +// GetTransferClusterAckLevel indicates an expected call of GetTransferClusterAckLevel. +func (mr *MockContextMockRecorder) GetTransferClusterAckLevel(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransferClusterAckLevel", reflect.TypeOf((*MockContext)(nil).GetTransferClusterAckLevel), cluster) +} + +// GetTransferMaxReadLevel mocks base method. +func (m *MockContext) GetTransferMaxReadLevel() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTransferMaxReadLevel") + ret0, _ := ret[0].(int64) + return ret0 +} + +// GetTransferMaxReadLevel indicates an expected call of GetTransferMaxReadLevel. +func (mr *MockContextMockRecorder) GetTransferMaxReadLevel() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransferMaxReadLevel", reflect.TypeOf((*MockContext)(nil).GetTransferMaxReadLevel)) +} + +// GetTransferProcessingQueueStates mocks base method. +func (m *MockContext) GetTransferProcessingQueueStates(cluster string) []*types.ProcessingQueueState { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTransferProcessingQueueStates", cluster) + ret0, _ := ret[0].([]*types.ProcessingQueueState) + return ret0 +} + +// GetTransferProcessingQueueStates indicates an expected call of GetTransferProcessingQueueStates. +func (mr *MockContextMockRecorder) GetTransferProcessingQueueStates(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransferProcessingQueueStates", reflect.TypeOf((*MockContext)(nil).GetTransferProcessingQueueStates), cluster) +} + +// GetWorkflowExecution mocks base method. +func (m *MockContext) GetWorkflowExecution(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetWorkflowExecution", ctx, request) + ret0, _ := ret[0].(*persistence.GetWorkflowExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetWorkflowExecution indicates an expected call of GetWorkflowExecution. +func (mr *MockContextMockRecorder) GetWorkflowExecution(ctx, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkflowExecution", reflect.TypeOf((*MockContext)(nil).GetWorkflowExecution), ctx, request) +} + +// PreviousShardOwnerWasDifferent mocks base method. +func (m *MockContext) PreviousShardOwnerWasDifferent() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PreviousShardOwnerWasDifferent") + ret0, _ := ret[0].(bool) + return ret0 +} + +// PreviousShardOwnerWasDifferent indicates an expected call of PreviousShardOwnerWasDifferent. +func (mr *MockContextMockRecorder) PreviousShardOwnerWasDifferent() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PreviousShardOwnerWasDifferent", reflect.TypeOf((*MockContext)(nil).PreviousShardOwnerWasDifferent)) +} + +// ReplicateFailoverMarkers mocks base method. +func (m *MockContext) ReplicateFailoverMarkers(ctx context.Context, markers []*persistence.FailoverMarkerTask) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReplicateFailoverMarkers", ctx, markers) + ret0, _ := ret[0].(error) + return ret0 +} + +// ReplicateFailoverMarkers indicates an expected call of ReplicateFailoverMarkers. +func (mr *MockContextMockRecorder) ReplicateFailoverMarkers(ctx, markers interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplicateFailoverMarkers", reflect.TypeOf((*MockContext)(nil).ReplicateFailoverMarkers), ctx, markers) +} + +// SetCurrentTime mocks base method. +func (m *MockContext) SetCurrentTime(cluster string, currentTime time.Time) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetCurrentTime", cluster, currentTime) +} + +// SetCurrentTime indicates an expected call of SetCurrentTime. +func (mr *MockContextMockRecorder) SetCurrentTime(cluster, currentTime interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCurrentTime", reflect.TypeOf((*MockContext)(nil).SetCurrentTime), cluster, currentTime) +} + +// SetEngine mocks base method. +func (m *MockContext) SetEngine(arg0 engine.Engine) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetEngine", arg0) +} + +// SetEngine indicates an expected call of SetEngine. +func (mr *MockContextMockRecorder) SetEngine(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetEngine", reflect.TypeOf((*MockContext)(nil).SetEngine), arg0) +} + +// UpdateClusterReplicationLevel mocks base method. +func (m *MockContext) UpdateClusterReplicationLevel(cluster string, lastTaskID int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateClusterReplicationLevel", cluster, lastTaskID) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateClusterReplicationLevel indicates an expected call of UpdateClusterReplicationLevel. +func (mr *MockContextMockRecorder) UpdateClusterReplicationLevel(cluster, lastTaskID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateClusterReplicationLevel", reflect.TypeOf((*MockContext)(nil).UpdateClusterReplicationLevel), cluster, lastTaskID) +} + +// UpdateCrossClusterProcessingQueueStates mocks base method. +func (m *MockContext) UpdateCrossClusterProcessingQueueStates(cluster string, states []*types.ProcessingQueueState) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateCrossClusterProcessingQueueStates", cluster, states) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateCrossClusterProcessingQueueStates indicates an expected call of UpdateCrossClusterProcessingQueueStates. +func (mr *MockContextMockRecorder) UpdateCrossClusterProcessingQueueStates(cluster, states interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateCrossClusterProcessingQueueStates", reflect.TypeOf((*MockContext)(nil).UpdateCrossClusterProcessingQueueStates), cluster, states) +} + +// UpdateDomainNotificationVersion mocks base method. +func (m *MockContext) UpdateDomainNotificationVersion(domainNotificationVersion int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateDomainNotificationVersion", domainNotificationVersion) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateDomainNotificationVersion indicates an expected call of UpdateDomainNotificationVersion. +func (mr *MockContextMockRecorder) UpdateDomainNotificationVersion(domainNotificationVersion interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateDomainNotificationVersion", reflect.TypeOf((*MockContext)(nil).UpdateDomainNotificationVersion), domainNotificationVersion) +} + +// UpdateTimerAckLevel mocks base method. +func (m *MockContext) UpdateTimerAckLevel(ackLevel time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTimerAckLevel", ackLevel) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateTimerAckLevel indicates an expected call of UpdateTimerAckLevel. +func (mr *MockContextMockRecorder) UpdateTimerAckLevel(ackLevel interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTimerAckLevel", reflect.TypeOf((*MockContext)(nil).UpdateTimerAckLevel), ackLevel) +} + +// UpdateTimerClusterAckLevel mocks base method. +func (m *MockContext) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTimerClusterAckLevel", cluster, ackLevel) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateTimerClusterAckLevel indicates an expected call of UpdateTimerClusterAckLevel. +func (mr *MockContextMockRecorder) UpdateTimerClusterAckLevel(cluster, ackLevel interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTimerClusterAckLevel", reflect.TypeOf((*MockContext)(nil).UpdateTimerClusterAckLevel), cluster, ackLevel) +} + +// UpdateTimerFailoverLevel mocks base method. +func (m *MockContext) UpdateTimerFailoverLevel(failoverID string, level TimerFailoverLevel) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTimerFailoverLevel", failoverID, level) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateTimerFailoverLevel indicates an expected call of UpdateTimerFailoverLevel. +func (mr *MockContextMockRecorder) UpdateTimerFailoverLevel(failoverID, level interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTimerFailoverLevel", reflect.TypeOf((*MockContext)(nil).UpdateTimerFailoverLevel), failoverID, level) +} + +// UpdateTimerMaxReadLevel mocks base method. +func (m *MockContext) UpdateTimerMaxReadLevel(cluster string) time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTimerMaxReadLevel", cluster) + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// UpdateTimerMaxReadLevel indicates an expected call of UpdateTimerMaxReadLevel. +func (mr *MockContextMockRecorder) UpdateTimerMaxReadLevel(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTimerMaxReadLevel", reflect.TypeOf((*MockContext)(nil).UpdateTimerMaxReadLevel), cluster) +} + +// UpdateTimerProcessingQueueStates mocks base method. +func (m *MockContext) UpdateTimerProcessingQueueStates(cluster string, states []*types.ProcessingQueueState) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTimerProcessingQueueStates", cluster, states) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateTimerProcessingQueueStates indicates an expected call of UpdateTimerProcessingQueueStates. +func (mr *MockContextMockRecorder) UpdateTimerProcessingQueueStates(cluster, states interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTimerProcessingQueueStates", reflect.TypeOf((*MockContext)(nil).UpdateTimerProcessingQueueStates), cluster, states) +} + +// UpdateTransferAckLevel mocks base method. +func (m *MockContext) UpdateTransferAckLevel(ackLevel int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTransferAckLevel", ackLevel) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateTransferAckLevel indicates an expected call of UpdateTransferAckLevel. +func (mr *MockContextMockRecorder) UpdateTransferAckLevel(ackLevel interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTransferAckLevel", reflect.TypeOf((*MockContext)(nil).UpdateTransferAckLevel), ackLevel) +} + +// UpdateTransferClusterAckLevel mocks base method. +func (m *MockContext) UpdateTransferClusterAckLevel(cluster string, ackLevel int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTransferClusterAckLevel", cluster, ackLevel) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateTransferClusterAckLevel indicates an expected call of UpdateTransferClusterAckLevel. +func (mr *MockContextMockRecorder) UpdateTransferClusterAckLevel(cluster, ackLevel interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTransferClusterAckLevel", reflect.TypeOf((*MockContext)(nil).UpdateTransferClusterAckLevel), cluster, ackLevel) +} + +// UpdateTransferFailoverLevel mocks base method. +func (m *MockContext) UpdateTransferFailoverLevel(failoverID string, level TransferFailoverLevel) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTransferFailoverLevel", failoverID, level) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateTransferFailoverLevel indicates an expected call of UpdateTransferFailoverLevel. +func (mr *MockContextMockRecorder) UpdateTransferFailoverLevel(failoverID, level interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTransferFailoverLevel", reflect.TypeOf((*MockContext)(nil).UpdateTransferFailoverLevel), failoverID, level) +} + +// UpdateTransferProcessingQueueStates mocks base method. +func (m *MockContext) UpdateTransferProcessingQueueStates(cluster string, states []*types.ProcessingQueueState) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTransferProcessingQueueStates", cluster, states) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateTransferProcessingQueueStates indicates an expected call of UpdateTransferProcessingQueueStates. +func (mr *MockContextMockRecorder) UpdateTransferProcessingQueueStates(cluster, states interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTransferProcessingQueueStates", reflect.TypeOf((*MockContext)(nil).UpdateTransferProcessingQueueStates), cluster, states) +} + +// UpdateWorkflowExecution mocks base method. +func (m *MockContext) UpdateWorkflowExecution(ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateWorkflowExecution", ctx, request) + ret0, _ := ret[0].(*persistence.UpdateWorkflowExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateWorkflowExecution indicates an expected call of UpdateWorkflowExecution. +func (mr *MockContextMockRecorder) UpdateWorkflowExecution(ctx, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateWorkflowExecution", reflect.TypeOf((*MockContext)(nil).UpdateWorkflowExecution), ctx, request) +} + +// ValidateAndUpdateFailoverMarkers mocks base method. +func (m *MockContext) ValidateAndUpdateFailoverMarkers() ([]*types.FailoverMarkerAttributes, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ValidateAndUpdateFailoverMarkers") + ret0, _ := ret[0].([]*types.FailoverMarkerAttributes) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ValidateAndUpdateFailoverMarkers indicates an expected call of ValidateAndUpdateFailoverMarkers. +func (mr *MockContextMockRecorder) ValidateAndUpdateFailoverMarkers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateAndUpdateFailoverMarkers", reflect.TypeOf((*MockContext)(nil).ValidateAndUpdateFailoverMarkers)) +} From addd9da20b48a1f85fb67535614416d3292984d6 Mon Sep 17 00:00:00 2001 From: David Porter Date: Wed, 6 Mar 2024 16:33:01 -0800 Subject: [PATCH 02/12] feedback --- .../execution/mutable_state_builder_test.go | 14 ++++++++------ service/history/shard/context.go | 2 +- .../shard/{interface_mock.go => context_mock.go} | 0 3 files changed, 9 insertions(+), 7 deletions(-) rename service/history/shard/{interface_mock.go => context_mock.go} (100%) diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 98e276dfcc9..fa96f5d2465 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -1088,7 +1088,6 @@ var exampleStartEvent = &types.HistoryEvent{ } func TestGetCompletionEvent(t *testing.T) { - tests := map[string]struct { currentState *mutableStateBuilder @@ -1188,13 +1187,14 @@ func TestGetCompletionEvent(t *testing.T) { res, err := td.currentState.GetCompletionEvent(context.Background()) assert.Equal(t, td.expectedResult, res) - assert.Equal(t, td.expectedErr, err) + if td.expectedErr != nil { + assert.ErrorAs(t, td.expectedErr, &err) + } }) } } func TestGetStartEvent(t *testing.T) { - tests := map[string]struct { currentState *mutableStateBuilder @@ -1231,14 +1231,14 @@ func TestGetStartEvent(t *testing.T) { historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(nil, errors.New("an error")) }, - expectedErr: &types.InternalServiceError{Message: "unable to get workflow start event"}, + expectedErr: types.InternalServiceError{Message: "unable to get workflow start event"}, }, "Getting a start event but hitting a 'transient' error when reaching into history. This should be passed back up the call stack": { currentState: exampleMutableStateForClosedWF, historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(nil, &types.InternalServiceError{Message: "an error"}) }, - expectedErr: &types.InternalServiceError{Message: "an error"}, + expectedErr: types.InternalServiceError{Message: "an error"}, }, } @@ -1261,7 +1261,9 @@ func TestGetStartEvent(t *testing.T) { res, err := td.currentState.GetStartEvent(context.Background()) assert.Equal(t, td.expectedResult, res) - assert.Equal(t, td.expectedErr, err) + if td.expectedErr != nil { + assert.ErrorAs(t, err, &td.expectedErr) + } }) } } diff --git a/service/history/shard/context.go b/service/history/shard/context.go index f5859446053..26f9c4531bc 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go -package shard github.com/uber/cadence/history/shard/context Context +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination context_mock.go -package shard github.com/uber/cadence/history/shard/context Context package shard diff --git a/service/history/shard/interface_mock.go b/service/history/shard/context_mock.go similarity index 100% rename from service/history/shard/interface_mock.go rename to service/history/shard/context_mock.go From 559dcec985503b3564b981a85a4284c53d055c9d Mon Sep 17 00:00:00 2001 From: David Porter Date: Fri, 8 Mar 2024 13:22:21 -0800 Subject: [PATCH 03/12] WIP --- common/convert.go | 6 + go.mod | 2 + go.sum | 4 + service/history/config/config.go | 1 + .../execution/mutable_state_builder.go | 17 ++ .../execution/mutable_state_builder_test.go | 218 ++++++++++++++++++ 6 files changed, 248 insertions(+) diff --git a/common/convert.go b/common/convert.go index 7d0f256b306..95ea0ccc1f0 100644 --- a/common/convert.go +++ b/common/convert.go @@ -28,6 +28,12 @@ import ( s "github.com/uber/cadence/.gen/go/shared" ) +// Returns a pointer to the given value. +// todo (david.porter) Remove the remaining helpers here which are now obsolete +func Ptr[T any](v T) *T { + return &v +} + // IntPtr makes a copy and returns the pointer to an int. func IntPtr(v int) *int { return &v diff --git a/go.mod b/go.mod index 487f6339943..321456ceb83 100644 --- a/go.mod +++ b/go.mod @@ -93,6 +93,7 @@ require ( github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.2 // indirect + github.com/hexops/valast v1.4.4 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.0.0 // indirect @@ -142,6 +143,7 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect honnef.co/go/tools v0.3.2 // indirect + mvdan.cc/gofumpt v0.4.0 // indirect ) // ringpop-go and tchannel-go depends on older version of thrift, yarpc brings up newer version diff --git a/go.sum b/go.sum index 2e5d16db790..da0ecc2e72b 100644 --- a/go.sum +++ b/go.sum @@ -224,6 +224,8 @@ github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2I github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.2.0 h1:3vNe/fWF5CBgRIguda1meWhsZHy3m8gCJ5wx+dIzX/E= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hexops/valast v1.4.4 h1:rETyycw+/L2ZVJHHNxEBgh8KUn+87WugH9MxcEv9PGs= +github.com/hexops/valast v1.4.4/go.mod h1:Jcy1pNH7LNraVaAZDLyv21hHg2WBv9Nf9FL6fGxU7o4= github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -766,6 +768,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.3.2 h1:ytYb4rOqyp1TSa2EPvNVwtPQJctSELKaMyLfqNP4+34= honnef.co/go/tools v0.3.2/go.mod h1:jzwdWgg7Jdq75wlfblQxO4neNaFFSvgc1tD5Wv8U0Yw= +mvdan.cc/gofumpt v0.4.0 h1:JVf4NN1mIpHogBj7ABpgOyZc65/UUOkKQFkoURsz4MM= +mvdan.cc/gofumpt v0.4.0/go.mod h1:PljLOHDeZqgS8opHRKLzp2It2VBuSdteAgqUfzMTxlQ= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/service/history/config/config.go b/service/history/config/config.go index 3ca308f24cf..2d429051827 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -648,6 +648,7 @@ func NewForTestByShardNumber(shardNumber int) *Config { config.MaxActivityCountDispatchByDomain = dc.GetIntPropertyFilteredByDomain(dynamicconfig.MaxActivityCountDispatchByDomain) config.EnableCrossClusterOperationsForDomain = dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableCrossClusterOperationsForDomain) config.NormalDecisionScheduleToStartMaxAttempts = dc.GetIntPropertyFilteredByDomain(dynamicconfig.NormalDecisionScheduleToStartMaxAttempts) + config.NormalDecisionScheduleToStartTimeout = dc.GetDurationPropertyFilteredByDomain(dynamicconfig.NormalDecisionScheduleToStartTimeout) config.PendingActivityValidationEnabled = dc.GetBoolProperty(dynamicconfig.EnablePendingActivityValidation) config.QueueProcessorEnableGracefulSyncShutdown = dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableGracefulSyncShutdown) config.QueueProcessorSplitMaxLevel = dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel) diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index 4058dc2f424..1b725c7d904 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -3369,6 +3369,13 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent( attributes *types.ContinueAsNewWorkflowExecutionDecisionAttributes, ) (*types.HistoryEvent, MutableState, error) { + // fmt.Println("------") + // fmt.Println("starting state - return value - execution info") + // fmt.Println(valast.String(e.executionInfo)) + // fmt.Println("starting state - return value - history info") + // fmt.Println(valast.String(e.hBuilder.history)) + // fmt.Println("------") + opTag := tag.WorkflowActionWorkflowContinueAsNew if err := e.checkMutability(opTag); err != nil { return nil, nil, err @@ -3434,6 +3441,16 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent( return nil, nil, err } + // fmt.Println("------") + // fmt.Println("e existing state builder - execution info", valast.String(e.executionInfo)) + // fmt.Println("e existing state builder - execution info", valast.String(e.hBuilder.history)) + // fmt.Println("------") + // fmt.Println("newStateBuilder - return value - execution info") + // fmt.Println(valast.String(newStateBuilder.executionInfo)) + // fmt.Println("newStateBuilder - return value - history info") + // fmt.Println(valast.String(newStateBuilder.hBuilder.history)) + // fmt.Println("------") + return continueAsNewEvent, newStateBuilder, nil } diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index fa96f5d2465..37eeddf3bd3 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -1267,3 +1267,221 @@ func TestGetStartEvent(t *testing.T) { }) } } + +// func TestAddContinueAsNewEvent(t *testing.T) { + +// firstEventID := int64(15) +// decisionCompletedEventID := int64(15) + +// startExecutionInfo := &persistence.WorkflowExecutionInfo{ +// DomainID: "5391dbea-5b30-4323-82ca-e1c95339bb3e", +// WorkflowID: "helloworld_bfa410d9-9f49-4bcb-a943-0f3ceb252da2", +// RunID: "c6702a46-a1f0-42d2-9de7-8aca6ed6795f", +// FirstExecutionRunID: "c6702a46-a1f0-42d2-9de7-8aca6ed6795f", +// InitiatedID: -7, +// CompletionEventBatchID: 0, +// TaskList: "tasklist-name", +// WorkflowTypeName: "test-workflow", +// WorkflowTimeout: 60, +// DecisionStartToCloseTimeout: 60, +// State: 1, +// CloseStatus: 0, +// LastFirstEventID: 14, +// LastEventTaskID: 11534369, +// NextEventID: 16, +// LastProcessedEvent: 14, +// CreateRequestID: "5c0be655-1efc-4dfe-8f69-1f59e59c13ef", +// DecisionVersion: -24, +// DecisionScheduleID: -23, +// DecisionStartedID: -23, +// DecisionRequestID: "emptyUuid", +// DecisionTimeout: 0, +// DecisionAttempt: 0, +// DecisionStartedTimestamp: 0, +// DecisionScheduledTimestamp: 0, +// DecisionOriginalScheduledTimestamp: 1709790036041553000, +// CancelRequested: false, +// StickyTaskList: "david-porter-DVFG73D710:04be47fa-2381-469f-b2ea-1253271ad116", +// StickyScheduleToStartTimeout: 5, +// ClientLibraryVersion: "0.18.4", +// ClientFeatureVersion: "1.7.0", +// ClientImpl: "uber-go", +// Attempt: 0, +// } + +// endExecutionInfo := &persistence.WorkflowExecutionInfo{ +// DomainID: "5391dbea-5b30-4323-82ca-e1c95339bb3e", +// WorkflowID: "helloworld_bfa410d9-9f49-4bcb-a943-0f3ceb252da2", +// RunID: "c6702a46-a1f0-42d2-9de7-8aca6ed6795f", +// FirstExecutionRunID: "c6702a46-a1f0-42d2-9de7-8aca6ed6795f", +// InitiatedID: -7, +// CompletionEventBatchID: 15, +// TaskList: "tasklist-name", +// WorkflowTypeName: "test-workflow", +// WorkflowTimeout: 60, +// DecisionStartToCloseTimeout: 60, +// State: 2, +// CloseStatus: 5, +// LastFirstEventID: 14, +// LastEventTaskID: 11534369, +// NextEventID: 17, +// LastProcessedEvent: 14, +// CreateRequestID: "5c0be655-1efc-4dfe-8f69-1f59e59c13ef", +// DecisionVersion: -24, +// DecisionScheduleID: -23, +// DecisionStartedID: -23, +// DecisionRequestID: "emptyUuid", +// DecisionTimeout: 0, +// DecisionAttempt: 0, +// DecisionOriginalScheduledTimestamp: 1709790036041553000, +// } + +// startHistory := []*types.HistoryEvent{ +// { +// ID: 15, +// Timestamp: common.Ptr(int64(1709791556528026000)), +// EventType: types.EventTypeDecisionTaskCompleted.Ptr(), +// Version: 1, +// TaskID: -1234, +// DecisionTaskCompletedEventAttributes: &types.DecisionTaskCompletedEventAttributes{ +// ScheduledEventID: 13, +// StartedEventID: 14, +// Identity: "27368@david-porter-DVFG73D710@helloWorldGroup@6027e9ee-048e-4f67-8d88-27883c496901", +// BinaryChecksum: "6df03bf5110d681667852a8456519536", +// }, +// }, +// } + +// endHistory := []*types.HistoryEvent{ +// { +// ID: 15, +// Timestamp: common.Ptr(int64(1709791556528026000)), +// EventType: types.EventTypeDecisionTaskCompleted.Ptr(), +// Version: 1, +// TaskID: -1234, +// DecisionTaskCompletedEventAttributes: &types.DecisionTaskCompletedEventAttributes{ +// ScheduledEventID: 13, +// StartedEventID: 14, +// Identity: "27368@david-porter-DVFG73D710@helloWorldGroup@6027e9ee-048e-4f67-8d88-27883c496901", +// BinaryChecksum: "6df03bf5110d681667852a8456519536", +// }, +// }, +// { +// ID: 16, +// Timestamp: common.Ptr(int64(1709791556529788000)), +// EventType: common.Ptr(types.EventTypeWorkflowExecutionContinuedAsNew), +// Version: 1, +// TaskID: -1234, +// WorkflowExecutionContinuedAsNewEventAttributes: &types.WorkflowExecutionContinuedAsNewEventAttributes{ +// NewExecutionRunID: "1b094f71-9c23-4177-8cf9-7f723cc52955", +// WorkflowType: &types.WorkflowType{ +// Name: "helloWorldWorkflow", +// }, +// TaskList: &types.TaskList{ +// Name: "helloWorldGroup", +// }, +// Input: []byte("some-input"), +// ExecutionStartToCloseTimeoutSeconds: common.Ptr(int32(60)), +// TaskStartToCloseTimeoutSeconds: common.Ptr(int32(60)), +// DecisionTaskCompletedEventID: 15, +// Initiator: common.Ptr(types.ContinueAsNewInitiatorDecider), +// }, +// }, +// } + +// tests := map[string]struct { +// currentState *mutableStateBuilder +// // history is a substruct of current state, but because they're both +// // pointing to each other, they're assembled at the test start +// currentHistoryBuilder *HistoryBuilder + +// // expectations +// historyManagerAffordance func(historyManager *persistence.MockHistoryManager) + +// // this is a somewhat confusing API, both returning a new cloned state and mutating the existing +// // current state so this will be comparing both (they should be the same) +// expectedEndState *mutableStateBuilder +// expectedErr error +// }{ +// "a continue-as-new event with no errors": { +// currentHistoryBuilder: &HistoryBuilder{ +// history: startHistory, +// }, +// currentState: &mutableStateBuilder{ +// domainEntry: cache.NewDomainCacheEntryForTest(&persistence.DomainInfo{ID: domainID}, &persistence.DomainConfig{}, true, &persistence.DomainReplicationConfig{}, 1, nil), +// executionInfo: startExecutionInfo, +// logger: log.NewNoop(), +// config: config.NewForTest(), +// }, +// historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { +// // when it gets the first-run ID +// historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadHistoryBranchResponse{ +// HistoryEvents: []*types.HistoryEvent{ +// { +// ID: 1, +// WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ +// FirstExecutionRunID: "some-first-run-id", +// WorkflowType: &types.WorkflowType{ +// Name: "test-workflow", +// }, +// }, +// }, +// }, +// }, nil) +// }, + +// expectedEndState: &mutableStateBuilder{ +// hBuilder: &HistoryBuilder{ +// history: endHistory, // insert Fukuyama joke +// }, +// executionInfo: endExecutionInfo, +// }, +// }, +// } + +// for name, td := range tests { +// t.Run(name, func(t *testing.T) { + +// ctrl := gomock.NewController(t) + +// historyManager := persistence.NewMockHistoryManager(ctrl) +// td.historyManagerAffordance(historyManager) + +// domainCache := cache.NewMockDomainCache(ctrl) +// domainCache.EXPECT().GetDomainName(gomock.Any()).Return("domain", nil) + +// td.currentState.eventsCache = events.NewCache(shardID, historyManager, config.NewForTest(), log.NewNoop(), metrics.NewNoopMetricsClient(), domainCache) + +// shardContext := shardCtx.NewMockContext(ctrl) +// shardContext.EXPECT().GetShardID().Return(123) +// shardContext.EXPECT().GetClusterMetadata().Return(cluster.Metadata{}).Times(2) +// shardContext.EXPECT().GetEventsCache().Return(td.currentState.eventsCache) +// shardContext.EXPECT().GetConfig().Return(td.currentState.config) +// shardContext.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()) +// shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) +// shardContext.EXPECT().GetDomainCache().Return(domainCache) + +// taskGenerator := NewMockMutableStateTaskGenerator(ctrl) + +// taskGenerator.EXPECT().GenerateWorkflowCloseTasks(gomock.Any(), td.currentState.config.WorkflowDeletionJitterRange("domain")) + +// td.currentState.shard = shardContext +// td.currentHistoryBuilder.msBuilder = td.currentState +// td.currentState.hBuilder = td.currentHistoryBuilder +// td.currentState.taskGenerator = taskGenerator + +// td.currentState.timeSource = clock.NewMockedTimeSourceAt(time.Unix(0, ts1)) + +// _, ms, err := td.currentState.AddContinueAsNewEvent(context.Background(), firstEventID, decisionCompletedEventID, "", &types.ContinueAsNewWorkflowExecutionDecisionAttributes{ +// WorkflowType: &types.WorkflowType{Name: "test-workflow"}, +// TaskList: &types.TaskList{Name: "tasklist-name"}, +// Input: []byte("some random workflow input"), +// }) + +// assert.Equal(t, td.expectedEndState, ms) +// if td.expectedErr != nil { +// assert.ErrorAs(t, err, &td.expectedErr) +// } +// }) +// } +// } From 4445b3d577d2e5ef25b271f789927920e6d3b301 Mon Sep 17 00:00:00 2001 From: David Porter Date: Sat, 9 Mar 2024 14:54:47 -0800 Subject: [PATCH 04/12] Passing --- .../execution/mutable_state_builder.go | 10 -------- ..._builder_add_continue_as_new_event_test.go | 23 +++++++++++-------- 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index 5c03cfd14f4..4058dc2f424 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -3434,16 +3434,6 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent( return nil, nil, err } - // fmt.Println("------") - // fmt.Println("e existing state builder - execution info", valast.String(e.executionInfo)) - // fmt.Println("e existing state builder - execution info", valast.String(e.hBuilder.history)) - // fmt.Println("------") - // fmt.Println("newStateBuilder - return value - execution info") - // fmt.Println(valast.String(newStateBuilder.executionInfo)) - // fmt.Println("newStateBuilder - return value - history info") - // fmt.Println(valast.String(newStateBuilder.hBuilder.history)) - // fmt.Println("------") - return continueAsNewEvent, newStateBuilder, nil } diff --git a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go index b78ff87fb9b..4e1c36a5968 100644 --- a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go +++ b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go @@ -387,15 +387,20 @@ func TestAddContinueAsNewEvent(t *testing.T) { msb.taskGenerator = taskGenerator - _, returnedBuilder, err := msb.AddContinueAsNewEvent(context.Background(), firstEventID, decisionCompletedEventID, "", &types.ContinueAsNewWorkflowExecutionDecisionAttributes{ - WorkflowType: &types.WorkflowType{ - Name: "helloWorldWorkflow", - }, - TaskList: &types.TaskList{ - Name: "helloWorldGroup", - }, - Input: []uint8{110, 117, 108, 108, 10}, - }) + _, returnedBuilder, err := msb.AddContinueAsNewEvent(context.Background(), + firstEventID, + decisionCompletedEventID, + "", + &types.ContinueAsNewWorkflowExecutionDecisionAttributes{ + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(60), + WorkflowType: &types.WorkflowType{ + Name: "helloWorldWorkflow", + }, + TaskList: &types.TaskList{ + Name: "helloWorldGroup", + }, + Input: []uint8{110, 117, 108, 108, 10}, + }) resultExecutionInfo := returnedBuilder.GetExecutionInfo() From 953cac82b2d83422dd00a8d65f4284ba67488245 Mon Sep 17 00:00:00 2001 From: David Porter Date: Sat, 9 Mar 2024 15:08:21 -0800 Subject: [PATCH 05/12] History assertion complete --- ...e_builder_add_continue_as_new_event_test.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go index 4e1c36a5968..7965a9df5a0 100644 --- a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go +++ b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go @@ -174,7 +174,7 @@ func TestAddContinueAsNewEvent(t *testing.T) { Version: 1, TaskID: -1234, WorkflowExecutionContinuedAsNewEventAttributes: &types.WorkflowExecutionContinuedAsNewEventAttributes{ - NewExecutionRunID: "db647c4b-8759-47f1-8db3-57baff104b76", + NewExecutionRunID: "a run id", WorkflowType: &types.WorkflowType{ Name: "helloWorldWorkflow", }, @@ -193,7 +193,7 @@ func TestAddContinueAsNewEvent(t *testing.T) { expectedEndingReturnExecutionState := &persistence.WorkflowExecutionInfo{ DomainID: "5391dbea-5b30-4323-82ca-e1c95339bb3e", WorkflowID: "helloworld_b4db8bd0-74b7-4250-ade7-ac72a1efb171", - RunID: "db647c4b-8759-47f1-8db3-57baff104b76", + RunID: "a run id", FirstExecutionRunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", InitiatedID: -23, TaskList: "helloWorldGroup", @@ -227,7 +227,7 @@ func TestAddContinueAsNewEvent(t *testing.T) { expectedEndingReturnHistoryState := []*types.HistoryEvent{ { ID: 1, - Timestamp: common.Ptr(int64(1709872131923540000)), + Timestamp: common.Ptr(int64(ts1)), EventType: common.Ptr(types.EventTypeWorkflowExecutionStarted), Version: 1, TaskID: -1234, @@ -240,17 +240,17 @@ func TestAddContinueAsNewEvent(t *testing.T) { ExecutionStartToCloseTimeoutSeconds: common.Ptr(int32(60)), TaskStartToCloseTimeoutSeconds: common.Ptr(int32(60)), ContinuedExecutionRunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", - OriginalExecutionRunID: "db647c4b-8759-47f1-8db3-57baff104b76", + OriginalExecutionRunID: "a run id", FirstExecutionRunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", PrevAutoResetPoints: &types.ResetPoints{Points: []*types.ResetPointInfo{{ BinaryChecksum: "6df03bf5110d681667852a8456519536", RunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", FirstDecisionCompletedID: 4, CreatedTimeNano: common.Ptr(int64(1709872121495904000)), - ExpiringTimeNano: common.Ptr(int64(1709937512898751000)), + ExpiringTimeNano: common.Ptr(int64(ts1)), Resettable: true, }}}, - Header: &types.Header{}, + Header: nil, }, }, { @@ -418,8 +418,14 @@ func TestAddContinueAsNewEvent(t *testing.T) { resultExecutionInfo.RunID = "a run id" td.expectedReturnedState.CreateRequestID = "a request id" resultExecutionInfo.CreateRequestID = "a request id" + for _, historyEvent := range returnedBuilder.GetHistoryBuilder().history { + if historyEvent.WorkflowExecutionStartedEventAttributes != nil { + historyEvent.WorkflowExecutionStartedEventAttributes.OriginalExecutionRunID = "a run id" + } + } assert.Equal(t, td.expectedReturnedState, resultExecutionInfo) + assert.Equal(t, td.expectedReturnedHistory, returnedBuilder.GetHistoryBuilder().history) }) } } From 1824b9acda8ffdcd17b4ddac36ab9f3c79c22583 Mon Sep 17 00:00:00 2001 From: David Porter Date: Sat, 9 Mar 2024 15:24:02 -0800 Subject: [PATCH 06/12] Made a little less janky --- ..._builder_add_continue_as_new_event_test.go | 256 ++---------------- 1 file changed, 30 insertions(+), 226 deletions(-) diff --git a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go index 7965a9df5a0..f36d5cd7c39 100644 --- a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go +++ b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go @@ -115,81 +115,6 @@ func TestAddContinueAsNewEvent(t *testing.T) { }, }} - expectedEndingBuilderExecutionInfo := &persistence.WorkflowExecutionInfo{ - DomainID: "5391dbea-5b30-4323-82ca-e1c95339bb3e", - WorkflowID: "helloworld_b4db8bd0-74b7-4250-ade7-ac72a1efb171", - RunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", - FirstExecutionRunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", - InitiatedID: -7, - CompletionEventBatchID: 15, - TaskList: "helloWorldGroup", - WorkflowTypeName: "helloWorldWorkflow", - WorkflowTimeout: 60, - DecisionStartToCloseTimeout: 60, - State: 2, - CloseStatus: 5, - LastFirstEventID: 14, - LastEventTaskID: 15728673, - NextEventID: 17, - LastProcessedEvent: 14, - StartTimestamp: time.Date(2024, 3, 8, 4, 28, 41, 415000000, time.UTC), - LastUpdatedTimestamp: time.Date(2024, 3, 7, 20, 28, 51, 563480000, time.Local), - CreateRequestID: "b086d62c-dd2b-4bbc-9143-5940516acbfe", - DecisionVersion: -24, - DecisionScheduleID: -23, - DecisionStartedID: -23, - DecisionRequestID: "emptyUuid", - DecisionOriginalScheduledTimestamp: 1709872131542474000, - AutoResetPoints: &types.ResetPoints{ - Points: []*types.ResetPointInfo{{ - BinaryChecksum: "6df03bf5110d681667852a8456519536", - RunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", - FirstDecisionCompletedID: 4, - CreatedTimeNano: common.Ptr(int64(1709872121495904000)), - ExpiringTimeNano: common.Ptr(int64(ts1)), - Resettable: true, - }}, - }, - SearchAttributes: map[string][]uint8{"BinaryChecksums": {91, 34, 54, 100, 102, 48, 51, 98, 102, 53, 49, 49, 48, 100, 54, 56, 49, 54, 54, 55, 56, 53, 50, 97, 56, 52, 53, 54, 53, 49, 57, 53, 51, 54, 34, 93}}, - } - - expectedEndingBuilderHistoryState := []*types.HistoryEvent{ - { - ID: 15, - Timestamp: common.Ptr(int64(1709872131580456000)), - EventType: common.Ptr(types.EventTypeDecisionTaskCompleted), - Version: 1, - TaskID: -1234, - DecisionTaskCompletedEventAttributes: &types.DecisionTaskCompletedEventAttributes{ - ScheduledEventID: 13, - StartedEventID: 14, - Identity: "27368@david-porter-DVFG73D710@helloWorldGroup@6027e9ee-048e-4f67-8d88-27883c496901", - BinaryChecksum: "6df03bf5110d681667852a8456519536", - }, - }, - { - ID: 16, - Timestamp: common.Ptr(int64(1709872131923473000)), - EventType: common.Ptr(types.EventTypeWorkflowExecutionContinuedAsNew), - Version: 1, - TaskID: -1234, - WorkflowExecutionContinuedAsNewEventAttributes: &types.WorkflowExecutionContinuedAsNewEventAttributes{ - NewExecutionRunID: "a run id", - WorkflowType: &types.WorkflowType{ - Name: "helloWorldWorkflow", - }, - TaskList: &types.TaskList{Name: "helloWorldGroup"}, - Input: []uint8{110, 117, 108, 108, 10}, - ExecutionStartToCloseTimeoutSeconds: common.Ptr(int32(60)), - TaskStartToCloseTimeoutSeconds: common.Ptr(int32(60)), - DecisionTaskCompletedEventID: 15, - BackoffStartIntervalInSeconds: common.Ptr(int32(0)), - Initiator: common.Ptr(types.ContinueAsNewInitiatorDecider), - Header: &types.Header{}, - }, - }, - } - expectedEndingReturnExecutionState := &persistence.WorkflowExecutionInfo{ DomainID: "5391dbea-5b30-4323-82ca-e1c95339bb3e", WorkflowID: "helloworld_b4db8bd0-74b7-4250-ade7-ac72a1efb171", @@ -224,6 +149,7 @@ func TestAddContinueAsNewEvent(t *testing.T) { }}, }, } + expectedEndingReturnHistoryState := []*types.HistoryEvent{ { ID: 1, @@ -302,12 +228,11 @@ func TestAddContinueAsNewEvent(t *testing.T) { // expectations historyManagerAffordance func(historyManager *persistence.MockHistoryManager) + shardManagerAffordance func( shardContext *shardCtx.MockContext, msb *mutableStateBuilder, domainCache cache.DomainCache) + domainCacheAffordance func(domainCache *cache.MockDomainCache) + taskgeneratorAffordance func(taskGenerator *MockMutableStateTaskGenerator, msb *mutableStateBuilder) - // this is a somewhat confusing API, both returning a new cloned state and mutating the existing - // current state so this will be comparing both - expectedBuilderEndState *persistence.WorkflowExecutionInfo // this is what the MSB should end up as expectedReturnedState *persistence.WorkflowExecutionInfo // this is returned - expectedBuilderHistory []*types.HistoryEvent expectedReturnedHistory []*types.HistoryEvent expectedErr error }{ @@ -323,9 +248,21 @@ func TestAddContinueAsNewEvent(t *testing.T) { }, }, nil) }, - - expectedBuilderEndState: expectedEndingBuilderExecutionInfo, - expectedBuilderHistory: expectedEndingBuilderHistoryState, + shardManagerAffordance: func(shardContext *shardCtx.MockContext, msb *mutableStateBuilder, domainCache cache.DomainCache) { + shardContext.EXPECT().GetShardID().Return(123) + shardContext.EXPECT().GetClusterMetadata().Return(cluster.Metadata{}).Times(2) + shardContext.EXPECT().GetEventsCache().Return(msb.eventsCache) + shardContext.EXPECT().GetConfig().Return(msb.config) + shardContext.EXPECT().GetTimeSource().Return(msb.timeSource) + shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) + shardContext.EXPECT().GetDomainCache().Return(domainCache) + }, + domainCacheAffordance: func(domainCache *cache.MockDomainCache) { + domainCache.EXPECT().GetDomainName(gomock.Any()).Return("domain", nil) + }, + taskgeneratorAffordance: func(taskGenerator *MockMutableStateTaskGenerator, msb *mutableStateBuilder) { + taskGenerator.EXPECT().GenerateWorkflowCloseTasks(gomock.Any(), msb.config.WorkflowDeletionJitterRange("domain")) + }, expectedReturnedState: expectedEndingReturnExecutionState, expectedReturnedHistory: expectedEndingReturnHistoryState, @@ -334,15 +271,8 @@ func TestAddContinueAsNewEvent(t *testing.T) { for name, td := range tests { t.Run(name, func(t *testing.T) { - ctrl := gomock.NewController(t) - historyManager := persistence.NewMockHistoryManager(ctrl) - td.historyManagerAffordance(historyManager) - - domainCache := cache.NewMockDomainCache(ctrl) - domainCache.EXPECT().GetDomainName(gomock.Any()).Return("domain", nil) - msb := &mutableStateBuilder{ domainEntry: cache.NewDomainCacheEntryForTest( &persistence.DomainInfo{ID: domainID}, @@ -356,37 +286,32 @@ func TestAddContinueAsNewEvent(t *testing.T) { logger: log.NewNoop(), config: config.NewForTest(), } - msb.timeSource = clock.NewMockedTimeSourceAt(time.Unix(0, ts1)) + shardContext := shardCtx.NewMockContext(ctrl) + historyManager := persistence.NewMockHistoryManager(ctrl) + domainCache := cache.NewMockDomainCache(ctrl) + taskGenerator := NewMockMutableStateTaskGenerator(ctrl) + + msb.timeSource = clock.NewMockedTimeSourceAt(time.Unix(0, ts1)) msb.eventsCache = events.NewCache(shardID, historyManager, config.NewForTest(), log.NewNoop(), metrics.NewNoopMetricsClient(), domainCache) - - shardContext := shardCtx.NewMockContext(ctrl) - shardContext.EXPECT().GetShardID().Return(123) - shardContext.EXPECT().GetClusterMetadata().Return(cluster.Metadata{}).Times(2) - shardContext.EXPECT().GetEventsCache().Return(msb.eventsCache) - shardContext.EXPECT().GetConfig().Return(msb.config) - shardContext.EXPECT().GetTimeSource().Return(msb.timeSource) - shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) - shardContext.EXPECT().GetDomainCache().Return(domainCache) - - taskGenerator := NewMockMutableStateTaskGenerator(ctrl) - - taskGenerator.EXPECT().GenerateWorkflowCloseTasks(gomock.Any(), msb.config.WorkflowDeletionJitterRange("domain")) - msb.shard = shardContext msb.executionInfo = td.startingState msb.hBuilder = &HistoryBuilder{ history: td.startingHistory, msBuilder: msb, } - msb.taskGenerator = taskGenerator + td.historyManagerAffordance(historyManager) + td.domainCacheAffordance(domainCache) + td.taskgeneratorAffordance(taskGenerator, msb) + td.shardManagerAffordance(shardContext, msb, domainCache) + _, returnedBuilder, err := msb.AddContinueAsNewEvent(context.Background(), firstEventID, decisionCompletedEventID, @@ -401,13 +326,8 @@ func TestAddContinueAsNewEvent(t *testing.T) { }, Input: []uint8{110, 117, 108, 108, 10}, }) - resultExecutionInfo := returnedBuilder.GetExecutionInfo() - // assert.Equal(t, td.expectedBuilderEndState, msb.executionInfo) - // assert.Equal(t, td.expectedReturnedState, returnedBuilder.GetExecutionInfo()) - - // assert.Equal(t, td.expectedBuilderHistory, msb.hBuilder.history) if td.expectedErr != nil { assert.ErrorAs(t, err, &td.expectedErr) } @@ -429,119 +349,3 @@ func TestAddContinueAsNewEvent(t *testing.T) { }) } } - -// startExecutionInfo := &persistence.WorkflowExecutionInfo{ -// DomainID: "5391dbea-5b30-4323-82ca-e1c95339bb3e", -// WorkflowID: "helloworld_bfa410d9-9f49-4bcb-a943-0f3ceb252da2", -// RunID: "c6702a46-a1f0-42d2-9de7-8aca6ed6795f", -// FirstExecutionRunID: "c6702a46-a1f0-42d2-9de7-8aca6ed6795f", -// InitiatedID: -7, -// CompletionEventBatchID: 0, -// TaskList: "tasklist-name", -// WorkflowTypeName: "test-workflow", -// WorkflowTimeout: 60, -// DecisionStartToCloseTimeout: 60, -// State: 1, -// CloseStatus: 0, -// LastFirstEventID: 14, -// LastEventTaskID: 11534369, -// NextEventID: 16, -// LastProcessedEvent: 14, -// CreateRequestID: "5c0be655-1efc-4dfe-8f69-1f59e59c13ef", -// DecisionVersion: -24, -// DecisionScheduleID: -23, -// DecisionStartedID: -23, -// DecisionRequestID: "emptyUuid", -// DecisionTimeout: 0, -// DecisionAttempt: 0, -// DecisionStartedTimestamp: 0, -// DecisionScheduledTimestamp: 0, -// DecisionOriginalScheduledTimestamp: 1709790036041553000, -// CancelRequested: false, -// StickyTaskList: "david-porter-DVFG73D710:04be47fa-2381-469f-b2ea-1253271ad116", -// StickyScheduleToStartTimeout: 5, -// ClientLibraryVersion: "0.18.4", -// ClientFeatureVersion: "1.7.0", -// ClientImpl: "uber-go", -// Attempt: 0, -// } - -// endExecutionInfo := &persistence.WorkflowExecutionInfo{ -// DomainID: "5391dbea-5b30-4323-82ca-e1c95339bb3e", -// WorkflowID: "helloworld_bfa410d9-9f49-4bcb-a943-0f3ceb252da2", -// RunID: "c6702a46-a1f0-42d2-9de7-8aca6ed6795f", -// FirstExecutionRunID: "c6702a46-a1f0-42d2-9de7-8aca6ed6795f", -// InitiatedID: -7, -// CompletionEventBatchID: 15, -// TaskList: "tasklist-name", -// WorkflowTypeName: "test-workflow", -// WorkflowTimeout: 60, -// DecisionStartToCloseTimeout: 60, -// State: 2, -// CloseStatus: 5, -// LastFirstEventID: 14, -// LastEventTaskID: 11534369, -// NextEventID: 17, -// LastProcessedEvent: 14, -// CreateRequestID: "5c0be655-1efc-4dfe-8f69-1f59e59c13ef", -// DecisionVersion: -24, -// DecisionScheduleID: -23, -// DecisionStartedID: -23, -// DecisionRequestID: "emptyUuid", -// DecisionTimeout: 0, -// DecisionAttempt: 0, -// DecisionOriginalScheduledTimestamp: 1709790036041553000, -// } - -// startHistory := []*types.HistoryEvent{ -// { -// ID: 15, -// Timestamp: common.Ptr(int64(1709791556528026000)), -// EventType: types.EventTypeDecisionTaskCompleted.Ptr(), -// Version: 1, -// TaskID: -1234, -// DecisionTaskCompletedEventAttributes: &types.DecisionTaskCompletedEventAttributes{ -// ScheduledEventID: 13, -// StartedEventID: 14, -// Identity: "27368@david-porter-DVFG73D710@helloWorldGroup@6027e9ee-048e-4f67-8d88-27883c496901", -// BinaryChecksum: "6df03bf5110d681667852a8456519536", -// }, -// }, -// } - -// endHistory := []*types.HistoryEvent{ -// { -// ID: 15, -// Timestamp: common.Ptr(int64(1709791556528026000)), -// EventType: types.EventTypeDecisionTaskCompleted.Ptr(), -// Version: 1, -// TaskID: -1234, -// DecisionTaskCompletedEventAttributes: &types.DecisionTaskCompletedEventAttributes{ -// ScheduledEventID: 13, -// StartedEventID: 14, -// Identity: "27368@david-porter-DVFG73D710@helloWorldGroup@6027e9ee-048e-4f67-8d88-27883c496901", -// BinaryChecksum: "6df03bf5110d681667852a8456519536", -// }, -// }, -// { -// ID: 16, -// Timestamp: common.Ptr(int64(1709791556529788000)), -// EventType: common.Ptr(types.EventTypeWorkflowExecutionContinuedAsNew), -// Version: 1, -// TaskID: -1234, -// WorkflowExecutionContinuedAsNewEventAttributes: &types.WorkflowExecutionContinuedAsNewEventAttributes{ -// NewExecutionRunID: "1b094f71-9c23-4177-8cf9-7f723cc52955", -// WorkflowType: &types.WorkflowType{ -// Name: "helloWorldWorkflow", -// }, -// TaskList: &types.TaskList{ -// Name: "helloWorldGroup", -// }, -// Input: []byte("some-input"), -// ExecutionStartToCloseTimeoutSeconds: common.Ptr(int32(60)), -// TaskStartToCloseTimeoutSeconds: common.Ptr(int32(60)), -// DecisionTaskCompletedEventID: 15, -// Initiator: common.Ptr(types.ContinueAsNewInitiatorDecider), -// }, -// }, -// } From 8ec6e5efd86b5eac7941e2493c72ce2781a626ed Mon Sep 17 00:00:00 2001 From: David Porter Date: Sat, 9 Mar 2024 15:24:40 -0800 Subject: [PATCH 07/12] imports --- ...ble_state_builder_add_continue_as_new_event_test.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go index f36d5cd7c39..759057608e2 100644 --- a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go +++ b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go @@ -22,32 +22,22 @@ package execution import ( "context" - // "errors" "testing" "time" "github.com/golang/mock/gomock" - // "github.com/pborman/uuid" "github.com/stretchr/testify/assert" - // "github.com/stretchr/testify/require" - // "github.com/stretchr/testify/suite" - // "github.com/uber-go/tally" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" - // "github.com/uber/cadence/common/checksum" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" - // "github.com/uber/cadence/common/definition" - // "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" - // "github.com/uber/cadence/service/history/constants" "github.com/uber/cadence/service/history/events" - // "github.com/uber/cadence/service/history/shard" shardCtx "github.com/uber/cadence/service/history/shard" ) From f52b4791c5e5879a26095962f763190ce0650840 Mon Sep 17 00:00:00 2001 From: David Porter Date: Sat, 9 Mar 2024 15:25:10 -0800 Subject: [PATCH 08/12] fmt --- .../mutable_state_builder_add_continue_as_new_event_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go index 759057608e2..537fdd2aa8d 100644 --- a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go +++ b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go @@ -218,8 +218,8 @@ func TestAddContinueAsNewEvent(t *testing.T) { // expectations historyManagerAffordance func(historyManager *persistence.MockHistoryManager) - shardManagerAffordance func( shardContext *shardCtx.MockContext, msb *mutableStateBuilder, domainCache cache.DomainCache) - domainCacheAffordance func(domainCache *cache.MockDomainCache) + shardManagerAffordance func(shardContext *shardCtx.MockContext, msb *mutableStateBuilder, domainCache cache.DomainCache) + domainCacheAffordance func(domainCache *cache.MockDomainCache) taskgeneratorAffordance func(taskGenerator *MockMutableStateTaskGenerator, msb *mutableStateBuilder) expectedReturnedState *persistence.WorkflowExecutionInfo // this is returned From dfd64ee7c431b83ef06fe5b1febbe1953d994165 Mon Sep 17 00:00:00 2001 From: David Porter Date: Sat, 9 Mar 2024 15:28:40 -0800 Subject: [PATCH 09/12] No need to add deps right now --- go.mod | 2 -- go.sum | 4 ---- 2 files changed, 6 deletions(-) diff --git a/go.mod b/go.mod index 321456ceb83..487f6339943 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,6 @@ require ( github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.2 // indirect - github.com/hexops/valast v1.4.4 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.0.0 // indirect @@ -143,7 +142,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect honnef.co/go/tools v0.3.2 // indirect - mvdan.cc/gofumpt v0.4.0 // indirect ) // ringpop-go and tchannel-go depends on older version of thrift, yarpc brings up newer version diff --git a/go.sum b/go.sum index da0ecc2e72b..2e5d16db790 100644 --- a/go.sum +++ b/go.sum @@ -224,8 +224,6 @@ github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2I github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.2.0 h1:3vNe/fWF5CBgRIguda1meWhsZHy3m8gCJ5wx+dIzX/E= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= -github.com/hexops/valast v1.4.4 h1:rETyycw+/L2ZVJHHNxEBgh8KUn+87WugH9MxcEv9PGs= -github.com/hexops/valast v1.4.4/go.mod h1:Jcy1pNH7LNraVaAZDLyv21hHg2WBv9Nf9FL6fGxU7o4= github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -768,8 +766,6 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.3.2 h1:ytYb4rOqyp1TSa2EPvNVwtPQJctSELKaMyLfqNP4+34= honnef.co/go/tools v0.3.2/go.mod h1:jzwdWgg7Jdq75wlfblQxO4neNaFFSvgc1tD5Wv8U0Yw= -mvdan.cc/gofumpt v0.4.0 h1:JVf4NN1mIpHogBj7ABpgOyZc65/UUOkKQFkoURsz4MM= -mvdan.cc/gofumpt v0.4.0/go.mod h1:PljLOHDeZqgS8opHRKLzp2It2VBuSdteAgqUfzMTxlQ= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= From 38c8643e1c18fbca883db7da96da8c9581c55fee Mon Sep 17 00:00:00 2001 From: David Porter Date: Sat, 9 Mar 2024 16:19:33 -0800 Subject: [PATCH 10/12] fucking timezones --- ..._builder_add_continue_as_new_event_test.go | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go index 537fdd2aa8d..a1d50511456 100644 --- a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go +++ b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go @@ -48,7 +48,10 @@ func TestAddContinueAsNewEvent(t *testing.T) { var ( domainID = "5391dbea-5b30-4323-82ca-e1c95339bb3e" - ts1 = int64(1709872131923568000) + ts0 = int64(123450) + ts1 = int64(123451) + ts2 = int64(123452) + ts3 = int64(123453) shardID = 123 ) @@ -67,8 +70,8 @@ func TestAddContinueAsNewEvent(t *testing.T) { LastEventTaskID: 15728673, NextEventID: 16, LastProcessedEvent: 14, - StartTimestamp: time.Date(2024, 3, 8, 4, 28, 41, 415000000, time.UTC), - LastUpdatedTimestamp: time.Date(2024, 3, 7, 20, 28, 51, 563480000, time.Local), + StartTimestamp: time.Unix(0, ts0), + LastUpdatedTimestamp: time.Unix(0, ts2), CreateRequestID: "b086d62c-dd2b-4bbc-9143-5940516acbfe", DecisionVersion: -24, DecisionScheduleID: -23, @@ -85,7 +88,7 @@ func TestAddContinueAsNewEvent(t *testing.T) { BinaryChecksum: "6df03bf5110d681667852a8456519536", RunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", FirstDecisionCompletedID: 4, - CreatedTimeNano: common.Ptr(int64(1709872121495904000)), + CreatedTimeNano: common.Ptr(int64(ts1)), Resettable: true, }}, }, @@ -119,22 +122,22 @@ func TestAddContinueAsNewEvent(t *testing.T) { LastFirstEventID: 1, NextEventID: 3, LastProcessedEvent: -23, - StartTimestamp: time.Date(2024, 3, 7, 20, 28, 51, 923568000, time.Local), + StartTimestamp: time.Unix(0, ts3), CreateRequestID: "4630bf04-5c64-41bf-92d9-576db2d535cb", DecisionVersion: 1, DecisionScheduleID: 2, DecisionStartedID: -23, DecisionRequestID: "emptyUuid", DecisionTimeout: 60, - DecisionScheduledTimestamp: ts1, - DecisionOriginalScheduledTimestamp: ts1, + DecisionScheduledTimestamp: ts3, + DecisionOriginalScheduledTimestamp: ts3, AutoResetPoints: &types.ResetPoints{ Points: []*types.ResetPointInfo{{ BinaryChecksum: "6df03bf5110d681667852a8456519536", RunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", FirstDecisionCompletedID: 4, - CreatedTimeNano: common.Ptr(int64(1709872121495904000)), - ExpiringTimeNano: common.Ptr(int64(ts1)), + CreatedTimeNano: common.Ptr(int64(ts1)), + ExpiringTimeNano: common.Ptr(int64(ts3)), Resettable: true, }}, }, @@ -143,7 +146,7 @@ func TestAddContinueAsNewEvent(t *testing.T) { expectedEndingReturnHistoryState := []*types.HistoryEvent{ { ID: 1, - Timestamp: common.Ptr(int64(ts1)), + Timestamp: common.Ptr(int64(ts3)), EventType: common.Ptr(types.EventTypeWorkflowExecutionStarted), Version: 1, TaskID: -1234, @@ -162,8 +165,8 @@ func TestAddContinueAsNewEvent(t *testing.T) { BinaryChecksum: "6df03bf5110d681667852a8456519536", RunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", FirstDecisionCompletedID: 4, - CreatedTimeNano: common.Ptr(int64(1709872121495904000)), - ExpiringTimeNano: common.Ptr(int64(ts1)), + CreatedTimeNano: common.Ptr(int64(ts1)), + ExpiringTimeNano: common.Ptr(int64(ts3)), Resettable: true, }}}, Header: nil, @@ -171,7 +174,7 @@ func TestAddContinueAsNewEvent(t *testing.T) { }, { ID: 2, - Timestamp: common.Ptr(int64(ts1)), + Timestamp: common.Ptr(int64(ts3)), EventType: common.Ptr(types.EventTypeDecisionTaskScheduled), Version: 1, TaskID: -1234, @@ -282,7 +285,7 @@ func TestAddContinueAsNewEvent(t *testing.T) { domainCache := cache.NewMockDomainCache(ctrl) taskGenerator := NewMockMutableStateTaskGenerator(ctrl) - msb.timeSource = clock.NewMockedTimeSourceAt(time.Unix(0, ts1)) + msb.timeSource = clock.NewMockedTimeSourceAt(time.Unix(0, ts3)) msb.eventsCache = events.NewCache(shardID, historyManager, config.NewForTest(), From c7386d10254da73c1a9095ac17c31f6719ca1913 Mon Sep 17 00:00:00 2001 From: David Porter Date: Sun, 10 Mar 2024 15:11:51 -0700 Subject: [PATCH 11/12] feedback --- common/convert_test.go | 26 ++ ..._builder_add_continue_as_new_event_test.go | 252 +++++++++++------- 2 files changed, 181 insertions(+), 97 deletions(-) create mode 100644 common/convert_test.go diff --git a/common/convert_test.go b/common/convert_test.go new file mode 100644 index 00000000000..ff245b94c88 --- /dev/null +++ b/common/convert_test.go @@ -0,0 +1,26 @@ +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPtr(t *testing.T) { + + aString := "a string" + aStruct := struct{}{} + anInterface := interface{}(nil) + + t.Run("a string should be converted to a pointer of a string", func(t *testing.T) { + assert.Equal(t, &aString, Ptr(aString)) + }) + + t.Run("a struct should be converted to a pointer of a string", func(t *testing.T) { + assert.Equal(t, &aStruct, Ptr(aStruct)) + }) + + t.Run("a interface should be converted to a pointer of a string", func(t *testing.T) { + assert.Equal(t, &anInterface, Ptr(anInterface)) + }) +} diff --git a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go index a1d50511456..07a02d5a40a 100644 --- a/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go +++ b/service/history/execution/mutable_state_builder_add_continue_as_new_event_test.go @@ -22,10 +22,13 @@ package execution import ( "context" + "errors" "testing" "time" "github.com/golang/mock/gomock" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" "github.com/uber/cadence/common" @@ -55,58 +58,97 @@ func TestAddContinueAsNewEvent(t *testing.T) { shardID = 123 ) - startingExecutionInfo := &persistence.WorkflowExecutionInfo{ - DomainID: "5391dbea-5b30-4323-82ca-e1c95339bb3e", - WorkflowID: "helloworld_b4db8bd0-74b7-4250-ade7-ac72a1efb171", - RunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", - FirstExecutionRunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", - InitiatedID: -7, - TaskList: "helloWorldGroup", - WorkflowTypeName: "helloWorldWorkflow", - WorkflowTimeout: 60, - DecisionStartToCloseTimeout: 60, - State: 1, - LastFirstEventID: 14, - LastEventTaskID: 15728673, - NextEventID: 16, - LastProcessedEvent: 14, - StartTimestamp: time.Unix(0, ts0), - LastUpdatedTimestamp: time.Unix(0, ts2), - CreateRequestID: "b086d62c-dd2b-4bbc-9143-5940516acbfe", - DecisionVersion: -24, - DecisionScheduleID: -23, - DecisionStartedID: -23, - DecisionRequestID: "emptyUuid", - DecisionOriginalScheduledTimestamp: 1709872131542474000, - StickyTaskList: "david-porter-DVFG73D710:04be47fa-2381-469f-b2ea-1253271ad116", - StickyScheduleToStartTimeout: 5, - ClientLibraryVersion: "0.18.4", - ClientFeatureVersion: "1.7.0", - ClientImpl: "uber-go", - AutoResetPoints: &types.ResetPoints{ - Points: []*types.ResetPointInfo{{ - BinaryChecksum: "6df03bf5110d681667852a8456519536", - RunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", - FirstDecisionCompletedID: 4, - CreatedTimeNano: common.Ptr(int64(ts1)), - Resettable: true, - }}, - }, - SearchAttributes: map[string][]uint8{"BinaryChecksums": {91, 34, 54, 100, 102, 48, 51, 98, 102, 53, 49, 49, 48, 100, 54, 56, 49, 54, 54, 55, 56, 53, 50, 97, 56, 52, 53, 54, 53, 49, 57, 53, 51, 54, 34, 93}}} + // the mutable state builder confusingly both returns a new builder with this fuction + // as well as mutating its internal state, making it difficult to test repeatedly, since + // the supplied inputs are muted per invocation. Wrapping them in a factor here to allow + // for tests to be independent + var createStartingExecutionInfo = func() *persistence.WorkflowExecutionInfo { + return &persistence.WorkflowExecutionInfo{ + DomainID: "5391dbea-5b30-4323-82ca-e1c95339bb3e", + WorkflowID: "helloworld_b4db8bd0-74b7-4250-ade7-ac72a1efb171", + RunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", + FirstExecutionRunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", + InitiatedID: -7, + TaskList: "helloWorldGroup", + WorkflowTypeName: "helloWorldWorkflow", + WorkflowTimeout: 60, + DecisionStartToCloseTimeout: 60, + State: 1, + LastFirstEventID: 14, + LastEventTaskID: 15728673, + NextEventID: 16, + LastProcessedEvent: 14, + StartTimestamp: time.Unix(0, ts0), + LastUpdatedTimestamp: time.Unix(0, ts2), + CreateRequestID: "b086d62c-dd2b-4bbc-9143-5940516acbfe", + DecisionVersion: -24, + DecisionScheduleID: -23, + DecisionStartedID: -23, + DecisionRequestID: "emptyUuid", + DecisionOriginalScheduledTimestamp: 1709872131542474000, + StickyTaskList: "david-porter-DVFG73D710:04be47fa-2381-469f-b2ea-1253271ad116", + StickyScheduleToStartTimeout: 5, + ClientLibraryVersion: "0.18.4", + ClientFeatureVersion: "1.7.0", + ClientImpl: "uber-go", + AutoResetPoints: &types.ResetPoints{ + Points: []*types.ResetPointInfo{{ + BinaryChecksum: "6df03bf5110d681667852a8456519536", + RunID: "5adce5c5-b7b2-4418-9bf0-4207303f6343", + FirstDecisionCompletedID: 4, + CreatedTimeNano: common.Ptr(int64(ts1)), + Resettable: true, + }}, + }, + SearchAttributes: map[string][]uint8{"BinaryChecksums": {91, 34, 54, 100, 102, 48, 51, 98, 102, 53, 49, 49, 48, 100, 54, 56, 49, 54, 54, 55, 56, 53, 50, 97, 56, 52, 53, 54, 53, 49, 57, 53, 51, 54, 34, 93}}} - startingHistory := []*types.HistoryEvent{{ - ID: 15, - Timestamp: common.Ptr(int64(1709872131580456000)), - EventType: common.Ptr(types.EventTypeDecisionTaskCompleted), - Version: 1, - TaskID: -1234, - DecisionTaskCompletedEventAttributes: &types.DecisionTaskCompletedEventAttributes{ - ScheduledEventID: 13, - StartedEventID: 14, - Identity: "27368@david-porter-DVFG73D710@helloWorldGroup@6027e9ee-048e-4f67-8d88-27883c496901", - BinaryChecksum: "6df03bf5110d681667852a8456519536", - }, - }} + } + + var createValidStartingHistory = func() []*types.HistoryEvent { + return []*types.HistoryEvent{{ + ID: 15, + Timestamp: common.Ptr(int64(1709872131580456000)), + EventType: common.Ptr(types.EventTypeDecisionTaskCompleted), + Version: 1, + TaskID: -1234, + DecisionTaskCompletedEventAttributes: &types.DecisionTaskCompletedEventAttributes{ + ScheduledEventID: 13, + StartedEventID: 14, + Identity: "27368@david-porter-DVFG73D710@helloWorldGroup@6027e9ee-048e-4f67-8d88-27883c496901", + BinaryChecksum: "6df03bf5110d681667852a8456519536", + }, + }} + } + + var createFetchedHistory = func() *types.HistoryEvent { + return &types.HistoryEvent{ + ID: 1, Timestamp: common.Ptr(int64(1709938156435726000)), + EventType: common.Ptr(types.EventTypeWorkflowExecutionStarted), + Version: 1, + TaskID: 17826364, + WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &types.WorkflowType{Name: "helloWorldWorkflow"}, + TaskList: &types.TaskList{Name: "helloWorldGroup"}, + Input: []uint8{110, 117, 108, 108, 10}, + ExecutionStartToCloseTimeoutSeconds: common.Ptr(int32(60)), + TaskStartToCloseTimeoutSeconds: common.Ptr(int32(60)), + ContinuedExecutionRunID: "96892ca6-975a-44b1-9726-cdb63acd8cda", + OriginalExecutionRunID: "befc5b41-fb06-4a99-bec2-91c3e98b17d7", + FirstExecutionRunID: "bcdee7e4-cb21-4bbb-a8d1-43da79e3d252", + PrevAutoResetPoints: &types.ResetPoints{Points: []*types.ResetPointInfo{ + { + BinaryChecksum: "6df03bf5110d681667852a8456519536", + RunID: "bcdee7e4-cb21-4bbb-a8d1-43da79e3d252", + FirstDecisionCompletedID: 4, + CreatedTimeNano: common.Ptr(int64(1709938002170829000)), + ExpiringTimeNano: common.Ptr(int64(1710197212347858000)), + Resettable: true, + }, + }}, + Header: &types.Header{}, + }, + } + } expectedEndingReturnExecutionState := &persistence.WorkflowExecutionInfo{ DomainID: "5391dbea-5b30-4323-82ca-e1c95339bb3e", @@ -185,34 +227,6 @@ func TestAddContinueAsNewEvent(t *testing.T) { }, } - fetchedHistoryEvent1 := &types.HistoryEvent{ - ID: 1, Timestamp: common.Ptr(int64(1709938156435726000)), - EventType: common.Ptr(types.EventTypeWorkflowExecutionStarted), - Version: 1, - TaskID: 17826364, - WorkflowExecutionStartedEventAttributes: &types.WorkflowExecutionStartedEventAttributes{ - WorkflowType: &types.WorkflowType{Name: "helloWorldWorkflow"}, - TaskList: &types.TaskList{Name: "helloWorldGroup"}, - Input: []uint8{110, 117, 108, 108, 10}, - ExecutionStartToCloseTimeoutSeconds: common.Ptr(int32(60)), - TaskStartToCloseTimeoutSeconds: common.Ptr(int32(60)), - ContinuedExecutionRunID: "96892ca6-975a-44b1-9726-cdb63acd8cda", - OriginalExecutionRunID: "befc5b41-fb06-4a99-bec2-91c3e98b17d7", - FirstExecutionRunID: "bcdee7e4-cb21-4bbb-a8d1-43da79e3d252", - PrevAutoResetPoints: &types.ResetPoints{Points: []*types.ResetPointInfo{ - { - BinaryChecksum: "6df03bf5110d681667852a8456519536", - RunID: "bcdee7e4-cb21-4bbb-a8d1-43da79e3d252", - FirstDecisionCompletedID: 4, - CreatedTimeNano: common.Ptr(int64(1709938002170829000)), - ExpiringTimeNano: common.Ptr(int64(1710197212347858000)), - Resettable: true, - }, - }}, - Header: &types.Header{}, - }, - } - tests := map[string]struct { startingState *persistence.WorkflowExecutionInfo // history is a substruct of current state, but because they're both @@ -230,14 +244,14 @@ func TestAddContinueAsNewEvent(t *testing.T) { expectedErr error }{ "a continue-as-new event with no errors": { - startingState: startingExecutionInfo, - startingHistory: startingHistory, + startingState: createStartingExecutionInfo(), + startingHistory: createValidStartingHistory(), // when it goes to fetch the starting event historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadHistoryBranchResponse{ HistoryEvents: []*types.HistoryEvent{ - fetchedHistoryEvent1, + createFetchedHistory(), }, }, nil) }, @@ -260,6 +274,50 @@ func TestAddContinueAsNewEvent(t *testing.T) { expectedReturnedState: expectedEndingReturnExecutionState, expectedReturnedHistory: expectedEndingReturnHistoryState, }, + + "a continue-as-new with failure to get the history event": { + startingState: createStartingExecutionInfo(), + startingHistory: createValidStartingHistory(), + historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { + historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(nil, errors.New("an error")) + }, + shardManagerAffordance: func(shardContext *shardCtx.MockContext, msb *mutableStateBuilder, domainCache cache.DomainCache) { + shardContext.EXPECT().GetShardID().Return(123) + }, + domainCacheAffordance: func(domainCache *cache.MockDomainCache) { + domainCache.EXPECT().GetDomainName(gomock.Any()).Return("domain", nil) + }, + taskgeneratorAffordance: func(taskGenerator *MockMutableStateTaskGenerator, msb *mutableStateBuilder) {}, + expectedErr: errors.New("an error"), + }, + + "a continue-as-new with errors in replicating": { + startingState: createStartingExecutionInfo(), + startingHistory: createValidStartingHistory(), + historyManagerAffordance: func(historyManager *persistence.MockHistoryManager) { + historyManager.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadHistoryBranchResponse{ + HistoryEvents: []*types.HistoryEvent{ + createFetchedHistory(), + }, + }, nil) + }, + shardManagerAffordance: func(shardContext *shardCtx.MockContext, msb *mutableStateBuilder, domainCache cache.DomainCache) { + shardContext.EXPECT().GetShardID().Return(123) + shardContext.EXPECT().GetClusterMetadata().Return(cluster.Metadata{}).Times(2) + shardContext.EXPECT().GetEventsCache().Return(msb.eventsCache) + shardContext.EXPECT().GetConfig().Return(msb.config) + shardContext.EXPECT().GetTimeSource().Return(msb.timeSource) + shardContext.EXPECT().GetMetricsClient().Return(metrics.NewNoopMetricsClient()) + shardContext.EXPECT().GetDomainCache().Return(domainCache) + }, + domainCacheAffordance: func(domainCache *cache.MockDomainCache) { + domainCache.EXPECT().GetDomainName(gomock.Any()).Return("domain", nil) + }, + taskgeneratorAffordance: func(taskGenerator *MockMutableStateTaskGenerator, msb *mutableStateBuilder) { + taskGenerator.EXPECT().GenerateWorkflowCloseTasks(gomock.Any(), msb.config.WorkflowDeletionJitterRange("domain")).Return(errors.New("an error")) + }, + expectedErr: errors.New("an error"), + }, } for name, td := range tests { @@ -275,7 +333,7 @@ func TestAddContinueAsNewEvent(t *testing.T) { 1, nil, ), - executionInfo: startingExecutionInfo, + executionInfo: td.startingState, logger: log.NewNoop(), config: config.NewForTest(), } @@ -319,26 +377,26 @@ func TestAddContinueAsNewEvent(t *testing.T) { }, Input: []uint8{110, 117, 108, 108, 10}, }) - resultExecutionInfo := returnedBuilder.GetExecutionInfo() if td.expectedErr != nil { assert.ErrorAs(t, err, &td.expectedErr) + return } - // these are generated nondeterministically, with a plain guid generator - // todo(david): make this mockable - td.expectedReturnedState.RunID = "a run id" - resultExecutionInfo.RunID = "a run id" - td.expectedReturnedState.CreateRequestID = "a request id" - resultExecutionInfo.CreateRequestID = "a request id" - for _, historyEvent := range returnedBuilder.GetHistoryBuilder().history { - if historyEvent.WorkflowExecutionStartedEventAttributes != nil { - historyEvent.WorkflowExecutionStartedEventAttributes.OriginalExecutionRunID = "a run id" - } - } + resultExecutionInfo := returnedBuilder.GetExecutionInfo() + + assert.Empty(t, cmp.Diff(td.expectedReturnedState, resultExecutionInfo, + + // these are generated nondeterministically, with a plain guid generator + // todo(david): make this mockable + cmpopts.IgnoreFields(types.WorkflowExecutionStartedEventAttributes{}, "OriginalExecutionRunID"), + cmpopts.IgnoreFields(types.WorkflowExecution{}, "RunID"), + cmpopts.IgnoreFields(persistence.WorkflowExecutionInfo{}, "RunID", "CreateRequestID"), + )) - assert.Equal(t, td.expectedReturnedState, resultExecutionInfo) - assert.Equal(t, td.expectedReturnedHistory, returnedBuilder.GetHistoryBuilder().history) + assert.Empty(t, cmp.Diff(td.expectedReturnedHistory, returnedBuilder.GetHistoryBuilder().history, + cmpopts.IgnoreFields(types.WorkflowExecutionStartedEventAttributes{}, "OriginalExecutionRunID")), + ) }) } } From 22625cc807e8e96bbee84f5778d118450b6fbfd4 Mon Sep 17 00:00:00 2001 From: David Porter Date: Sun, 10 Mar 2024 15:31:22 -0700 Subject: [PATCH 12/12] Fix --- common/convert_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/common/convert_test.go b/common/convert_test.go index ff245b94c88..1f69ac14fa5 100644 --- a/common/convert_test.go +++ b/common/convert_test.go @@ -1,3 +1,25 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + package common import (