Skip to content

Commit

Permalink
refactor: use errors.Join to wrap multiple errors
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Dec 18, 2024
1 parent 31ba011 commit 8148626
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 29 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/hamba/avro/v2 v2.26.0
github.com/hashicorp/go-multierror v1.1.1
github.com/klauspost/compress v1.17.9
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.19.0
Expand Down Expand Up @@ -61,7 +60,6 @@ require (
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
Expand Down
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,6 @@ github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hamba/avro/v2 v2.26.0 h1:IaT5l6W3zh7K67sMrT2+RreJyDTllBGVJm4+Hedk9qE=
github.com/hamba/avro/v2 v2.26.0/go.mod h1:I8glyswHnpED3Nlx2ZdUe+4LJnCOOyiCzLMno9i/Uu0=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down
8 changes: 0 additions & 8 deletions pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"

proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/hashicorp/go-multierror"
)

// Result used to represent pulsar processing is an alias of type int.
Expand Down Expand Up @@ -255,10 +254,3 @@ func getErrorFromServerError(serverError *proto.ServerError) error {
return newError(UnknownError, serverError.String())
}
}

// joinErrors can join multiple errors into one error, and the returned error can be tested by errors.Is()
// we use github.com/hashicorp/go-multierror instead of errors.Join() of Go 1.20 so that we can compile pulsar
// go client with go versions that newer than go 1.13
func joinErrors(errs ...error) error {
return multierror.Append(nil, errs...)
}
2 changes: 1 addition & 1 deletion pulsar/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func Test_joinErrors(t *testing.T) {
err1 := errors.New("err1")
err2 := errors.New("err2")
err3 := errors.New("err3")
err := joinErrors(ErrInvalidMessage, err1, err2)
err := errors.Join(ErrInvalidMessage, err1, err2)
assert.True(t, errors.Is(err, ErrInvalidMessage))
assert.True(t, errors.Is(err, err1))
assert.True(t, errors.Is(err, err2))
Expand Down
26 changes: 13 additions & 13 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,25 +514,25 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
p.log.Warn("Topic not found, stop reconnecting, close the producer")
p.doClose(joinErrors(ErrTopicNotfound, err))
p.doClose(errors.Join(ErrTopicNotfound, err))
return struct{}{}, nil
}

if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
p.doClose(joinErrors(ErrTopicTerminated, err))
p.doClose(errors.Join(ErrTopicTerminated, err))
return struct{}{}, nil
}

if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
p.failPendingMessages(errors.Join(ErrProducerBlockedQuotaExceeded, err))
return struct{}{}, nil
}

if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
p.doClose(joinErrors(ErrProducerFenced, err))
p.doClose(errors.Join(ErrProducerFenced, err))
return struct{}{}, nil
}

Expand Down Expand Up @@ -1111,18 +1111,18 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,

func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
return joinErrors(ErrInvalidMessage, fmt.Errorf("message is nil"))
return errors.Join(ErrInvalidMessage, fmt.Errorf("message is nil"))
}

if msg.Value != nil && msg.Payload != nil {
return joinErrors(ErrInvalidMessage, fmt.Errorf("can not set Value and Payload both"))
return errors.Join(ErrInvalidMessage, fmt.Errorf("can not set Value and Payload both"))
}

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return joinErrors(ErrSchema, fmt.Errorf("msg schema can not match with producer schema"))
return errors.Join(ErrSchema, fmt.Errorf("msg schema can not match with producer schema"))
}
}

Expand All @@ -1138,16 +1138,16 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
if txn.state.Load() != int32(TxnOpen) {
p.log.WithField("state", txn.state.Load()).Error("Failed to send message" +
" by a non-open transaction.")
return joinErrors(ErrTransaction,
return errors.Join(ErrTransaction,
fmt.Errorf("failed to send message by a non-open transaction"))
}

if err := txn.registerProducerTopic(p.topic); err != nil {
return joinErrors(ErrTransaction, err)
return errors.Join(ErrTransaction, err)
}

if err := txn.registerSendOrAckOp(); err != nil {
return joinErrors(ErrTransaction, err)
return errors.Join(ErrTransaction, err)
}

sr.transaction = txn
Expand All @@ -1173,7 +1173,7 @@ func (p *partitionProducer) updateSchema(sr *sendRequest) error {
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
return joinErrors(ErrSchema, fmt.Errorf("get schema version fail, err: %w", err))
return errors.Join(ErrSchema, fmt.Errorf("get schema version fail, err: %w", err))
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
}
Expand All @@ -1190,15 +1190,15 @@ func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
if sr.msg.Value != nil {
if sr.schema == nil {
p.log.Errorf("Schema encode message failed %s", sr.msg.Value)
return joinErrors(ErrSchema, fmt.Errorf("set schema value without setting schema"))
return errors.Join(ErrSchema, fmt.Errorf("set schema value without setting schema"))
}

// payload and schema are mutually exclusive
// try to get payload from schema value only if payload is not set
schemaPayload, err := sr.schema.Encode(sr.msg.Value)
if err != nil {
p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value)
return joinErrors(ErrSchema, err)
return errors.Join(ErrSchema, err)
}

sr.uncompressedPayload = schemaPayload
Expand Down

0 comments on commit 8148626

Please sign in to comment.