Skip to content

Commit

Permalink
[service-bus] Change MessageSession over to using the .NET receiveMes…
Browse files Browse the repository at this point in the history
…sages algorithm (Azure#10107)

[sessions] Changing our receiveMessages() algorithm to match .NET's.

The new version works like this:

receive(maxMessages, maxWaitTime)

Internally we then wait for:

  maxMessages to arrive
  maxWaitTime to expire
  or
  An internal 1 second timeout that will fire after the first message has been received or 
  the remaining time left from maxWaitTime (whichever one is smaller)

(also eliminated some dead code as a result of making SessionManager a sample rather
 than being part of the core library)

Complements Azure#9968 and is the final change for Azure#9718
  • Loading branch information
richardpark-msft authored Jul 17, 2020
1 parent 17ef66a commit 5d544e8
Show file tree
Hide file tree
Showing 7 changed files with 429 additions and 201 deletions.
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
message, rather than how long to wait for an entire set of messages. This change allows for a faster return
of messages to your application.
[PR 9968](https://github.com/Azure/azure-sdk-for-js/pull/9968)
[PR 10107](https://github.com/Azure/azure-sdk-for-js/pull/10107)
- `userProperties` attribute under the `ServiceBusMessage`(and `ReceivedMessage`, `ReceivedMessageWithLock`) has been renamed to `properties`. Same change has been made to the `userProperties` attribute in the correlation-rule filter.
[PR 10003](https://github.com/Azure/azure-sdk-for-js/pull/10003)

Expand Down
47 changes: 29 additions & 18 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,28 +479,11 @@ export class BatchingReceiver extends MessageReceiver {
});
}

/**
* Gets a function that will calculate the correct amount of time to wait
* for the next message to arrive.
*
* @param maxWaitTimeInMs The maximum amount of time to wait for the first message.
* @param maxTimeAfterFirstMessageInMs The maximum time to wait after the first message.
*/
private _getRemainingWaitTimeInMsFn(
maxWaitTimeInMs: number,
maxTimeAfterFirstMessageInMs: number
): () => number {
const startTimeMs = Date.now();

return () => {
const remainingTimeMs = maxWaitTimeInMs - (Date.now() - startTimeMs);

if (remainingTimeMs < 0) {
return 0;
}

return Math.min(remainingTimeMs, maxTimeAfterFirstMessageInMs);
};
return getRemainingWaitTimeInMsFn(maxWaitTimeInMs, maxTimeAfterFirstMessageInMs);
}

private _getServiceBusMessage(context: EventContext): ServiceBusMessageImpl {
Expand All @@ -521,3 +504,31 @@ export class BatchingReceiver extends MessageReceiver {
return bReceiver;
}
}

/**
* Gets a function that returns the smaller of the two timeouts,
* taking into account elapsed time from when getRemainingWaitTimeInMsFn
* was called.
*
* @param maxWaitTimeInMs Maximum time to wait for the first message
* @param maxTimeAfterFirstMessageInMs Maximum time to wait after the first message before completing the receive.
*
* @internal
* @ignore
*/
export function getRemainingWaitTimeInMsFn(
maxWaitTimeInMs: number,
maxTimeAfterFirstMessageInMs: number
): () => number {
const startTimeMs = Date.now();

return () => {
const remainingTimeMs = maxWaitTimeInMs - (Date.now() - startTimeMs);

if (remainingTimeMs < 0) {
return 0;
}

return Math.min(remainingTimeMs, maxTimeAfterFirstMessageInMs);
};
}
9 changes: 7 additions & 2 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
const receivedMessages = await this._context.batchingReceiver.receive(
maxMessageCount,
options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs,
maxTimeAfterFirstMessageMs,
defaultMaxTimeAfterFirstMessageForBatchingMs,
options?.abortSignal
);
return (receivedMessages as unknown) as ReceivedMessageT[];
Expand Down Expand Up @@ -465,5 +465,10 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
/**
* The default time to wait for messages _after_ the first message
* has been received.
*
* This timeout only applies to receiveMessages()
*
* @internal
* @ignore
*/
const maxTimeAfterFirstMessageMs = 1000;
export const defaultMaxTimeAfterFirstMessageForBatchingMs = 1000;
5 changes: 3 additions & 2 deletions sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import * as log from "../log";
import { OnError, OnMessage } from "../core/messageReceiver";
import { assertValidMessageHandlers, getMessageIterator, wrapProcessErrorHandler } from "./shared";
import { convertToInternalReceiveMode } from "../constructorHelpers";
import { Receiver } from "./receiver";
import { Receiver, defaultMaxTimeAfterFirstMessageForBatchingMs } from "./receiver";
import Long from "long";
import { ReceivedMessageWithLock, ServiceBusMessageImpl } from "../serviceBusMessage";
import {
Expand Down Expand Up @@ -393,7 +393,8 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
const receiveBatchOperationPromise = async () => {
const receivedMessages = await this._messageSession!.receiveMessages(
maxMessageCount,
options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs
options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs,
defaultMaxTimeAfterFirstMessageForBatchingMs
);

return (receivedMessages as any) as ReceivedMessageT[];
Expand Down
Loading

0 comments on commit 5d544e8

Please sign in to comment.