Skip to content

Commit

Permalink
fix direct error type assertion and a few mishandled cases with !ok i…
Browse files Browse the repository at this point in the history
…nstead of ok
  • Loading branch information
3vilhamster committed May 24, 2024
1 parent 9e76b6c commit 528e29c
Show file tree
Hide file tree
Showing 32 changed files with 92 additions and 81 deletions.
2 changes: 1 addition & 1 deletion bench/lib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (client CadenceClient) CreateDomain(name string, desc string, owner string)
defer cancel()
err := client.Register(ctx, req)
if err != nil {
if errors.As(err, new(*shared.DomainAlreadyExistsError)) {
if !errors.As(err, new(*shared.DomainAlreadyExistsError)) {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion canary/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (c *canaryImpl) startCronWorkflow() {
if err != nil {
// TODO: improvement: compare the cron schedule to decide whether or not terminating the current one
// https://github.com/uber/cadence/issues/4469
if errors.As(err, new(*shared.WorkflowExecutionAlreadyStartedError)) {
if !errors.As(err, new(*shared.WorkflowExecutionAlreadyStartedError)) {
c.runtime.logger.Error("error starting cron workflow", zap.Error(err))
} else {
c.runtime.logger.Info("cron workflow already started, you may need to terminate and restart if cron schedule is changed...")
Expand Down
2 changes: 1 addition & 1 deletion canary/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (client *cadenceClient) createDomain(name string, desc string, owner string
}
err := client.Register(context.Background(), req)
if err != nil {
if errors.As(err, new(*shared.DomainAlreadyExistsError)) {
if !errors.As(err, new(*shared.DomainAlreadyExistsError)) {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion canary/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func timeoutWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error

activityErr := activityFuture.Get(ctx, nil)
if activityErr != nil {
if errors.As(err, new(*workflow.TimeoutError)) {
if !errors.As(err, new(*workflow.TimeoutError)) {
workflow.GetLogger(ctx).Info("activity timeout failed", zap.Error(activityErr))
} else {
activityErr = nil
Expand Down
9 changes: 5 additions & 4 deletions common/persistence/sql/sqlplugin/postgres/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package postgres
import (
"context"
"database/sql"
"errors"
"time"

"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -56,8 +57,8 @@ const ErrInsufficientResources = "53000"
const ErrTooManyConnections = "53300"

func (pdb *db) IsDupEntryError(err error) bool {
sqlErr, ok := err.(*pq.Error)
return ok && sqlErr.Code == ErrDupEntry
var sqlErr *pq.Error
return errors.As(err, &sqlErr) && sqlErr.Code == ErrDupEntry
}

func (pdb *db) IsNotFoundError(err error) bool {
Expand All @@ -69,8 +70,8 @@ func (pdb *db) IsTimeoutError(err error) bool {
}

func (pdb *db) IsThrottlingError(err error) bool {
sqlErr, ok := err.(*pq.Error)
if ok {
var sqlErr *pq.Error
if errors.As(err, &sqlErr) {
if sqlErr.Code == ErrTooManyConnections ||
sqlErr.Code == ErrInsufficientResources {
return true
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/admin/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ func (adh *adminHandlerImpl) GetWorkflowExecutionRawHistoryV2(
DomainName: request.GetDomain(),
})
if err != nil {
if _, ok := err.(*types.EntityNotExistsError); ok {
if errors.As(err, new(*types.EntityNotExistsError)) {
// when no events can be returned from DB, DB layer will return
// EntityNotExistsError, this API shall return empty response
return &types.GetWorkflowExecutionRawHistoryV2Response{
Expand Down
3 changes: 2 additions & 1 deletion service/history/decision/task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package decision

import (
"context"
"errors"
"fmt"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -1074,7 +1075,7 @@ func (handler *taskHandlerImpl) validateDecisionAttr(
) error {

if err := validationFn(); err != nil {
if _, ok := err.(*types.BadRequestError); ok {
if errors.As(err, new(*types.BadRequestError)) {
return handler.handlerFailDecision(failedCause, err.Error())
}
return err
Expand Down
12 changes: 3 additions & 9 deletions service/history/engine/engineimpl/history_engine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,9 +1225,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() {
RequestID: "newRequestID",
},
})
if _, ok := err.(*types.WorkflowExecutionAlreadyStartedError); !ok {
s.Fail("return err is not *types.WorkflowExecutionAlreadyStartedError")
}
s.ErrorAs(err, new(*types.WorkflowExecutionAlreadyStartedError))
s.Nil(resp)
}

Expand Down Expand Up @@ -1425,9 +1423,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() {
})

if expectedErrs[index] {
if _, ok := err.(*types.WorkflowExecutionAlreadyStartedError); !ok {
s.Fail("return err is not *types.WorkflowExecutionAlreadyStartedError")
}
s.ErrorAs(err, new(*types.WorkflowExecutionAlreadyStartedError))
s.Nil(resp)
} else {
s.Nil(err)
Expand Down Expand Up @@ -1515,9 +1511,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() {
})

if expectedErrs[j] {
if _, ok := err.(*types.WorkflowExecutionAlreadyStartedError); !ok {
s.Fail("return err is not *types.WorkflowExecutionAlreadyStartedError")
}
s.ErrorAs(err, new(*types.WorkflowExecutionAlreadyStartedError))
s.Nil(resp)
} else {
s.Nil(err)
Expand Down
4 changes: 2 additions & 2 deletions service/history/engine/engineimpl/poll_mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package engineimpl
import (
"bytes"
"context"
"errors"
"time"

"github.com/uber/cadence/common"
Expand Down Expand Up @@ -120,8 +121,7 @@ func (e *historyEngineImpl) getMutableState(
}

func (e *historyEngineImpl) updateEntityNotExistsErrorOnPassiveCluster(err error, domainID string) error {
switch err.(type) {
case *types.EntityNotExistsError:
if errors.As(err, new(*types.EntityNotExistsError)) {
domainEntry, domainCacheErr := e.shard.GetDomainCache().GetDomainByID(domainID)
if domainCacheErr != nil {
return err // if could not access domain cache simply return original error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package engineimpl

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -232,7 +233,8 @@ func (e *historyEngineImpl) startWorkflowHelper(
return nil, t
}
// handle already started error
if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {
var t *persistence.WorkflowExecutionAlreadyStartedError
if errors.As(err, &t) {

if t.StartRequestID == request.GetRequestID() {
return &types.StartWorkflowExecutionResponse{
Expand Down
4 changes: 2 additions & 2 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ func createWorkflowExecutionWithRetry(
return err
}
isRetryable := func(err error) bool {
if _, ok := err.(*persistence.TimeoutError); ok {
if errors.As(err, new(*persistence.TimeoutError)) {
// TODO: is timeout error retryable for create workflow?
// if we treat it as retryable, user may receive workflowAlreadyRunning error
// on the first start workflow execution request.
Expand Down Expand Up @@ -1239,7 +1239,7 @@ func updateWorkflowExecutionWithRetry(
// checker, _ := taskvalidator.NewWfChecker(zapLogger, metricsClient, domainCache, executionManager, historymanager)

isRetryable := func(err error) bool {
if _, ok := err.(*persistence.TimeoutError); ok {
if errors.As(err, new(*persistence.TimeoutError)) {
// timeout error is not retryable for update workflow execution
return false
}
Expand Down
3 changes: 2 additions & 1 deletion service/history/ndc/activity_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package ndc

import (
ctx "context"
"errors"
"time"

"github.com/uber/cadence/common"
Expand Down Expand Up @@ -100,7 +101,7 @@ func (r *activityReplicatorImpl) SyncActivity(

mutableState, err := context.LoadWorkflowExecution(ctx)
if err != nil {
if _, ok := err.(*types.EntityNotExistsError); !ok {
if !errors.As(err, new(*types.EntityNotExistsError)) {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ func (s *workflowResetterSuite) TestResetWorkflow_Error() {
s.IsType(&types.RetryTaskV2Error{}, err)
s.Nil(rebuiltMutableState)

retryErr, isRetryError := err.(*types.RetryTaskV2Error)
s.True(isRetryError)
var retryErr *types.RetryTaskV2Error
s.ErrorAs(err, &retryErr)
expectedErr := &types.RetryTaskV2Error{
Message: resendOnResetWorkflowMessage,
DomainID: s.domainID,
Expand Down
6 changes: 3 additions & 3 deletions service/history/queue/task_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (t *taskAllocatorImpl) VerifyActiveTask(taskDomainID string, task interface
if err != nil {
// it is possible that the domain is deleted
// we should treat that domain as active
if _, ok := err.(*types.EntityNotExistsError); !ok {
if !errors.As(err, new(*types.EntityNotExistsError)) {
t.logger.Warn("Cannot find domain", tag.WorkflowDomainID(taskDomainID))
return false, err
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func (t *taskAllocatorImpl) VerifyFailoverActiveTask(targetDomainIDs map[string]
if err != nil {
// it is possible that the domain is deleted
// we should treat that domain as not active
if _, ok := err.(*types.EntityNotExistsError); !ok {
if !errors.As(err, new(*types.EntityNotExistsError)) {
t.logger.Warn("Cannot find domain", tag.WorkflowDomainID(taskDomainID))
return false, err
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func (t *taskAllocatorImpl) VerifyStandbyTask(standbyCluster string, taskDomainI
if err != nil {
// it is possible that the domain is deleted
// we should treat that domain as not active
if _, ok := err.(*types.EntityNotExistsError); !ok {
if !errors.As(err, new(*types.EntityNotExistsError)) {
t.logger.Warn("Cannot find domain", tag.WorkflowDomainID(taskDomainID))
return false, err
}
Expand Down
4 changes: 3 additions & 1 deletion service/history/queue/timer_queue_standby_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package queue

import (
"errors"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -62,7 +64,7 @@ func newTimerQueueStandbyProcessor(
return true, nil
}
} else {
if _, ok := err.(*types.EntityNotExistsError); !ok {
if !errors.As(err, new(*types.EntityNotExistsError)) {
// retry the task if failed to find the domain
logger.Warn("Cannot find domain", tag.WorkflowDomainID(timer.DomainID))
return false, err
Expand Down
2 changes: 1 addition & 1 deletion service/history/queue/transfer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func newTransferQueueStandbyProcessor(
return true, nil
}
} else {
if _, ok := err.(*types.EntityNotExistsError); !ok {
if !errors.As(err, new(*types.EntityNotExistsError)) {
// retry the task if failed to find the domain
logger.Warn("Cannot find domain", tag.WorkflowDomainID(task.DomainID))
return false, err
Expand Down
8 changes: 6 additions & 2 deletions service/history/replication/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package replication

import (
"context"
"errors"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
Expand Down Expand Up @@ -306,6 +307,9 @@ func (e *taskExecutorImpl) filterTask(
}

func toRetryTaskV2Error(err error) (*types.RetryTaskV2Error, bool) {
retError, ok := err.(*types.RetryTaskV2Error)
return retError, ok
var retError *types.RetryTaskV2Error
if errors.As(err, &retError) {
return retError, true
}
return nil, false
}
5 changes: 3 additions & 2 deletions service/history/replication/task_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package replication

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -235,7 +236,7 @@ func (f *taskFetcherImpl) fetchTasks() {
// When timer fires, we collect all the requests we have so far and attempt to send them to remote.
err := f.fetchAndDistributeTasksFn(requestByShard)
if err != nil {
if _, ok := err.(*types.ServiceBusyError); ok {
if errors.As(err, new(*types.ServiceBusyError)) {
// slow down replication when source cluster is busy
timer.Reset(f.config.ReplicationTaskFetcherServiceBusyWait())
} else {
Expand Down Expand Up @@ -265,7 +266,7 @@ func (f *taskFetcherImpl) fetchAndDistributeTasks(requestByShard map[int32]*requ

messagesByShard, err := f.getMessages(requestByShard)
if err != nil {
if _, ok := err.(*types.ServiceBusyError); !ok {
if !errors.As(err, new(*types.ServiceBusyError)) {
f.logger.Error("Failed to get replication tasks", tag.Error(err))
} else {
f.logger.Debug("Failed to get replication tasks because service busy")
Expand Down
7 changes: 3 additions & 4 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ func (s *contextImpl) persistShardInfoLocked(

if err != nil {
// Shard is stolen, trigger history engine shutdown
if _, ok := err.(*persistence.ShardOwnershipLostError); ok {
if errors.As(err, new(*persistence.ShardOwnershipLostError)) {
s.logger.Warn(
"Closing shard: updateShardInfoLocked failed due to stolen shard.",
tag.Error(err),
Expand Down Expand Up @@ -1515,8 +1515,7 @@ func acquireShard(
if persistence.IsTransientError(err) {
return true
}
_, ok := err.(*persistence.ShardAlreadyExistError)
return ok
return errors.As(err, new(*persistence.ShardAlreadyExistError))
}

getShard := func() error {
Expand All @@ -1527,7 +1526,7 @@ func acquireShard(
shardInfo = resp.ShardInfo
return nil
}
if _, ok := err.(*types.EntityNotExistsError); !ok {
if !errors.As(err, new(*types.EntityNotExistsError)) {
return err
}

Expand Down
10 changes: 5 additions & 5 deletions service/history/task/cross_cluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,26 +374,26 @@ func (t *crossClusterSourceTask) HandleErr(

logEvent(t.eventLogger, "Handling task processing error", err)

if _, ok := err.(*types.EntityNotExistsError); ok {
if errors.As(err, new(*types.EntityNotExistsError)) {
return nil
}
if _, ok := err.(*types.WorkflowExecutionAlreadyCompletedError); ok {
if errors.As(err, new(*types.WorkflowExecutionAlreadyCompletedError)) {
return nil
}

if err == errWorkflowBusy {
if errors.Is(err, errWorkflowBusy) {
t.scope.IncCounter(metrics.TaskWorkflowBusyPerDomain)
return err
}
if err == ErrTaskPendingActive {
if errors.Is(err, ErrTaskPendingActive) {
t.scope.IncCounter(metrics.TaskPendingActiveCounterPerDomain)
return err
}
// return domain not active error here so that the cross-cluster task can be
// convert to a (passive) transfer task

t.scope.IncCounter(metrics.TaskFailuresPerDomain)
if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
if errors.As(err, new(*persistence.CurrentWorkflowConditionFailedError)) {
t.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion service/history/task/priority_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package task

import (
"errors"
"sync"

"github.com/uber/cadence/common"
Expand Down Expand Up @@ -134,7 +135,7 @@ func (a *priorityAssignerImpl) Assign(queueTask Task) error {
func (a *priorityAssignerImpl) getDomainInfo(domainID string) (string, bool, error) {
domainEntry, err := a.domainCache.GetDomainByID(domainID)
if err != nil {
if _, ok := err.(*types.EntityNotExistsError); !ok {
if !errors.As(err, new(*types.EntityNotExistsError)) {
a.logger.Warn("Cannot find domain", tag.WorkflowDomainID(domainID))
return "", false, err
}
Expand Down
Loading

0 comments on commit 528e29c

Please sign in to comment.