Skip to content

Commit

Permalink
Remove WorkflowIDExternal/InternalRateLimitEnabled dynamic configs (c…
Browse files Browse the repository at this point in the history
…adence-workflow#6618)

* remove WorkflowIDExternalRateLimitEnabled and WorkflowIDInternalRateLimitEnabled

* run make pr
  • Loading branch information
arzonus authored Jan 15, 2025
1 parent 89dee7f commit d500a06
Show file tree
Hide file tree
Showing 24 changed files with 170 additions and 391 deletions.
24 changes: 0 additions & 24 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1793,18 +1793,6 @@ const (
// Default value: true
// Allowed filters: DomainName
EnableRecordWorkflowExecutionUninitialized
// WorkflowIDExternalRateLimitEnabled is the key to enable/disable rate limiting for workflowID specific information for external requests
// KeyName: history.workflowIDExternalRateLimitEnabled
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
WorkflowIDExternalRateLimitEnabled
// WorkflowIDInternalRateLimitEnabled is the key to enable/disable rate limiting for workflowID specific information for internal requests
// KeyName: history.workflowIDInternalRateLimitEnabled
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
WorkflowIDInternalRateLimitEnabled
// AllowArchivingIncompleteHistory will continue on when seeing some error like history mutated(usually caused by database consistency issues)
// KeyName: worker.AllowArchivingIncompleteHistory
// Value type: Bool
Expand Down Expand Up @@ -4379,18 +4367,6 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "Enable TaskValidation",
DefaultValue: false,
},
WorkflowIDExternalRateLimitEnabled: {
KeyName: "history.workflowIDExternalRateLimitEnabled",
Filters: []Filter{DomainName},
Description: "WorkflowIDExternalRateLimitEnabled is the key to enable/disable rate limiting of specific workflowIDs for external requests",
DefaultValue: false,
},
WorkflowIDInternalRateLimitEnabled: {
KeyName: "history.workflowIDInternalRateLimitEnabled",
Filters: []Filter{DomainName},
Description: "WorkflowIDInternalRateLimitEnabled is the key to enable/disable rate limiting of specific workflowIDs for internal requests",
DefaultValue: false,
},
EnableRetryForChecksumFailure: {
KeyName: "history.enableMutableStateChecksumFailureRetry",
Filters: []Filter{DomainName},
Expand Down
3 changes: 1 addition & 2 deletions host/workflowidratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func TestWorkflowIDRateLimitIntegrationSuite(t *testing.T) {

clusterConfig.TimeSource = clock.NewMockedTimeSource()
clusterConfig.HistoryDynamicConfigOverrides = map[dynamicconfig.Key]interface{}{
dynamicconfig.WorkflowIDExternalRPS: 5,
dynamicconfig.WorkflowIDExternalRateLimitEnabled: true,
dynamicconfig.WorkflowIDExternalRPS: 5,
}

testCluster := NewPersistenceTestCluster(t, clusterConfig)
Expand Down
3 changes: 1 addition & 2 deletions host/workflowsidinternalratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ func TestWorkflowIDInternalRateLimitIntegrationSuite(t *testing.T) {

clusterConfig.TimeSource = clock.NewMockedTimeSource()
clusterConfig.HistoryDynamicConfigOverrides = map[dynamicconfig.Key]interface{}{
dynamicconfig.WorkflowIDInternalRPS: 2,
dynamicconfig.WorkflowIDInternalRateLimitEnabled: true,
dynamicconfig.WorkflowIDInternalRPS: 2,
}

testCluster := NewPersistenceTestCluster(t, clusterConfig)
Expand Down
12 changes: 4 additions & 8 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,8 @@ type Config struct {
EnableRecordWorkflowExecutionUninitialized dynamicconfig.BoolPropertyFnWithDomainFilter

// The following are used by the history workflowID cache
WorkflowIDExternalRateLimitEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDInternalRateLimitEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDExternalRPS dynamicconfig.IntPropertyFnWithDomainFilter
WorkflowIDInternalRPS dynamicconfig.IntPropertyFnWithDomainFilter
WorkflowIDExternalRPS dynamicconfig.IntPropertyFnWithDomainFilter
WorkflowIDInternalRPS dynamicconfig.IntPropertyFnWithDomainFilter

// The following are used by consistent query
EnableConsistentQuery dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -513,10 +511,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
EnableReplicationTaskGeneration: dc.GetBoolPropertyFilteredByDomainIDAndWorkflowID(dynamicconfig.EnableReplicationTaskGeneration),
EnableRecordWorkflowExecutionUninitialized: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableRecordWorkflowExecutionUninitialized),

WorkflowIDExternalRateLimitEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.WorkflowIDExternalRateLimitEnabled),
WorkflowIDInternalRateLimitEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.WorkflowIDInternalRateLimitEnabled),
WorkflowIDExternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDExternalRPS),
WorkflowIDInternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDInternalRPS),
WorkflowIDExternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDExternalRPS),
WorkflowIDInternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDInternalRPS),

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain),
Expand Down
2 changes: 0 additions & 2 deletions service/history/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ func TestNewConfig(t *testing.T) {
"ReplicationTaskGenerationQPS": {dynamicconfig.ReplicationTaskGenerationQPS, 14.0},
"EnableReplicationTaskGeneration": {dynamicconfig.EnableReplicationTaskGeneration, true},
"EnableRecordWorkflowExecutionUninitialized": {dynamicconfig.EnableRecordWorkflowExecutionUninitialized, true},
"WorkflowIDExternalRateLimitEnabled": {dynamicconfig.WorkflowIDExternalRateLimitEnabled, true},
"WorkflowIDInternalRateLimitEnabled": {dynamicconfig.WorkflowIDInternalRateLimitEnabled, true},
"WorkflowIDExternalRPS": {dynamicconfig.WorkflowIDExternalRPS, 87},
"WorkflowIDInternalRPS": {dynamicconfig.WorkflowIDInternalRPS, 88},
"EnableConsistentQuery": {dynamicconfig.EnableConsistentQuery, true},
Expand Down
79 changes: 37 additions & 42 deletions service/history/engine/engineimpl/history_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/dynamicconfig"
ce "github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -82,42 +81,41 @@ var (
)

type historyEngineImpl struct {
currentClusterName string
shard shard.Context
timeSource clock.TimeSource
decisionHandler decision.Handler
clusterMetadata cluster.Metadata
historyV2Mgr persistence.HistoryManager
executionManager persistence.ExecutionManager
visibilityMgr persistence.VisibilityManager
txProcessor queue.Processor
timerProcessor queue.Processor
nDCReplicator ndc.HistoryReplicator
nDCActivityReplicator ndc.ActivityReplicator
historyEventNotifier events.Notifier
tokenSerializer common.TaskTokenSerializer
executionCache execution.Cache
metricsClient metrics.Client
logger log.Logger
throttledLogger log.Logger
config *config.Config
archivalClient warchiver.Client
workflowResetter reset.WorkflowResetter
queueTaskProcessor task.Processor
replicationTaskProcessors []replication.TaskProcessor
replicationAckManager replication.TaskAckManager
replicationTaskStore *replication.TaskStore
replicationHydrator replication.TaskHydrator
replicationMetricsEmitter *replication.MetricsEmitterImpl
publicClient workflowserviceclient.Interface
eventsReapplier ndc.EventsReapplier
matchingClient matching.Client
rawMatchingClient matching.Client
clientChecker client.VersionChecker
replicationDLQHandler replication.DLQHandler
failoverMarkerNotifier failover.MarkerNotifier
wfIDCache workflowcache.WFCache
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter
currentClusterName string
shard shard.Context
timeSource clock.TimeSource
decisionHandler decision.Handler
clusterMetadata cluster.Metadata
historyV2Mgr persistence.HistoryManager
executionManager persistence.ExecutionManager
visibilityMgr persistence.VisibilityManager
txProcessor queue.Processor
timerProcessor queue.Processor
nDCReplicator ndc.HistoryReplicator
nDCActivityReplicator ndc.ActivityReplicator
historyEventNotifier events.Notifier
tokenSerializer common.TaskTokenSerializer
executionCache execution.Cache
metricsClient metrics.Client
logger log.Logger
throttledLogger log.Logger
config *config.Config
archivalClient warchiver.Client
workflowResetter reset.WorkflowResetter
queueTaskProcessor task.Processor
replicationTaskProcessors []replication.TaskProcessor
replicationAckManager replication.TaskAckManager
replicationTaskStore *replication.TaskStore
replicationHydrator replication.TaskHydrator
replicationMetricsEmitter *replication.MetricsEmitterImpl
publicClient workflowserviceclient.Interface
eventsReapplier ndc.EventsReapplier
matchingClient matching.Client
rawMatchingClient matching.Client
clientChecker client.VersionChecker
replicationDLQHandler replication.DLQHandler
failoverMarkerNotifier failover.MarkerNotifier
wfIDCache workflowcache.WFCache

updateWithActionFn func(context.Context, execution.Cache, string, types.WorkflowExecution, bool, time.Time, func(wfContext execution.Context, mutableState execution.MutableState) error) error
}
Expand Down Expand Up @@ -146,7 +144,6 @@ func NewEngineWithShardContext(
queueTaskProcessor task.Processor,
failoverCoordinator failover.Coordinator,
wfIDCache workflowcache.WFCache,
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
queueProcessorFactory queue.ProcessorFactory,
) engine.Engine {
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
Expand Down Expand Up @@ -230,9 +227,8 @@ func NewEngineWithShardContext(
replicationTaskStore: replicationTaskStore,
replicationMetricsEmitter: replication.NewMetricsEmitter(
shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()),
wfIDCache: wfIDCache,
ratelimitInternalPerWorkflowID: ratelimitInternalPerWorkflowID,
updateWithActionFn: workflow.UpdateWithAction,
wfIDCache: wfIDCache,
updateWithActionFn: workflow.UpdateWithAction,
}
historyEngImpl.decisionHandler = decision.NewHandler(
shard,
Expand All @@ -255,7 +251,6 @@ func NewEngineWithShardContext(
historyEngImpl.archivalClient,
openExecutionCheck,
historyEngImpl.wfIDCache,
historyEngImpl.ratelimitInternalPerWorkflowID,
)

historyEngImpl.timerProcessor = queueProcessorFactory.NewTimerQueueProcessor(
Expand Down
6 changes: 1 addition & 5 deletions service/history/engine/testdata/engine_for_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/service/history/config"
Expand Down Expand Up @@ -65,7 +64,6 @@ type NewEngineFn func(
queueTaskProcessor task.Processor,
failoverCoordinator failover.Coordinator,
wfIDCache workflowcache.WFCache,
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
queueProcessorFactory queue.ProcessorFactory,
) engine.Engine

Expand Down Expand Up @@ -135,7 +133,6 @@ func NewEngineForTest(t *testing.T, newEngineFn NewEngineFn) *EngineForTest {

failoverCoordinator := failover.NewMockCoordinator(controller)
wfIDCache := workflowcache.NewMockWFCache(controller)
ratelimitInternalPerWorkflowID := dynamicconfig.GetBoolPropertyFnFilteredByDomain(false)

queueProcessorFactory := queue.NewMockProcessorFactory(controller)
timerQProcessor := queue.NewMockProcessor(controller)
Expand All @@ -152,7 +149,7 @@ func NewEngineForTest(t *testing.T, newEngineFn NewEngineFn) *EngineForTest {
transferQProcessor.EXPECT().NotifyNewTask(gomock.Any(), gomock.Any()).Return().AnyTimes()
transferQProcessor.EXPECT().Stop().Return().Times(1)
queueProcessorFactory.EXPECT().
NewTransferQueueProcessor(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
NewTransferQueueProcessor(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(transferQProcessor).
Times(1)

Expand All @@ -168,7 +165,6 @@ func NewEngineForTest(t *testing.T, newEngineFn NewEngineFn) *EngineForTest {
queueTaskProcessor,
failoverCoordinator,
wfIDCache,
ratelimitInternalPerWorkflowID,
queueProcessorFactory,
)

Expand Down
43 changes: 19 additions & 24 deletions service/history/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/membership"
Expand Down Expand Up @@ -70,20 +69,19 @@ type (
handlerImpl struct {
resource.Resource

shuttingDown int32
controller shard.Controller
tokenSerializer common.TaskTokenSerializer
startWG sync.WaitGroup
config *config.Config
historyEventNotifier events.Notifier
rateLimiter quotas.Limiter
replicationTaskFetchers replication.TaskFetchers
queueTaskProcessor task.Processor
failoverCoordinator failover.Coordinator
workflowIDCache workflowcache.WFCache
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter
queueProcessorFactory queue.ProcessorFactory
ratelimitAggregator algorithm.RequestWeighted
shuttingDown int32
controller shard.Controller
tokenSerializer common.TaskTokenSerializer
startWG sync.WaitGroup
config *config.Config
historyEventNotifier events.Notifier
rateLimiter quotas.Limiter
replicationTaskFetchers replication.TaskFetchers
queueTaskProcessor task.Processor
failoverCoordinator failover.Coordinator
workflowIDCache workflowcache.WFCache
queueProcessorFactory queue.ProcessorFactory
ratelimitAggregator algorithm.RequestWeighted
}
)

Expand All @@ -95,16 +93,14 @@ func NewHandler(
resource resource.Resource,
config *config.Config,
wfCache workflowcache.WFCache,
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
) Handler {
handler := &handlerImpl{
Resource: resource,
config: config,
tokenSerializer: common.NewJSONTaskTokenSerializer(),
rateLimiter: quotas.NewDynamicRateLimiter(config.RPS.AsFloat64()),
workflowIDCache: wfCache,
ratelimitInternalPerWorkflowID: ratelimitInternalPerWorkflowID,
ratelimitAggregator: resource.GetRatelimiterAlgorithm(),
Resource: resource,
config: config,
tokenSerializer: common.NewJSONTaskTokenSerializer(),
rateLimiter: quotas.NewDynamicRateLimiter(config.RPS.AsFloat64()),
workflowIDCache: wfCache,
ratelimitAggregator: resource.GetRatelimiterAlgorithm(),
}

// prevent us from trying to serve requests before shard controller is started and ready
Expand Down Expand Up @@ -215,7 +211,6 @@ func (h *handlerImpl) CreateEngine(
h.queueTaskProcessor,
h.failoverCoordinator,
h.workflowIDCache,
h.ratelimitInternalPerWorkflowID,
queue.NewProcessorFactory(),
)
}
Expand Down
3 changes: 1 addition & 2 deletions service/history/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ func (s *handlerSuite) SetupTest() {
s.mockEngine = engine.NewMockEngine(s.controller)
s.mockWFCache = workflowcache.NewMockWFCache(s.controller)
s.mockFailoverCoordinator = failover.NewMockCoordinator(s.controller)
internalRequestRateLimitingEnabledConfig := func(domainName string) bool { return false }
s.handler = NewHandler(s.mockResource, config.NewForTest(), s.mockWFCache, internalRequestRateLimitingEnabledConfig).(*handlerImpl)
s.handler = NewHandler(s.mockResource, config.NewForTest(), s.mockWFCache).(*handlerImpl)
s.handler.controller = s.mockShardController
s.mockTokenSerializer = common.NewMockTaskTokenSerializer(s.controller)
s.mockRatelimiter = quotas.NewMockLimiter(s.controller)
Expand Down
15 changes: 1 addition & 14 deletions service/history/queue/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package queue

import (
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/reconciliation/invariant"
"github.com/uber/cadence/service/history/engine"
"github.com/uber/cadence/service/history/execution"
Expand All @@ -35,17 +34,7 @@ import (
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination factory_mock.go -self_package github.com/uber/cadence/service/history/queue

type ProcessorFactory interface {
NewTransferQueueProcessor(
shard shard.Context,
historyEngine engine.Engine,
taskProcessor task.Processor,
executionCache execution.Cache,
workflowResetter reset.WorkflowResetter,
archivalClient archiver.Client,
executionCheck invariant.Invariant,
wfIDCache workflowcache.WFCache,
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
) Processor
NewTransferQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, wfIDCache workflowcache.WFCache) Processor

NewTimerQueueProcessor(
shard shard.Context,
Expand Down Expand Up @@ -73,7 +62,6 @@ func (f *factoryImpl) NewTransferQueueProcessor(
archivalClient archiver.Client,
executionCheck invariant.Invariant,
wfIDCache workflowcache.WFCache,
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
) Processor {
return NewTransferQueueProcessor(
shard,
Expand All @@ -84,7 +72,6 @@ func (f *factoryImpl) NewTransferQueueProcessor(
archivalClient,
executionCheck,
wfIDCache,
ratelimitInternalPerWorkflowID,
)
}

Expand Down
Loading

0 comments on commit d500a06

Please sign in to comment.