Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: [service-bus] Add in abortSignal support in createBatch, subscribe and receiveMessages #8994

Closed
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
032c4b7
Splitting out some abortSignal helpers
richardpark-msft May 27, 2020
6fcf8f2
- Some renames
richardpark-msft May 27, 2020
620dba2
- Some more renames
richardpark-msft May 27, 2020
d261f9f
Add in abortSignal support to createBatch, receiveBatch and subscribe().
richardpark-msft May 11, 2020
12516a1
- Add in early exit for openLink
richardpark-msft May 21, 2020
8fe802b
Properly clean up the abort signal regardless of the resolution of th…
richardpark-msft May 21, 2020
7ac95d7
I've added a test that uses an EventEmitter. Adding in "events" so we…
richardpark-msft May 21, 2020
dfc5ded
Just use log.error() for now and remove the need to pass an entity sp…
richardpark-msft May 27, 2020
c3eac87
Update comment about future code movement.
richardpark-msft May 27, 2020
a3b54ac
Update for a build error
richardpark-msft May 27, 2020
1905b3a
ensureTokenRenewal didn't need to be async (or return a Promise)
richardpark-msft May 28, 2020
252621b
- Removing TODO comments have been addressed or are no longer valid.
richardpark-msft May 28, 2020
05e8ef8
abortSignal can't be null since we did a check above.
richardpark-msft May 28, 2020
e4d2c4b
Removing an unneeded clearing of the abortSignal (the .catch will tak…
richardpark-msft May 28, 2020
f07dce4
Remove unneeded ? check.
richardpark-msft May 28, 2020
7dd35a9
Remove .only
richardpark-msft May 28, 2020
50982cf
Merge branch 'richardpark-sb-track2-a-aborthelpers' into richardpark-…
richardpark-msft May 28, 2020
df16247
Change over to return an empty function instead of undefined.
richardpark-msft May 28, 2020
a9e8bdd
Merge branch 'richardpark-sb-track2-a-aborthelpers' into richardpark-…
richardpark-msft May 28, 2020
f9d9a28
Rename for idiomacy.
richardpark-msft May 28, 2020
86c63cb
Small rename (and to or)
richardpark-msft May 28, 2020
1509c05
Adding in abortError so a user can resolve() with it if they choose to.
richardpark-msft May 28, 2020
988f4b6
Merge branch 'richardpark-sb-track2-a-aborthelpers' into richardpark-…
richardpark-msft May 28, 2020
3901fcd
Another nice optimization suggested by Ramya. Make ourselves `reject`…
richardpark-msft May 28, 2020
9785864
Merge branch 'richardpark-sb-track2-a-aborthelpers' into richardpark-…
richardpark-msft May 28, 2020
1f6c689
Make the abort message non-optional
richardpark-msft May 29, 2020
c917759
Just use AbortError for timeout - apparently we do this in core-http …
richardpark-msft May 29, 2020
2284cd6
Swapping required and non-required args.
richardpark-msft May 29, 2020
62b234b
Change over to async and try/finally
richardpark-msft May 29, 2020
4871157
Use OperationTimeoutError
richardpark-msft May 29, 2020
d1a246b
Update doc string
richardpark-msft May 29, 2020
c785c58
Merge branch 'richardpark-sb-track2-a-aborthelpers' into richardpark-…
richardpark-msft May 29, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
"eslint-plugin-no-only-tests": "^2.3.0",
"eslint-plugin-promise": "^4.1.1",
"esm": "^3.2.18",
"events": "^3.0.0",
"glob": "^7.1.2",
"https-proxy-agent": "^3.0.1",
"karma": "^4.0.1",
Expand Down
58 changes: 49 additions & 9 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import {
} from "./messageReceiver";
import { ClientEntityContext } from "../clientEntityContext";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { checkAndRegisterWithAbortSignal } from "../util/utils";
import { AbortSignalLike } from "@azure/abort-controller";
import { openLink } from "../shared/openLink";

/**
* Describes the batching receiver where the user can receive a specified number of messages for
Expand Down Expand Up @@ -44,8 +47,8 @@ export class BatchingReceiver extends MessageReceiver {
* @param {ClientEntityContext} context The client entity context.
* @param {ReceiveOptions} [options] Options for how you'd like to connect.
*/
constructor(context: ClientEntityContext, options?: ReceiveOptions) {
super(context, ReceiverType.batching, options);
constructor(context: ClientEntityContext, openLinkFn: typeof openLink, options?: ReceiveOptions) {
super(context, ReceiverType.batching, openLinkFn, options);
this.newMessageWaitTimeoutInMs = 1000;
}

Expand Down Expand Up @@ -84,7 +87,8 @@ export class BatchingReceiver extends MessageReceiver {
*/
async receive(
maxMessageCount: number,
maxWaitTimeInMs?: number
maxWaitTimeInMs?: number,
abortSignal?: AbortSignalLike
): Promise<ServiceBusMessageImpl[]> {
throwErrorIfConnectionClosed(this._context.namespace);

Expand All @@ -95,7 +99,7 @@ export class BatchingReceiver extends MessageReceiver {
this.isReceivingMessages = true;

try {
return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs);
return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs, abortSignal);
} catch (error) {
log.error(
"[%s] Receiver '%s': Rejecting receiveMessages() with error %O: ",
Expand All @@ -112,12 +116,16 @@ export class BatchingReceiver extends MessageReceiver {

private _receiveImpl(
maxMessageCount: number,
maxWaitTimeInMs: number
maxWaitTimeInMs: number,
abortSignal?: AbortSignalLike
): Promise<ServiceBusMessageImpl[]> {
const brokeredMessages: ServiceBusMessageImpl[] = [];

this.isReceivingMessages = true;
return new Promise<ServiceBusMessageImpl[]>((resolve, reject) => {

let cleanupAbortSignal: (() => void) | undefined;

const receivePromise = new Promise<ServiceBusMessageImpl[]>((resolve, reject) => {
let totalWaitTimer: NodeJS.Timer | undefined;

const onSessionError: OnAmqpEvent = (context: EventContext) => {
Expand Down Expand Up @@ -179,11 +187,19 @@ export class BatchingReceiver extends MessageReceiver {
reject(translate(error));
};

let finalActionCalled = false;

// Final action to be performed after
// - maxMessageCount is reached or
// - maxWaitTime is passed or
// - newMessageWaitTimeoutInSeconds is passed since the last message was received
const finalAction = (): void => {
if (finalActionCalled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we used to have something like this before...

return;
}

finalActionCalled = true;

if (this._newMessageReceivedTimer) {
clearTimeout(this._newMessageReceivedTimer);
}
Expand Down Expand Up @@ -226,6 +242,12 @@ export class BatchingReceiver extends MessageReceiver {
}
};

cleanupAbortSignal = checkAndRegisterWithAbortSignal(
() => finalAction(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we calling finalAction() when the abort signal is fired instead of throwing the AbortError?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was keeping with the sentiment that we've got in the connectionErrorHandler and returning the messages we've got so far, rather than just error out. In receiveAndDelete mode this is particularly important. Less so in peekLock.

TBH, it's inconsistent but it seems like it's favorable in this spot.

@bterlson - curious what your arch take on this is. Would it be better to be consistent and throw an AbortError here or would it be okay to treat abort as a soft cancel of the current operation when you have partial results?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an interesting point...I could see abort meaning "I don't care anymore, just get me out of here!" or "Just give me what you have now!". Since it could mean potentially losing messages if in receiveAndDelete mode it seems safer to return those messages if you've got them, but also curious to hear @bterlson 's opinion.

"Receive has been cancelled by user",
abortSignal
);

// Action to be performed on the "message" event.
const onReceiveMessage: OnAmqpEventAsPromise = async (context: EventContext) => {
this.resetTimerOnNewMessageReceived();
Expand Down Expand Up @@ -462,7 +484,7 @@ export class BatchingReceiver extends MessageReceiver {
onClose: onReceiveClose,
onSessionClose: onSessionClose
});
this._init(rcvrOptions)
this._init({ ...rcvrOptions, abortSignal })
.then(() => {
if (!this._receiver) {
// there's a really small window here where the receiver can be closed
Expand All @@ -483,6 +505,20 @@ export class BatchingReceiver extends MessageReceiver {
this._receiver!.session.on(SessionEvents.sessionError, onSessionError);
}
});

return receivePromise
.then((messages) => {
if (cleanupAbortSignal) {
cleanupAbortSignal();
}
return messages;
})
.catch((err) => {
if (cleanupAbortSignal) {
cleanupAbortSignal();
}
throw err;
});
}

/**
Expand All @@ -492,9 +528,13 @@ export class BatchingReceiver extends MessageReceiver {
* @param {ClientEntityContext} context The connection context.
* @param {ReceiveOptions} [options] Receive options.
*/
static create(context: ClientEntityContext, options?: ReceiveOptions): BatchingReceiver {
static create(
context: ClientEntityContext,
openLinkFn: typeof openLink,
options?: ReceiveOptions
): BatchingReceiver {
throwErrorIfConnectionClosed(context.namespace);
const bReceiver = new BatchingReceiver(context, options);
const bReceiver = new BatchingReceiver(context, openLinkFn, options);
context.batchingReceiver = bReceiver;
return bReceiver;
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export class LinkEntity {
* Ensures that the token is renewed within the predefined renewal margin.
* @returns {void}
*/
protected async _ensureTokenRenewal(): Promise<void> {
protected _ensureTokenRenewal(): void {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are few places from where this is called. All those places need to be updated to remove the await

if (!this._tokenTimeout) {
return;
}
Expand Down
112 changes: 38 additions & 74 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import {
EventContext,
ReceiverOptions,
AmqpError,
isAmqpError
isAmqpError,
generate_uuid
} from "rhea-promise";
import * as log from "../log";
import { LinkEntity } from "./linkEntity";
Expand All @@ -27,6 +28,8 @@ import { ServiceBusMessageImpl, DispositionType, ReceiveMode } from "../serviceB
import { getUniqueName, calculateRenewAfterDuration } from "../util/utils";
import { MessageHandlerOptions } from "../models";
import { DispositionStatusOptions } from "./managementClient";
import { OperationOptions } from "../modelsToBeSharedWithEventHubs";
import { openLink } from "../shared/openLink";

/**
* @internal
Expand Down Expand Up @@ -208,7 +211,7 @@ export class MessageReceiver extends LinkEntity {
/**
* @property {boolean} wasCloseInitiated Denotes if receiver was explicitly closed by user.
*/
protected wasCloseInitiated?: boolean;
protected wasCloseInitiated: boolean;
/**
* @property {Map<string, Function>} _messageRenewLockTimers Maintains a map of messages for which
* the lock is automatically renewed.
Expand Down Expand Up @@ -244,7 +247,14 @@ export class MessageReceiver extends LinkEntity {
*/
private _isDetaching: boolean = false;

constructor(context: ClientEntityContext, receiverType: ReceiverType, options?: ReceiveOptions) {
private readonly _openLock: string = `receiver-${generate_uuid()}`;

constructor(
context: ClientEntityContext,
receiverType: ReceiverType,
private _openLinkFn: typeof openLink,
options?: ReceiveOptions
) {
super(context.entityPath, context, {
address: context.entityPath,
audience: `${context.namespace.config.endpoint}${context.entityPath}`
Expand Down Expand Up @@ -757,89 +767,43 @@ export class MessageReceiver extends LinkEntity {
*
* @returns {Promise<void>} Promise<void>.
*/
protected async _init(options?: ReceiverOptions): Promise<void> {
const connectionId = this._context.namespace.connectionId;
try {
if (!this.isOpen() && !this.isConnecting) {
if (this.wasCloseInitiated) {
// in track 1 we'll maintain backwards compatible behavior for the codebase and
// just treat this as a no-op. There are cases, like in onDetached, where throwing
// an error here could have unintended consequences.
return;
}

log.error(
"[%s] The receiver '%s' with address '%s' is not open and is not currently " +
"establishing itself. Hence let's try to connect.",
connectionId,
this.name,
this.address
);

protected _init(
options?: ReceiverOptions & Pick<OperationOptions, "abortSignal">
): Promise<void> {
return this._openLinkFn({
abortSignal: options?.abortSignal,
isConnecting: () => this.isConnecting,
setIsConnecting: (value: boolean) => (this.isConnecting = value),
getCloseInitiated: () => this.wasCloseInitiated,
create: async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slight readability improvement:

Suggested change
create: async () => {
create: async createReceiver() => {

if (options && options.name) {
this.name = options.name;
}

this.isConnecting = true;
await this._negotiateClaim();
if (!options) {
options = this._createReceiverOptions();
}
log.error(
"[%s] Trying to create receiver '%s' with options %O",
connectionId,
this.name,
options
);

this._receiver = await this._context.namespace.connection.createReceiver(options);
this.isConnecting = false;
log.error(
"[%s] Receiver '%s' with address '%s' has established itself.",
connectionId,
this.name,
this.address
);
log[this.receiverType](
"Promise to create the receiver resolved. " + "Created receiver with name: ",
this.name
);
const receiver = await this._context.namespace.connection.createReceiver(options);

log[this.receiverType](
"[%s] Receiver '%s' created with receiver options: %O",
connectionId,
this.name,
`[${this._context.namespace.connectionId}] The receiver '${this.name}' with address '${this.address} with options %O`,
options
);
// It is possible for someone to close the receiver and then start it again.
// Thus make sure that the receiver is present in the client cache.
if (this.receiverType === ReceiverType.streaming && !this._context.streamingReceiver) {
this._context.streamingReceiver = this as any;
} else if (this.receiverType === ReceiverType.batching && !this._context.batchingReceiver) {
this._context.batchingReceiver = this as any;
}
await this._ensureTokenRenewal();
} else {
log.error(
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " +
"-> %s. Hence not reconnecting.",
connectionId,
this.name,
this.address,
this.isOpen(),
this.isConnecting
);

return receiver;
},
ensureTokenRenewal: () => this._ensureTokenRenewal(),
isOpen: () => this.isOpen(),
logPrefix: `[${this._context.namespace.connectionId}] The receiver '${this.name}' with address '${this.address}'`,
negotiateClaim: () => this._negotiateClaim(),
openLock: this._openLock
}).then((receiver) => {
if (receiver != null) {
this._receiver = receiver;
}
} catch (err) {
this.isConnecting = false;
err = translate(err);
log.error(
"[%s] An error occured while creating the receiver '%s': %O",
this._context.namespace.connectionId,
this.name,
err
);
throw err;
}
return;
});
}

protected _deleteFromCache(): void {
Expand Down
Loading