Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Fixing bug from #23893, where a message could be received in the mess… #23929

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

### Bugs Fixed

- Receivers had a bug where a message could be received but not returned to the user. Callers would see that, occasionally, a message would not be returned from ReceiveMessages(), but would appear to have been received. Thanks to @patrickwhite256 for reporting this issue. (PR#23929)

### Other Changes

## 1.7.3 (2024-10-14)
Expand Down
8 changes: 4 additions & 4 deletions sdk/messaging/azservicebus/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.18
retract v1.1.2 // Breaks customers in situations where close is slow/infinite.

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0
github.com/Azure/go-amqp v1.3.0
Expand Down Expand Up @@ -34,9 +34,9 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
16 changes: 8 additions & 8 deletions sdk/messaging/azservicebus/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0 h1:JZg6HRh6W6U4OLl6lk7BZ7BLisIzM9dG1R50zUk9C/M=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0/go.mod h1:YL1xnZ6QejvQHWJrX/AvhFl4WW4rqHVoKspWNVwFk0M=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 h1:g0EZJwz7xkXQiZAI5xi9f3WWFYBlX1CPTrR+NDToRkQ=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0/go.mod h1:XCW7KnZet0Opnr7HccfUw1PLc4CjHqpcaxW8DHklNkQ=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 h1:B/dfvscEQtew9dVuoxqxrUKKv8Ih2f55PydknDamU+g=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0/go.mod h1:fiPSssYvltE08HJchL04dOy+RD4hgrjph0cwGGMntdI=
github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.0 h1:+m0M/LFxN43KvULkDNfdXOgrjtg6UYJPFBJyuEcRCAw=
Expand Down Expand Up @@ -41,14 +41,14 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -57,8 +57,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ func SendAndReceiveDrain(remainingArgs []string) {
// this is bad - it means we didn't get _any_ messages within an entire
// minute and might indicate that we're hitting the customer bug.

log.Printf("Exceeded the timeout, trying one more time real fast")
log.Printf("Exceeded the timeout, trying one more time real fast to see what the bug might be...")

// let's see if there is some other momentary issue happening here by doing a quick receive again.
ctx, cancel := context.WithTimeout(sc.Context, time.Minute)
defer cancel()
messages, err = receiver.ReceiveMessages(ctx, numToSend+100, nil)
sc.PanicOnError("Exceeded a minute while waiting for messages", err)
sc.Failf("Exceeded a minute while waiting for messages (got %d messages in second try). Error: %#v", len(messages), err)
}

log.Printf("Got %d messages, completing...", len(messages))
Expand Down
26 changes: 17 additions & 9 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Receiver struct {
receiving bool
retryOptions RetryOptions
settler *messageSettler
defaultReleaserTimeout time.Duration // defaults to 1min, settable for unit tests.
}

// ReceiverOptions contains options for the `Client.NewReceiverForQueue` or `Client.NewReceiverForSubscription`
Expand Down Expand Up @@ -133,6 +134,7 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err
lastPeekedSequenceNumber: 0,
maxAllowedCredits: defaultLinkRxBuffer,
retryOptions: args.retryOptions,
defaultReleaserTimeout: time.Minute,
}

receiver.cancelReleaser.Store(emptyCancelFn)
Expand Down Expand Up @@ -608,32 +610,38 @@ func (r *Receiver) newReleaserFunc(receiver amqpwrap.AMQPReceiver) func() {
}

ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
releaseLoopDone := make(chan struct{})
released := 0

// this func gets called when a new ReceiveMessages() starts
r.cancelReleaser.Store(func() string {
cancel()
<-done
<-releaseLoopDone
return receiver.LinkName()
})

return func() {
defer close(done)
defer close(releaseLoopDone)

for {
// we might not have all the messages we need here.
msg, err := receiver.Receive(ctx, nil)

if err == nil {
err = receiver.ReleaseMessage(ctx, msg)
}
releaseCtx, cancelRelease := context.WithTimeout(context.Background(), r.defaultReleaserTimeout)

if err == nil {
released++
// We don't use `ctx` here to avoid cancelling Release(), and leaving this message
// in limbo until it expires.
err = receiver.ReleaseMessage(releaseCtx, msg)
cancelRelease()

if err == nil {
released++
}
}

if internal.IsCancelError(err) {
// We check `ctx.Err()` here, instead of testing the returned err from .Receive(), because Receive()
// ignores cancellation if it has any messages in its prefetch queue.
if ctx.Err() != nil {
if released > 0 {
r.amqpLinks.Writef(exported.EventReceiver, "Message releaser pausing. Released %d messages", released)
}
Expand Down
128 changes: 128 additions & 0 deletions sdk/messaging/azservicebus/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"regexp"
"sort"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1043,6 +1044,133 @@ func TestReceiveWithDifferentWaitTime(t *testing.T) {
require.Greater(t, bigger, base2)
}

func TestReceiveCancelReleaser(t *testing.T) {
getLogs := test.CaptureLogsForTest(false)

client, cleanup, queueName := setupLiveTest(t, &liveTestOptions{
QueueProperties: &admin.QueueProperties{
// use a long lock time to really make it clear when we've accidentally
// orphaned a message.
LockDuration: to.Ptr("PT5M"),
},
})
defer cleanup()

receiver, err := client.NewReceiverForQueue(queueName, nil)
require.NoError(t, err)

sender, err := client.NewSender(queueName, nil)
require.NoError(t, err)

padding := make([]byte, 1)

var batch *MessageBatch
const numSent = 2000

t.Logf("Sending messages")
SendLoop:
for i := 0; i < numSent; i++ {
if batch == nil {
tmpBatch, err := sender.NewMessageBatch(context.Background(), nil)
require.NoError(t, err)
batch = tmpBatch
}

err := batch.AddMessage(&Message{
MessageID: to.Ptr(fmt.Sprintf("%d", i)),
Body: padding}, nil)

if errors.Is(err, ErrMessageTooLarge) {
err := sender.SendMessageBatch(context.Background(), batch, nil)
require.NoError(t, err)
batch = nil
i--
continue SendLoop
}

if i == numSent-1 {
err := sender.SendMessageBatch(context.Background(), batch, nil)
require.NoError(t, err)
break SendLoop
}
}

t.Logf("Receiving small subset of messages")
messages, err := receiver.ReceiveMessages(context.Background(), numSent, &ReceiveMessagesOptions{
// Receive with a high credit count, but too little time to actually get them all
// This will force us into a situation where the AMQP receiver will have a lot of messages
// in its prefetch cache, giving us a high chance of triggering a previous bug where early
// cancellation could result in us losing messages.
TimeAfterFirstMessage: time.Nanosecond,
})
require.NoError(t, err)

ids := &sync.Map{}

for _, msg := range messages {
require.NoError(t, receiver.CompleteMessage(context.Background(), msg, nil))
_, exists := ids.LoadOrStore(msg.MessageID, true)
require.False(t, exists)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

expected := numSent - len(messages) // remove any messages we've already received. Usually it's just one but it's not
remaining := expected

t.Logf("Receiving remaining messages (%d)", remaining)

for remaining > 0 {
messages, err := receiver.ReceiveMessages(ctx, remaining, nil)
require.NoError(t, err)
require.NotEmpty(t, messages)

t.Logf("Received %d messages", len(messages))

wg := sync.WaitGroup{}

for _, msg := range messages {
msg := msg
wg.Add(1)

go func() {
defer wg.Done()
require.NoError(t, receiver.CompleteMessage(context.Background(), msg, nil))
_, exists := ids.LoadOrStore(msg.MessageID, true)
require.False(t, exists)
}()
}

wg.Wait()

remaining -= len(messages)
}

count := 0
ids.Range(func(_, _ any) bool {
count++
return true
})

require.Equal(t, numSent, count)

logs := getLogs()

found := 0

for _, log := range logs {
if strings.Contains(log, "Message releaser pausing. Released ") {
found++
}
}

// This is a bit of a non-deterministic bit so I'm not going to fail the overall test
if found == 0 {
t.Logf("Failed to find our 'messages released' log entry: %#v", logs)
}
}

type receivedMessageSlice []*ReceivedMessage

func (messages receivedMessageSlice) Len() int {
Expand Down
Loading
Loading