Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write tests for reconciliation fetcher #6424

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// THE SOFTWARE.

// Geneate rate limiter wrappers.
//go:generate mockgen -package $GOPACKAGE -destination dataManagerInterfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence Task,ShardManager,ExecutionManager,ExecutionManagerFactory,TaskManager,HistoryManager,DomainManager,QueueManager,ConfigStoreManager
//go:generate mockgen -package $GOPACKAGE -destination data_manager_interfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence Task,ShardManager,ExecutionManager,ExecutionManagerFactory,TaskManager,HistoryManager,DomainManager,QueueManager,ConfigStoreManager
//go:generate gowrap gen -g -p . -i ConfigStoreManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/configstore_generated.go
//go:generate gowrap gen -g -p . -i DomainManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/domain_generated.go
//go:generate gowrap gen -g -p . -i HistoryManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/history_generated.go
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -destination dataVisibilityManagerInterfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence VisibilityManager
//go:generate mockgen -package $GOPACKAGE -destination visibility_manager_interfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence VisibilityManager
// Generate rate limiter wrapper.
//go:generate gowrap gen -g -p . -i VisibilityManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/visibility_generated.go

Expand Down
315 changes: 282 additions & 33 deletions common/reconciliation/fetcher/concrete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,83 +23,332 @@
package fetcher

import (
"context"
"fmt"
"testing"

"github.com/golang/mock/gomock"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/pagination"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/reconciliation/entity"
)

const (
testTreeID = "test-tree-id"
testBranchID = "test-branch-id"
)
func TestConcreteExecutionIterator(t *testing.T) {
ctrl := gomock.NewController(t)
retryer := persistence.NewMockRetryer(ctrl)
retryer.EXPECT().ListConcreteExecutions(gomock.Any(), gomock.Any()).
Return(&persistence.ListConcreteExecutionsResponse{}, nil).
Times(1)

var (
validBranchToken = []byte{89, 11, 0, 10, 0, 0, 0, 12, 116, 101, 115, 116, 45, 116, 114, 101, 101, 45, 105, 100, 11, 0, 20, 0, 0, 0, 14, 116, 101, 115, 116, 45, 98, 114, 97, 110, 99, 104, 45, 105, 100, 0}
invalidBranchToken = []byte("invalid")
)
iterator := ConcreteExecutionIterator(
context.Background(),
retryer,
10,
)
require.NotNil(t, iterator)
}

func TestConcreteExecution(t *testing.T) {
encoder := codec.NewThriftRWEncoder()
tests := []struct {
desc string
req ExecutionRequest
mockFn func(retryer *persistence.MockRetryer)
wantEntity entity.Entity
wantErr bool
}{
{
desc: "success",
req: ExecutionRequest{
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
DomainName: "test-domain-name",
},
mockFn: func(retryer *persistence.MockRetryer) {
retryer.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
Return(
&persistence.GetWorkflowExecutionResponse{
State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
State: persistence.WorkflowStateRunning,
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
},
},
},
nil,
).Times(1)

retryer.EXPECT().GetShardID().Return(355).Times(1)
},
wantEntity: &entity.ConcreteExecution{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
TreeID: "test-tree-id",
BranchID: "test-branch-id",
Execution: entity.Execution{
ShardID: 355,
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
State: persistence.WorkflowStateRunning,
},
},
},
{
desc: "GetWorkflowExecution failed",
req: ExecutionRequest{
DomainID: "test-domain-id",
WorkflowID: "test-workflow-id",
RunID: "test-run-id",
DomainName: "test-domain-name",
},
mockFn: func(retryer *persistence.MockRetryer) {
retryer.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
Return(nil, fmt.Errorf("failed")).Times(1)
},
wantErr: true,
},
}

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
ctrl := gomock.NewController(t)
retryer := persistence.NewMockRetryer(ctrl)

tc.mockFn(retryer)

gotEntity, err := ConcreteExecution(context.Background(), retryer, tc.req)
if (err != nil) != tc.wantErr {
t.Fatalf("ConcreteExecution() err: %v, wantErr %v", err, tc.wantErr)
}

if diff := cmp.Diff(tc.wantEntity, gotEntity); diff != "" {
t.Errorf("ConcreteExecution() mismatch (-want +got):\n%s", diff)
}
})
}
}

func TestGetConcreteExecutions(t *testing.T) {
encoder := codec.NewThriftRWEncoder()
testExecutions := []*persistence.ListConcreteExecutionsEntity{
{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id-1", "test-branch-id-1"),
State: persistence.WorkflowStateRunning,
DomainID: "test-domain-id-1",
WorkflowID: "test-workflow-id-1",
RunID: "test-run-id-1",
},
},
{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id-2", "test-branch-id-2"),
State: persistence.WorkflowStateCompleted,
DomainID: "test-domain-id-2",
WorkflowID: "test-workflow-id-2",
RunID: "test-run-id-2",
},
},
}

tests := []struct {
desc string
pageSize int
pageToken pagination.PageToken
mockFn func(*testing.T, *persistence.MockRetryer)
wantPage pagination.Page
wantErr bool
}{
{
desc: "success",
pageSize: 2,
pageToken: []byte("test-page-token"),
mockFn: func(t *testing.T, retryer *persistence.MockRetryer) {
retryer.EXPECT().ListConcreteExecutions(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, req *persistence.ListConcreteExecutionsRequest) (*persistence.ListConcreteExecutionsResponse, error) {
wantReq := &persistence.ListConcreteExecutionsRequest{
PageSize: 2,
PageToken: []byte("test-page-token"),
}
if diff := cmp.Diff(wantReq, req); diff != "" {
t.Errorf("Request mismatch (-want +got):\n%s", diff)
}
return &persistence.ListConcreteExecutionsResponse{
PageToken: []byte("test-next-page-token"),
Executions: testExecutions,
}, nil
}).Times(1)

// will be called for each execution in the response
retryer.EXPECT().GetShardID().Return(355).Times(2)
},
wantPage: pagination.Page{
CurrentToken: []byte("test-page-token"),
NextToken: []byte("test-next-page-token"),
Entities: concreteExecutionsToEntities(testExecutions, 355, encoder),
},
},
}

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
ctrl := gomock.NewController(t)
retryer := persistence.NewMockRetryer(ctrl)

tc.mockFn(t, retryer)

fetchFn := getConcreteExecutions(retryer, tc.pageSize, encoder)
gotPage, err := fetchFn(context.Background(), tc.pageToken)
if (err != nil) != tc.wantErr {
t.Fatalf("ConcreteExecution() err: %v, wantErr %v", err, tc.wantErr)
}

if diff := cmp.Diff(tc.wantPage, gotPage); diff != "" {
t.Errorf("ConcreteExecution() mismatch (-want +got):\n%s", diff)
}
})
}
}

func TestGetBranchToken(t *testing.T) {
encoder := codec.NewThriftRWEncoder()
testCases := []struct {
name string
entity *persistence.ListConcreteExecutionsEntity
expectError bool
branchToken []byte
treeID string
branchID string
name string
entity *persistence.ListConcreteExecutionsEntity
wantErr bool
wantBranchToken []byte
wantHistoryBranch shared.HistoryBranch
}{
{
name: "ValidBranchToken",
name: "valid branch token - no version histories",
entity: &persistence.ListConcreteExecutionsEntity{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
},
},
wantBranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
wantHistoryBranch: shared.HistoryBranch{
TreeID: common.StringPtr("test-tree-id"),
BranchID: common.StringPtr("test-branch-id"),
},
},
{
name: "valid branch token - with version histories",
entity: &persistence.ListConcreteExecutionsEntity{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: getValidBranchToken(t, encoder),
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
},
VersionHistories: &persistence.VersionHistories{
CurrentVersionHistoryIndex: 1,
Histories: []*persistence.VersionHistory{
{}, // this will be ignored because index is 1
{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id-from-versionhistory", "test-branch-id-from-versionhistory"),
},
},
},
},
wantBranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id-from-versionhistory", "test-branch-id-from-versionhistory"),
wantHistoryBranch: shared.HistoryBranch{
TreeID: common.StringPtr("test-tree-id-from-versionhistory"),
BranchID: common.StringPtr("test-branch-id-from-versionhistory"),
},
expectError: false,
branchToken: validBranchToken,
treeID: testTreeID,
branchID: testBranchID,
},
{
name: "InvalidBranchToken",
name: "version history index out of bound",
entity: &persistence.ListConcreteExecutionsEntity{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: invalidBranchToken,
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id", "test-branch-id"),
},
VersionHistories: &persistence.VersionHistories{
CurrentVersionHistoryIndex: 2,
Histories: []*persistence.VersionHistory{
{},
{
BranchToken: mustGetValidBranchToken(t, encoder, "test-tree-id-from-versionhistory", "test-branch-id-from-versionhistory"),
},
},
},
},
expectError: true,
wantErr: true,
},
{
name: "invalid branch token",
entity: &persistence.ListConcreteExecutionsEntity{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
BranchToken: []byte("invalid"),
},
},
wantErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
branchToken, branch, err := getBranchToken(tc.entity.ExecutionInfo.BranchToken, tc.entity.VersionHistories, encoder)
if tc.expectError {
branchToken, branch, err := getBranchToken(
tc.entity.ExecutionInfo.BranchToken,
tc.entity.VersionHistories,
encoder,
)

if tc.wantErr {
require.Error(t, err)
require.Nil(t, branchToken)
require.Empty(t, branch.GetTreeID())
require.Empty(t, branch.GetBranchID())
} else {
require.NoError(t, err)
require.Equal(t, tc.branchToken, branchToken)
require.Equal(t, tc.treeID, branch.GetTreeID())
require.Equal(t, tc.branchID, branch.GetBranchID())
if diff := cmp.Diff(tc.wantHistoryBranch, branch); diff != "" {
t.Fatalf("HistoryBranch mismatch (-want +got):\n%s", diff)
}
require.Equal(t, tc.wantBranchToken, branchToken)
}
})
}
}

func getValidBranchToken(t *testing.T, encoder *codec.ThriftRWEncoder) []byte {
func mustGetValidBranchToken(t *testing.T, encoder *codec.ThriftRWEncoder, treeID, branchID string) []byte {
hb := &shared.HistoryBranch{
TreeID: common.StringPtr(testTreeID),
BranchID: common.StringPtr(testBranchID),
TreeID: common.StringPtr(treeID),
BranchID: common.StringPtr(branchID),
}
bytes, err := encoder.Encode(hb)
require.NoError(t, err)
if err != nil {
t.Fatalf("failed to encode branch token: %v", err)
}

return bytes
}

func concreteExecutionsToEntities(execs []*persistence.ListConcreteExecutionsEntity, shardID int, encoder *codec.ThriftRWEncoder) []pagination.Entity {
entities := make([]pagination.Entity, len(execs))
for i, e := range execs {
branchToken, branch, err := getBranchToken(e.ExecutionInfo.BranchToken, e.VersionHistories, encoder)
if err != nil {
return nil
}
concreteExec := &entity.ConcreteExecution{
BranchToken: branchToken,
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
Execution: entity.Execution{
ShardID: shardID,
DomainID: e.ExecutionInfo.DomainID,
WorkflowID: e.ExecutionInfo.WorkflowID,
RunID: e.ExecutionInfo.RunID,
State: e.ExecutionInfo.State,
},
}
entities[i] = concreteExec
}
return entities
}
Loading
Loading