-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Service Bus] Fix credit management based on the buffer capacity for the receivers #14060
Changes from 53 commits
0e7de1c
dd940e6
76eb13e
a7191fe
e0ca3f5
bf789d0
fd9d20a
ecca8b0
b19575a
9a26c5e
fddbd28
a209104
144b904
e7d7a82
67bd418
26a30e1
ceedf58
d1c16ad
8f25785
fe6c0c2
a4b867b
ad2ef9b
f36fc9e
420af6d
eebb4fe
68b1ee7
1ac91f3
aa6b359
478a7ae
629de9a
9f2fd9b
db35c49
6a5fd66
55138fa
418a4f8
38a5fff
1d0203b
8324a30
a38830c
8090238
814765e
cb3975c
88bb497
99841cc
b782d56
2c048b3
0fc6f9b
fddc414
00f6b4b
93f6a97
7293fa4
6188a4e
d7e914a
65630c8
42e1b8f
ffd2b79
eea0cff
8994c2d
4bbbfbf
b72b80d
d67a552
a4c93f5
ae3109d
4a1b205
b692c80
25d465b
906f9f7
7192d04
f385a24
a466bbd
5fb5562
b63d88a
9634b74
f30bba5
e53e93d
270573d
741ada6
a91db19
1298c16
1711ded
7820768
ca98f09
50f1217
6cbebff
9bf59cb
d0ba78e
dbd1097
c25ef93
11b8206
8bbc2d9
d366486
c6ca65b
34fdc32
1f75e73
4415097
9416b33
3f3def6
97b5c8b
e1bc672
f66789e
71780be
516d270
bb07f09
db05be0
3969549
69be170
1d6a152
e71dfdd
d0c9426
a2f75c5
0e9091c
7647e3d
27e0480
eebffb3
7cad41d
716f7b1
931b9fd
b551bf8
7c9136c
f1332bf
4d26b7f
fce9703
afc9760
2d5800d
02710aa
fd07e8c
5295fe0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -370,7 +370,11 @@ export type ServiceBusErrorCode = | |
/** | ||
* The user doesn't have access to the entity. | ||
*/ | ||
| "UnauthorizedAccess"; | ||
| "UnauthorizedAccess" | ||
/** | ||
* The number of unsettled messages that can be held is reached, please settle the received messages. | ||
*/ | ||
| "UnsettledMessagesLimitExceeded"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe our friends in .NET encounter the same situation. Can you check what kind of error they throw when a user has too many unsettled messages and so we cannot given them any more messages? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We changed a link setting to avoid this - https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs#L705 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Woah.. @ramya-rao-a I believe the equivalent for us would be to set the capacity of circular buffer instead of hard-coded 2048 at rhea. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's also ask our Java and Python friends :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the link is saying that prior to this change 5000 was the max. We have a test where we have 6000 unsettled messages - https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs#L878 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @JoshLove-msft, makes sense! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yunhaoling mentioned the window in python is 64*1024, and he is observing some weirdness around receiving 50K messages, he's testing for more observations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @HarshaNalluru Based on what other languages are doing, are we planning to update rhea to support removing the 2048 message limit? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @chradek yes, we'd want want it to be configurable, I'll make an issue soon. |
||
|
||
// @public | ||
export interface ServiceBusMessage { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,9 +2,13 @@ | |
// Licensed under the MIT license. | ||
|
||
import { Delivery, ReceiverOptions, Source } from "rhea-promise"; | ||
import { translateServiceBusError } from "../serviceBusError"; | ||
import { ServiceBusError, translateServiceBusError } from "../serviceBusError"; | ||
import { receiverLogger } from "../log"; | ||
import { ReceiveMode } from "../models"; | ||
import { Receiver } from "rhea-promise"; | ||
import { OnError } from "./messageReceiver"; | ||
import { ReceiverHelper } from "./receiverHelper"; | ||
import { delay } from "@azure/core-amqp"; | ||
|
||
/** | ||
* @internal | ||
|
@@ -103,3 +107,99 @@ export function createReceiverOptions( | |
|
||
return rcvrOptions; | ||
} | ||
|
||
export const UnsettledMessagesLimitExceededError = | ||
HarshaNalluru marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"Failed to fetch new messages as the limit for unsettled messages is reached. Please settle received messages using settlement methods(such as `completeMessage()`) on the receiver to receive the next message."; | ||
|
||
/** | ||
* Returns the number of empty slots in the Circular buffer of incoming deliveries | ||
* based on the capacity and size of the buffer. | ||
* | ||
* @internal | ||
*/ | ||
export function numberOfEmptyIncomingSlots( | ||
receiver: Pick<Receiver, "session"> | undefined | ||
): number { | ||
const incomingDeliveries = receiver?.session?.incoming?.deliveries; | ||
return incomingDeliveries ? incomingDeliveries.capacity - incomingDeliveries.size : 0; | ||
} | ||
|
||
/** | ||
* Provides helper methods to manage the credits on the link for the | ||
* streaming messages scenarios. | ||
* (Used by both sessions(MessageSession) and non-sessions(StreamingReceiver)) | ||
* | ||
* @internal | ||
*/ | ||
export class StreamingReceiverCreditManager { | ||
constructor( | ||
private _getCurrentReceiver: () => { receiver: Receiver | undefined; logPrefix: string }, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we allow receiver to be undefined? Is there a scenario where that is actually intended? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not actually. Instead of passing the StreamingReceiverHelper, I've now merged my StreamingReceiverCreditManager into the existing StreamingReceiverHelper. Let me know if that looks fine. |
||
private receiverHelper: ReceiverHelper, | ||
private receiveMode: ReceiveMode, | ||
private entityPath: string, | ||
private fullyQualifiedNamespace: string | ||
) {} | ||
|
||
addCreditsInit(maxConcurrentCalls: number) { | ||
const emptySlots = numberOfEmptyIncomingSlots(this._getCurrentReceiver().receiver); | ||
this.receiverHelper.addCredit( | ||
this.receiveMode === "peekLock" | ||
? Math.min(maxConcurrentCalls, emptySlots <= 1 ? 0 : emptySlots - 1) | ||
: maxConcurrentCalls | ||
); | ||
// TODO: Add log message | ||
} | ||
/** | ||
* Upon receiving a new message, this method can be called to add a credit to receive one more message. | ||
* If no empty slots, calls the onError callback with the `UnsettledMessagesLimitExceeded` error to | ||
* let users know about the excess unsettled messages. | ||
* | ||
* @internal | ||
*/ | ||
onReceive(notifyError: OnError | undefined) { | ||
const receiver = this._getCurrentReceiver().receiver; | ||
if (this.receiveMode === "receiveAndDelete" || numberOfEmptyIncomingSlots(receiver) > 1) { | ||
this.receiverHelper.addCredit(1); | ||
} else if (receiver) { | ||
notifyError?.({ | ||
error: new ServiceBusError( | ||
UnsettledMessagesLimitExceededError, | ||
"UnsettledMessagesLimitExceeded" | ||
), | ||
errorSource: "receive", | ||
entityPath: this.entityPath, | ||
fullyQualifiedNamespace: this.fullyQualifiedNamespace | ||
}); | ||
} else { | ||
// Link doesn't exist | ||
// SessionLockLost for sessions/onAMQPError for non-sessions will be notified in one of the listeners | ||
// So, not notifying here | ||
// TODO: Validate above | ||
} | ||
} | ||
|
||
/** | ||
* After processing the message, if no empty slots, | ||
* - keeps checking if the link has more empty slots in a loop with a delay of 1 sec, | ||
* - adds a credit if there are empty slots. | ||
* | ||
* @internal | ||
*/ | ||
async postProcessing() { | ||
HarshaNalluru marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const receiver = this._getCurrentReceiver().receiver; | ||
if (this.receiveMode === "peekLock" && numberOfEmptyIncomingSlots(receiver) <= 1) { | ||
// Wait for the user to clear the deliveries before adding more credits | ||
while (receiver?.isOpen() && numberOfEmptyIncomingSlots(receiver) <= 1) { | ||
// TODO: check for canReceiveMessages too to exit from the loop | ||
await delay(1000); // TODO: Not have hard-coded 1000ms as delay - move it to constants maybe | ||
HarshaNalluru marked this conversation as resolved.
Show resolved
Hide resolved
HarshaNalluru marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// TODO: Add jitter | ||
} | ||
// TODO: Instead of adding one credit, make it maxConcurrentCalls | ||
// Example: | ||
// - Suppose maxConcurrentCalls=1000 and the user is not settling the messages | ||
// - After 2048 messages, 1000 while loops are running(for all the processMessage callbacks) so that they can do their part of adding a credit | ||
// - Instead, replenish with maxConcurrentCalls with a single while loop and end the rest of the processMessage callbacks | ||
this.receiverHelper.addCredit(1); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering why this changelog entry was removed. Was there a merge conflict?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh no, I'll revert. Thanks! 🙂