Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration test for workflow ID based rate limiting task processing #5933

Merged
merged 11 commits into from
Apr 26, 2024
5 changes: 5 additions & 0 deletions host/test_suites.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,9 @@ type (
*require.Assertions
*IntegrationBase
}

WorkflowIDInternalRateLimitIntegrationSuite struct {
*require.Assertions
*IntegrationBase
}
)
232 changes: 232 additions & 0 deletions host/workflowsidinternalratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// The MIT License (MIT)
sankari165 marked this conversation as resolved.
Show resolved Hide resolved

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package host

import (
"bytes"
"encoding/binary"
"flag"
"strconv"
"testing"
"time"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
pt "github.com/uber/cadence/common/persistence/persistence-tests"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching"
)

func TestWorkflowIDInternalRateLimitIntegrationSuite(t *testing.T) {
// Loads the flags for persistence etc., if none are given they are set in ./flag.go
flag.Parse()

clusterConfig, err := GetTestClusterConfig("testdata/integration_wfidratelimit_cluster.yaml")
require.NoError(t, err)

clusterConfig.TimeSource = clock.NewMockedTimeSource()
clusterConfig.HistoryDynamicConfigOverrides = map[dynamicconfig.Key]interface{}{
sankari165 marked this conversation as resolved.
Show resolved Hide resolved
dynamicconfig.WorkflowIDCacheInternalEnabled: true,
dynamicconfig.WorkflowIDInternalRPS: 2,
dynamicconfig.WorkflowIDInternalRateLimitEnabled: true,
}

testCluster := NewPersistenceTestCluster(t, clusterConfig)

s := new(WorkflowIDInternalRateLimitIntegrationSuite)
params := IntegrationBaseParams{
DefaultTestCluster: testCluster,
VisibilityTestCluster: testCluster,
TestClusterConfig: clusterConfig,
}
s.IntegrationBase = NewIntegrationBase(params)
suite.Run(t, s)
}

func (s *WorkflowIDInternalRateLimitIntegrationSuite) SetupSuite() {
s.setupLogger()

s.Logger.Info("Running integration test against test cluster")
clusterMetadata := NewClusterMetadata(s.T(), s.testClusterConfig)
dc := persistence.DynamicConfiguration{
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
}
params := pt.TestBaseParams{
DefaultTestCluster: s.defaultTestCluster,
VisibilityTestCluster: s.visibilityTestCluster,
ClusterMetadata: clusterMetadata,
DynamicConfiguration: dc,
}
cluster, err := NewCluster(s.T(), s.testClusterConfig, s.Logger, params)
s.Require().NoError(err)
s.testCluster = cluster
s.engine = s.testCluster.GetFrontendClient()
s.adminClient = s.testCluster.GetAdminClient()

s.domainName = s.randomizeStr("integration-test-domain")
s.Require().NoError(s.registerDomain(s.domainName, 1, types.ArchivalStatusDisabled, "", types.ArchivalStatusDisabled, ""))

s.domainCacheRefresh()
}

func (s *WorkflowIDInternalRateLimitIntegrationSuite) SetupTest() {
s.Assertions = require.New(s.T())
}

func (s *WorkflowIDInternalRateLimitIntegrationSuite) TearDownSuite() {
s.tearDownSuite()
}

func (s *WorkflowIDInternalRateLimitIntegrationSuite) TestWorkflowIDSpecificInternalRateLimits() {
const (
testWorkflowID = "integration-workflow-specific-internal-rate-limit-test"
testWorkflowType = "integration-workflow-specific-internal-rate-limit-test-type"
testTaskListName = "integration-workflow-specific-internal-rate-limit-test-taskList"
testIdentity = "worker1"
)

activityName := "test-activity"

request := &types.StartWorkflowExecutionRequest{
RequestID: uuid.New(),
Domain: s.domainName,
WorkflowID: testWorkflowID,
WorkflowType: &types.WorkflowType{Name: testWorkflowType},
TaskList: &types.TaskList{Name: testTaskListName},
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: testIdentity,
WorkflowIDReusePolicy: types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(),
}

ctx, cancel := createContext()
defer cancel()

we, err := s.engine.StartWorkflowExecution(ctx, request)
s.NoError(err)

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunID))

workflowComplete := false
activityCount := int32(5)
activityCounter := int32(0)

dtHandler := func(execution *types.WorkflowExecution, wt *types.WorkflowType,
sankari165 marked this conversation as resolved.
Show resolved Hide resolved
previousStartedEventID, startedEventID int64, history *types.History) ([]byte, []*types.Decision, error) {
if activityCounter < activityCount {
activityCounter++
buf := new(bytes.Buffer)
s.Nil(binary.Write(buf, binary.LittleEndian, activityCounter))

return []byte(strconv.Itoa(int(activityCounter))), []*types.Decision{{
DecisionType: types.DecisionTypeScheduleActivityTask.Ptr(),
ScheduleActivityTaskDecisionAttributes: &types.ScheduleActivityTaskDecisionAttributes{
ActivityID: strconv.Itoa(int(activityCounter)),
ActivityType: &types.ActivityType{Name: activityName},
TaskList: &types.TaskList{Name: testTaskListName},
Input: buf.Bytes(),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
StartToCloseTimeoutSeconds: common.Int32Ptr(1),
HeartbeatTimeoutSeconds: common.Int32Ptr(1),
},
}}, nil
}

s.Logger.Info("Completing Workflow.")

workflowComplete = true
return []byte(strconv.Itoa(int(activityCounter))), []*types.Decision{{
DecisionType: types.DecisionTypeCompleteWorkflowExecution.Ptr(),
CompleteWorkflowExecutionDecisionAttributes: &types.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

activityExecutedCount := int32(0)
atHandler := func(execution *types.WorkflowExecution, activityType *types.ActivityType,
activityID string, input []byte, taskToken []byte) ([]byte, bool, error) {
s.Equal(testWorkflowID, execution.WorkflowID)
s.Equal(activityName, activityType.Name)

activityExecutedCount++
return []byte("Activity Result."), false, nil
}

poller := &TaskPoller{
Engine: s.engine,
Domain: s.domainName,
TaskList: &types.TaskList{Name: testTaskListName},
Identity: testIdentity,
DecisionHandler: dtHandler,
ActivityHandler: atHandler,
Logger: s.Logger,
T: s.T(),
}

for i := int32(0); i < activityCount; i++ {
_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)

err = poller.PollAndProcessActivityTask(false)
s.True(err == nil || err == matching.ErrNoTasks)
}

s.Logger.Info("Waiting for workflow to complete", tag.WorkflowRunID(we.RunID))

s.False(workflowComplete)
_, err = poller.PollAndProcessDecisionTask(false, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super familiar with these engine tests. Does this block here waiting for a reponse (for N seconds or whatever) until there's a task?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has a fixed 5 attempts and then says no tasks found

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

host/taskpoller.go

s.True(err == nil || err == matching.ErrNoTasks)
s.True(workflowComplete)

historyResponse, err := s.engine.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
Domain: s.domainName,
Execution: &types.WorkflowExecution{
WorkflowID: testWorkflowID,
},
})
s.NoError(err)
history := historyResponse.History
firstEvent := history.Events[0]
lastEvent := history.Events[len(history.Events)-1]
// First 7 event ids --> 0 (Workflow start), 1-3 ( Decision scheduled,started,completed) , 4-6 (Activity scheduled,started,completed) post which the tasks will be rate limited since RPS is 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not super clear to me how this test validates workflow id rate limits. how do you ensure these specific events are spread with 2 rps?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These events correspond to two tasks - one decision task and one activity task. The ratelimiter is at the task processing level and after processing 2 tasks, the rate limiter starts to return an error

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there's no externally visible error being returned, just a delay?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes there is no externally visible error since the rate limit error is being retried and delays the task processing

eventBeforeRatelimited := history.Events[7]

timeElapsedBeforeRatelimiting := time.Unix(0, common.Int64Default(eventBeforeRatelimited.Timestamp)).Sub(time.Unix(0, common.Int64Default(firstEvent.Timestamp)))
s.True(timeElapsedBeforeRatelimiting < 1*time.Second)

totalTime := time.Unix(0, common.Int64Default(lastEvent.Timestamp)).Sub(time.Unix(0, common.Int64Default(firstEvent.Timestamp)))
s.True(totalTime > 3*time.Second)

}
Loading