-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: [service-bus] Add in abortSignal support in createBatch, subscribe and receiveMessages #8994
Changes from 25 commits
032c4b7
6fcf8f2
620dba2
d261f9f
12516a1
8fe802b
7ac95d7
dfc5ded
c3eac87
a3b54ac
1905b3a
252621b
05e8ef8
e4d2c4b
f07dce4
7dd35a9
50982cf
df16247
a9e8bdd
f9d9a28
86c63cb
1509c05
988f4b6
3901fcd
9785864
1f6c689
c917759
2284cd6
62b234b
4871157
d1a246b
c785c58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,9 @@ import { | |
} from "./messageReceiver"; | ||
import { ClientEntityContext } from "../clientEntityContext"; | ||
import { throwErrorIfConnectionClosed } from "../util/errors"; | ||
import { checkAndRegisterWithAbortSignal } from "../util/utils"; | ||
import { AbortSignalLike } from "@azure/abort-controller"; | ||
import { openLink } from "../shared/openLink"; | ||
|
||
/** | ||
* Describes the batching receiver where the user can receive a specified number of messages for | ||
|
@@ -44,8 +47,8 @@ export class BatchingReceiver extends MessageReceiver { | |
* @param {ClientEntityContext} context The client entity context. | ||
* @param {ReceiveOptions} [options] Options for how you'd like to connect. | ||
*/ | ||
constructor(context: ClientEntityContext, options?: ReceiveOptions) { | ||
super(context, ReceiverType.batching, options); | ||
constructor(context: ClientEntityContext, openLinkFn: typeof openLink, options?: ReceiveOptions) { | ||
super(context, ReceiverType.batching, openLinkFn, options); | ||
this.newMessageWaitTimeoutInMs = 1000; | ||
} | ||
|
||
|
@@ -84,7 +87,8 @@ export class BatchingReceiver extends MessageReceiver { | |
*/ | ||
async receive( | ||
maxMessageCount: number, | ||
maxWaitTimeInMs?: number | ||
maxWaitTimeInMs?: number, | ||
abortSignal?: AbortSignalLike | ||
): Promise<ServiceBusMessageImpl[]> { | ||
throwErrorIfConnectionClosed(this._context.namespace); | ||
|
||
|
@@ -95,7 +99,7 @@ export class BatchingReceiver extends MessageReceiver { | |
this.isReceivingMessages = true; | ||
|
||
try { | ||
return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs); | ||
return await this._receiveImpl(maxMessageCount, maxWaitTimeInMs, abortSignal); | ||
} catch (error) { | ||
log.error( | ||
"[%s] Receiver '%s': Rejecting receiveMessages() with error %O: ", | ||
|
@@ -112,12 +116,16 @@ export class BatchingReceiver extends MessageReceiver { | |
|
||
private _receiveImpl( | ||
maxMessageCount: number, | ||
maxWaitTimeInMs: number | ||
maxWaitTimeInMs: number, | ||
abortSignal?: AbortSignalLike | ||
): Promise<ServiceBusMessageImpl[]> { | ||
const brokeredMessages: ServiceBusMessageImpl[] = []; | ||
|
||
this.isReceivingMessages = true; | ||
return new Promise<ServiceBusMessageImpl[]>((resolve, reject) => { | ||
|
||
let cleanupAbortSignal: (() => void) | undefined; | ||
|
||
const receivePromise = new Promise<ServiceBusMessageImpl[]>((resolve, reject) => { | ||
let totalWaitTimer: NodeJS.Timer | undefined; | ||
|
||
const onSessionError: OnAmqpEvent = (context: EventContext) => { | ||
|
@@ -179,11 +187,19 @@ export class BatchingReceiver extends MessageReceiver { | |
reject(translate(error)); | ||
}; | ||
|
||
let finalActionCalled = false; | ||
|
||
// Final action to be performed after | ||
// - maxMessageCount is reached or | ||
// - maxWaitTime is passed or | ||
// - newMessageWaitTimeoutInSeconds is passed since the last message was received | ||
const finalAction = (): void => { | ||
if (finalActionCalled) { | ||
return; | ||
} | ||
|
||
finalActionCalled = true; | ||
|
||
if (this._newMessageReceivedTimer) { | ||
clearTimeout(this._newMessageReceivedTimer); | ||
} | ||
|
@@ -226,6 +242,12 @@ export class BatchingReceiver extends MessageReceiver { | |
} | ||
}; | ||
|
||
cleanupAbortSignal = checkAndRegisterWithAbortSignal( | ||
() => finalAction(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was keeping with the sentiment that we've got in the connectionErrorHandler and returning the messages we've got so far, rather than just error out. In receiveAndDelete mode this is particularly important. Less so in peekLock. TBH, it's inconsistent but it seems like it's favorable in this spot. @bterlson - curious what your arch take on this is. Would it be better to be consistent and throw an AbortError here or would it be okay to treat abort as a soft cancel of the current operation when you have partial results? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's an interesting point...I could see |
||
"Receive has been cancelled by user", | ||
abortSignal | ||
); | ||
|
||
// Action to be performed on the "message" event. | ||
const onReceiveMessage: OnAmqpEventAsPromise = async (context: EventContext) => { | ||
this.resetTimerOnNewMessageReceived(); | ||
|
@@ -462,7 +484,7 @@ export class BatchingReceiver extends MessageReceiver { | |
onClose: onReceiveClose, | ||
onSessionClose: onSessionClose | ||
}); | ||
this._init(rcvrOptions) | ||
this._init({ ...rcvrOptions, abortSignal }) | ||
.then(() => { | ||
if (!this._receiver) { | ||
// there's a really small window here where the receiver can be closed | ||
|
@@ -483,6 +505,20 @@ export class BatchingReceiver extends MessageReceiver { | |
this._receiver!.session.on(SessionEvents.sessionError, onSessionError); | ||
} | ||
}); | ||
|
||
return receivePromise | ||
.then((messages) => { | ||
if (cleanupAbortSignal) { | ||
cleanupAbortSignal(); | ||
} | ||
return messages; | ||
}) | ||
.catch((err) => { | ||
if (cleanupAbortSignal) { | ||
cleanupAbortSignal(); | ||
} | ||
throw err; | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -492,9 +528,13 @@ export class BatchingReceiver extends MessageReceiver { | |
* @param {ClientEntityContext} context The connection context. | ||
* @param {ReceiveOptions} [options] Receive options. | ||
*/ | ||
static create(context: ClientEntityContext, options?: ReceiveOptions): BatchingReceiver { | ||
static create( | ||
context: ClientEntityContext, | ||
openLinkFn: typeof openLink, | ||
options?: ReceiveOptions | ||
): BatchingReceiver { | ||
throwErrorIfConnectionClosed(context.namespace); | ||
const bReceiver = new BatchingReceiver(context, options); | ||
const bReceiver = new BatchingReceiver(context, openLinkFn, options); | ||
context.batchingReceiver = bReceiver; | ||
return bReceiver; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -184,7 +184,7 @@ export class LinkEntity { | |
* Ensures that the token is renewed within the predefined renewal margin. | ||
* @returns {void} | ||
*/ | ||
protected async _ensureTokenRenewal(): Promise<void> { | ||
protected _ensureTokenRenewal(): void { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are few places from where this is called. All those places need to be updated to remove the |
||
if (!this._tokenTimeout) { | ||
return; | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -18,7 +18,8 @@ import { | |||||
EventContext, | ||||||
ReceiverOptions, | ||||||
AmqpError, | ||||||
isAmqpError | ||||||
isAmqpError, | ||||||
generate_uuid | ||||||
} from "rhea-promise"; | ||||||
import * as log from "../log"; | ||||||
import { LinkEntity } from "./linkEntity"; | ||||||
|
@@ -27,6 +28,8 @@ import { ServiceBusMessageImpl, DispositionType, ReceiveMode } from "../serviceB | |||||
import { getUniqueName, calculateRenewAfterDuration } from "../util/utils"; | ||||||
import { MessageHandlerOptions } from "../models"; | ||||||
import { DispositionStatusOptions } from "./managementClient"; | ||||||
import { OperationOptions } from "../modelsToBeSharedWithEventHubs"; | ||||||
import { openLink } from "../shared/openLink"; | ||||||
|
||||||
/** | ||||||
* @internal | ||||||
|
@@ -208,7 +211,7 @@ export class MessageReceiver extends LinkEntity { | |||||
/** | ||||||
* @property {boolean} wasCloseInitiated Denotes if receiver was explicitly closed by user. | ||||||
*/ | ||||||
protected wasCloseInitiated?: boolean; | ||||||
protected wasCloseInitiated: boolean; | ||||||
/** | ||||||
* @property {Map<string, Function>} _messageRenewLockTimers Maintains a map of messages for which | ||||||
* the lock is automatically renewed. | ||||||
|
@@ -244,7 +247,14 @@ export class MessageReceiver extends LinkEntity { | |||||
*/ | ||||||
private _isDetaching: boolean = false; | ||||||
|
||||||
constructor(context: ClientEntityContext, receiverType: ReceiverType, options?: ReceiveOptions) { | ||||||
private readonly _openLock: string = `receiver-${generate_uuid()}`; | ||||||
|
||||||
constructor( | ||||||
context: ClientEntityContext, | ||||||
receiverType: ReceiverType, | ||||||
private _openLinkFn: typeof openLink, | ||||||
options?: ReceiveOptions | ||||||
) { | ||||||
super(context.entityPath, context, { | ||||||
address: context.entityPath, | ||||||
audience: `${context.namespace.config.endpoint}${context.entityPath}` | ||||||
|
@@ -757,89 +767,43 @@ export class MessageReceiver extends LinkEntity { | |||||
* | ||||||
* @returns {Promise<void>} Promise<void>. | ||||||
*/ | ||||||
protected async _init(options?: ReceiverOptions): Promise<void> { | ||||||
const connectionId = this._context.namespace.connectionId; | ||||||
try { | ||||||
if (!this.isOpen() && !this.isConnecting) { | ||||||
if (this.wasCloseInitiated) { | ||||||
// in track 1 we'll maintain backwards compatible behavior for the codebase and | ||||||
// just treat this as a no-op. There are cases, like in onDetached, where throwing | ||||||
// an error here could have unintended consequences. | ||||||
return; | ||||||
} | ||||||
|
||||||
log.error( | ||||||
"[%s] The receiver '%s' with address '%s' is not open and is not currently " + | ||||||
"establishing itself. Hence let's try to connect.", | ||||||
connectionId, | ||||||
this.name, | ||||||
this.address | ||||||
); | ||||||
|
||||||
protected _init( | ||||||
options?: ReceiverOptions & Pick<OperationOptions, "abortSignal"> | ||||||
): Promise<void> { | ||||||
return this._openLinkFn({ | ||||||
abortSignal: options?.abortSignal, | ||||||
isConnecting: () => this.isConnecting, | ||||||
setIsConnecting: (value: boolean) => (this.isConnecting = value), | ||||||
getCloseInitiated: () => this.wasCloseInitiated, | ||||||
create: async () => { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Slight readability improvement:
Suggested change
|
||||||
if (options && options.name) { | ||||||
this.name = options.name; | ||||||
} | ||||||
|
||||||
this.isConnecting = true; | ||||||
await this._negotiateClaim(); | ||||||
if (!options) { | ||||||
options = this._createReceiverOptions(); | ||||||
} | ||||||
log.error( | ||||||
"[%s] Trying to create receiver '%s' with options %O", | ||||||
connectionId, | ||||||
this.name, | ||||||
options | ||||||
); | ||||||
|
||||||
this._receiver = await this._context.namespace.connection.createReceiver(options); | ||||||
this.isConnecting = false; | ||||||
log.error( | ||||||
"[%s] Receiver '%s' with address '%s' has established itself.", | ||||||
connectionId, | ||||||
this.name, | ||||||
this.address | ||||||
); | ||||||
log[this.receiverType]( | ||||||
"Promise to create the receiver resolved. " + "Created receiver with name: ", | ||||||
this.name | ||||||
); | ||||||
const receiver = await this._context.namespace.connection.createReceiver(options); | ||||||
|
||||||
log[this.receiverType]( | ||||||
"[%s] Receiver '%s' created with receiver options: %O", | ||||||
connectionId, | ||||||
this.name, | ||||||
`[${this._context.namespace.connectionId}] The receiver '${this.name}' with address '${this.address} with options %O`, | ||||||
options | ||||||
); | ||||||
// It is possible for someone to close the receiver and then start it again. | ||||||
// Thus make sure that the receiver is present in the client cache. | ||||||
if (this.receiverType === ReceiverType.streaming && !this._context.streamingReceiver) { | ||||||
this._context.streamingReceiver = this as any; | ||||||
} else if (this.receiverType === ReceiverType.batching && !this._context.batchingReceiver) { | ||||||
this._context.batchingReceiver = this as any; | ||||||
} | ||||||
await this._ensureTokenRenewal(); | ||||||
} else { | ||||||
log.error( | ||||||
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " + | ||||||
"-> %s. Hence not reconnecting.", | ||||||
connectionId, | ||||||
this.name, | ||||||
this.address, | ||||||
this.isOpen(), | ||||||
this.isConnecting | ||||||
); | ||||||
|
||||||
return receiver; | ||||||
}, | ||||||
ensureTokenRenewal: () => this._ensureTokenRenewal(), | ||||||
isOpen: () => this.isOpen(), | ||||||
logPrefix: `[${this._context.namespace.connectionId}] The receiver '${this.name}' with address '${this.address}'`, | ||||||
negotiateClaim: () => this._negotiateClaim(), | ||||||
openLock: this._openLock | ||||||
}).then((receiver) => { | ||||||
if (receiver != null) { | ||||||
this._receiver = receiver; | ||||||
} | ||||||
} catch (err) { | ||||||
this.isConnecting = false; | ||||||
err = translate(err); | ||||||
log.error( | ||||||
"[%s] An error occured while creating the receiver '%s': %O", | ||||||
this._context.namespace.connectionId, | ||||||
this.name, | ||||||
err | ||||||
); | ||||||
throw err; | ||||||
} | ||||||
return; | ||||||
}); | ||||||
} | ||||||
|
||||||
protected _deleteFromCache(): void { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like we used to have something like this before...