Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce support for wrapped errors in the codebase #6041

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion bench/lib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package lib

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -64,7 +65,7 @@ func (client CadenceClient) CreateDomain(name string, desc string, owner string)
defer cancel()
err := client.Register(ctx, req)
if err != nil {
if _, ok := err.(*shared.DomainAlreadyExistsError); !ok {
if !errors.As(err, new(*shared.DomainAlreadyExistsError)) {
return err
}
}
Expand Down
3 changes: 2 additions & 1 deletion bench/lib/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package lib

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -147,7 +148,7 @@ func recordWorkflowEnd(
return err
}
scope.Counter(FailedCount).Inc(1)
if _, ok := err.(*workflow.TimeoutError); ok {
if errors.As(err, new(*workflow.TimeoutError)) {
scope.Counter(errTimeoutCount).Inc(1)
}
return err
Expand Down
12 changes: 5 additions & 7 deletions bench/load/common/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package common

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -51,25 +52,22 @@ func GetActivityServiceConfig(ctx context.Context) *lib.RuntimeContext {

// IsServiceBusyError returns if the err is a ServiceBusyError
func IsServiceBusyError(err error) bool {
_, ok := err.(*shared.ServiceBusyError)
return ok
return errors.As(err, new(*shared.ServiceBusyError))
}

// IsCancellationAlreadyRequestedError returns if the err is a CancellationAlreadyRequestedError
func IsCancellationAlreadyRequestedError(err error) bool {
_, ok := err.(*shared.CancellationAlreadyRequestedError)
return ok
return errors.As(err, new(*shared.CancellationAlreadyRequestedError))
}

// IsEntityNotExistsError returns if the err is a EntityNotExistsError
func IsEntityNotExistsError(err error) bool {
_, ok := err.(*shared.EntityNotExistsError)
return ok
return errors.As(err, new(*shared.EntityNotExistsError))
}

// IsNonRetryableError return true if the err is considered non-retryable
func IsNonRetryableError(err error) bool {
if err == context.DeadlineExceeded || err == context.Canceled {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return true
}

Expand Down
4 changes: 3 additions & 1 deletion bench/load/cron/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package cron

import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
Expand Down Expand Up @@ -286,7 +287,8 @@ func launcherWorkflow(
} else if err := childFuture.Get(childCtx, nil); err != nil {
result.TestStatus = testStatusFailed
result.Details = err.Error()
if customErr, ok := err.(*cadence.CustomError); ok {
var customErr *cadence.CustomError
if errors.As(err, &customErr) {
var detailStr string
if err := customErr.Details(&detailStr); err == nil {
result.Details += ": " + detailStr
Expand Down
3 changes: 2 additions & 1 deletion canary/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package canary

import (
"context"
"errors"
"fmt"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -158,7 +159,7 @@ func (c *canaryImpl) startCronWorkflow() {
if err != nil {
// TODO: improvement: compare the cron schedule to decide whether or not terminating the current one
// https://github.com/uber/cadence/issues/4469
if _, ok := err.(*shared.WorkflowExecutionAlreadyStartedError); !ok {
if !errors.As(err, new(*shared.WorkflowExecutionAlreadyStartedError)) {
c.runtime.logger.Error("error starting cron workflow", zap.Error(err))
} else {
c.runtime.logger.Info("cron workflow already started, you may need to terminate and restart if cron schedule is changed...")
Expand Down
8 changes: 4 additions & 4 deletions canary/cancellation.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func cancellationWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) e
workflow.GetLogger(ctx).Info(msg)
return profile.end(errors.New(msg))
}
if _, ok := err.(*cadence.CanceledError); !ok {
if !errors.As(err, new(*cadence.CanceledError)) {
workflow.GetLogger(ctx).Info("cancellationWorkflow failed: child workflow return non CanceledError", zap.Error(err))
return profile.end(err)
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func cancellationWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) e
workflow.GetLogger(ctx).Info(msg)
return errors.New(msg)
}
if _, ok := err.(*cadence.CanceledError); !ok {
if !errors.As(err, new(*cadence.CanceledError)) {
workflow.GetLogger(ctx).Info("cancellationWorkflow failed: child workflow return non CanceledError", zap.Error(err))
return err
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func cancellationWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) e
workflow.GetLogger(ctx).Info(msg)
return errors.New(msg)
}
if _, ok := err.(*cadence.CanceledError); !ok {
if !errors.As(err, new(*cadence.CanceledError)) {
workflow.GetLogger(ctx).Info("cancellationWorkflow failed: child workflow return non CanceledError", zap.Error(err))
return err
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func cancellationActivity(ctx context.Context, scheduledTimeNanos int64) error {
if err == nil {
return errors.New("cancellationWorkflow failed: non child workflow not cancelled")
}
if _, ok := err.(*cadence.CanceledError); !ok {
if !errors.As(err, new(*cadence.CanceledError)) {
return err
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion canary/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package canary

import (
"context"
"errors"
"time"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -74,7 +75,7 @@ func (client *cadenceClient) createDomain(name string, desc string, owner string
}
err := client.Register(context.Background(), req)
if err != nil {
if _, ok := err.(*shared.DomainAlreadyExistsError); !ok {
if !errors.As(err, new(*shared.DomainAlreadyExistsError)) {
return err
}
}
Expand Down
10 changes: 5 additions & 5 deletions canary/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package canary

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -122,10 +123,10 @@ func startJob(
wf, err := cadenceClient.StartWorkflow(ctx, opts, jobName, time.Now().UnixNano(), domain)
if err != nil {
scope.Counter(startWorkflowFailureCount).Inc(1)
switch err.(type) {
case *shared.WorkflowExecutionAlreadyStartedError:
switch {
case errors.As(err, new(*shared.WorkflowExecutionAlreadyStartedError)):
scope.Counter(startWorkflowAlreadyStartedCount).Inc(1)
case *shared.DomainNotActiveError:
case errors.As(err, new(*shared.DomainNotActiveError)):
scope.Counter(startWorkflowDomainNotActiveCount).Inc(1)
}
return nil, err
Expand All @@ -135,6 +136,5 @@ func startJob(
}

func isDomainNotActiveErr(err error) bool {
_, ok := err.(*shared.DomainNotActiveError)
return ok
return errors.As(err, new(*shared.DomainNotActiveError))
}
3 changes: 2 additions & 1 deletion canary/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package canary

import (
"errors"
"time"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -99,7 +100,7 @@ func recordWorkflowEnd(scope tally.Scope, elapsed time.Duration, err error) erro
return err
}
scope.Counter(failedCount).Inc(1)
if _, ok := err.(*workflow.TimeoutError); ok {
if errors.As(err, new(*workflow.TimeoutError)) {
scope.Counter(errTimeoutCount).Inc(1)
}
return err
Expand Down
3 changes: 2 additions & 1 deletion canary/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package canary

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -61,7 +62,7 @@ func timeoutWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error

activityErr := activityFuture.Get(ctx, nil)
if activityErr != nil {
if _, ok := activityErr.(*workflow.TimeoutError); !ok {
if !errors.As(activityErr, new(*workflow.TimeoutError)) {
workflow.GetLogger(ctx).Info("activity timeout failed", zap.Error(activityErr))
} else {
activityErr = nil
Expand Down
3 changes: 1 addition & 2 deletions canary/visibilityArchival.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,5 @@ func getURIScheme(URI string) string {
func isBadRequestError(
err error,
) bool {
_, ok := err.(*shared.BadRequestError)
return ok
return errors.As(err, new(*shared.BadRequestError))
}
6 changes: 4 additions & 2 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package history

import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
Expand Down Expand Up @@ -756,7 +757,7 @@ func (c *clientImpl) GetReplicationMessages(
tag.ShardReplicationToken(req),
)
// Returns service busy error to notify replication
if _, ok := err.(*types.ServiceBusyError); ok {
if errors.As(err, new(*types.ServiceBusyError)) {
return err
}
return nil
Expand Down Expand Up @@ -1113,7 +1114,8 @@ redirectLoop:
}
err = op(ctx, peer)
if err != nil {
if s, ok := err.(*types.ShardOwnershipLostError); ok {
var s *types.ShardOwnershipLostError
if errors.As(err, &s) {
// TODO: consider emitting a metric for number of redirects
peer, err = c.peerResolver.FromHostAddress(s.GetOwner())
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions common/archiver/historyIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (i *historyIterator) readHistoryBatches(ctx context.Context, firstEventID i
newIterState := historyIteratorState{}
for size < targetSize {
currHistoryBatches, err := i.readHistory(ctx, firstEventID)
if _, ok := err.(*types.EntityNotExistsError); ok && firstEventID != common.FirstEventID {
if errors.As(err, new(*types.EntityNotExistsError)) && firstEventID != common.FirstEventID {
newIterState.FinishedIteration = true
return historyBatches, newIterState, nil
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func (i *historyIterator) readHistoryBatches(ctx context.Context, firstEventID i
// If you are here, it means the target size is met after adding the last batch of read history.
// We need to check if there's more history batches.
_, err := i.readHistory(ctx, firstEventID)
if _, ok := err.(*types.EntityNotExistsError); ok && firstEventID != common.FirstEventID {
if errors.As(err, new(*types.EntityNotExistsError)) && firstEventID != common.FirstEventID {
newIterState.FinishedIteration = true
return historyBatches, newIterState, nil
}
Expand Down
30 changes: 18 additions & 12 deletions common/archiver/s3store/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ func (h *historyArchiver) Get(
if isRetryableError(err) {
return nil, &types.InternalServiceError{Message: err.Error()}
}
switch err.(type) {
case *types.BadRequestError, *types.InternalServiceError, *types.EntityNotExistsError:
switch {
case errors.As(err, new(*types.BadRequestError)), errors.As(err, new(*types.InternalServiceError)), errors.As(err, new(*types.EntityNotExistsError)):
return nil, err
default:
return nil, &types.InternalServiceError{Message: err.Error()}
Expand Down Expand Up @@ -389,7 +389,8 @@ func (h *historyArchiver) getHighestVersion(ctx context.Context, URI archiver.UR
Delimiter: aws.String("/"),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == s3.ErrCodeNoSuchBucket {
var aerr awserr.Error
if errors.As(err, &aerr) && aerr.Code() == s3.ErrCodeNoSuchBucket {
return nil, &types.BadRequestError{Message: errBucketNotExists.Error()}
}
return nil, err
Expand All @@ -416,22 +417,27 @@ func isRetryableError(err error) bool {
if err == nil {
return false
}
if aerr, ok := err.(awserr.Error); ok {
var aerr awserr.Error
if errors.As(err, &aerr) {
return isStatusCodeRetryable(aerr) || request.IsErrorRetryable(aerr) || request.IsErrorThrottle(aerr)
}
return false
}

func isStatusCodeRetryable(err error) bool {
if aerr, ok := err.(awserr.Error); ok {
if rerr, ok := err.(awserr.RequestFailure); ok {
if rerr.StatusCode() == 429 {
return true
}
if rerr.StatusCode() >= 500 && rerr.StatusCode() != 501 {
return true
}
var (
aerr awserr.Error
rerr awserr.RequestFailure
)
if errors.As(err, &rerr) {
if rerr.StatusCode() == 429 {
return true
}
if rerr.StatusCode() >= 500 && rerr.StatusCode() != 501 {
return true
}
}
if errors.As(err, &aerr) {
return isStatusCodeRetryable(aerr.OrigErr())
}
return false
Expand Down
11 changes: 7 additions & 4 deletions common/archiver/s3store/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"strings"
Expand Down Expand Up @@ -127,8 +128,8 @@ func keyExists(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key s
}

func isNotFoundError(err error) bool {
aerr, ok := err.(awserr.Error)
return ok && (aerr.Code() == "NotFound")
var aerr awserr.Error
return errors.As(err, &aerr) && (aerr.Code() == "NotFound")
}

// Key construction
Expand Down Expand Up @@ -191,7 +192,8 @@ func upload(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key stri
Body: bytes.NewReader(data),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
var aerr awserr.Error
if errors.As(err, &aerr) {
if aerr.Code() == s3.ErrCodeNoSuchBucket {
return &types.BadRequestError{Message: errBucketNotExists.Error()}
}
Expand All @@ -210,7 +212,8 @@ func download(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key st
})

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
var aerr awserr.Error
if errors.As(err, &aerr) {
if aerr.Code() == s3.ErrCodeNoSuchBucket {
return nil, &types.BadRequestError{Message: errBucketNotExists.Error()}
}
Expand Down
4 changes: 2 additions & 2 deletions common/backoff/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package backoff

import (
"context"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -117,8 +118,7 @@ func NewThrottleRetry(opts ...ThrottleRetryOption) *ThrottleRetry {
},
throttlePolicy: throttlePolicy,
isThrottle: func(err error) bool {
_, ok := err.(*types.ServiceBusyError)
return ok
return errors.As(err, new(*types.ServiceBusyError))
},
clock: SystemClock,
}
Expand Down
Loading
Loading