-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: [service-bus] Add in abortSignal support in createBatch, subscribe and receiveMessages #8994
WIP: [service-bus] Add in abortSignal support in createBatch, subscribe and receiveMessages #8994
Conversation
@@ -173,6 +186,12 @@ export class BatchingReceiver extends MessageReceiver { | |||
} | |||
}); | |||
|
|||
cleanupAbortSignal = checkAndRegisterWithAbortSignal( | |||
() => finalAction(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we calling finalAction()
when the abort signal is fired instead of throwing the AbortError?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was keeping with the sentiment that we've got in the connectionErrorHandler and returning the messages we've got so far, rather than just error out. In receiveAndDelete mode this is particularly important. Less so in peekLock.
TBH, it's inconsistent but it seems like it's favorable in this spot.
@bterlson - curious what your arch take on this is. Would it be better to be consistent and throw an AbortError here or would it be okay to treat abort as a soft cancel of the current operation when you have partial results?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's an interesting point...I could see abort
meaning "I don't care anymore, just get me out of here!" or "Just give me what you have now!". Since it could mean potentially losing messages if in receiveAndDelete mode it seems safer to return those messages if you've got them, but also curious to hear @bterlson 's opinion.
|
||
if (cleanupAbortSignal) { | ||
cleanupAbortSignal(); | ||
cleanupAbortSignal = undefined; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having the clean up being called from finalAction()
ensures clean up on the happy path. What about cases when the promise returned from receive()
is rejected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great point, will push a fix, just waiting for the test run to complete.
I'm just going to chain two promises together, rather than trying to intercept each exit from this particular promise.
/azp run js - servicebus - tests |
Pull request contains merge conflicts. |
0b17a16
to
0869c5a
Compare
/azp run js - servicebus - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
/azp run js - servicebus - tests |
Pull request contains merge conflicts. |
cc6bab1
to
19ed1f1
Compare
/azp run js - servicebus - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @richardpark-msft,
I have reviewed and left my first round of comments, please take a look.
If it is possible, then I'd much prefer the work being done here to be done in phases in two separate PRs. Phase 1 would involve the utilities that you added for timeout and abort signal with them being used from link creation code path, Phase 2 would involve introducing openLink
to consolidate the opening of AMQP links across senders and receivers.
I havent looked at the streaming and batching receiver yet
@@ -15,6 +15,7 @@ import { CreateSessionReceiverOptions, CreateSenderOptions } from "./models"; | |||
import { Receiver, ReceiverImpl } from "./receivers/receiver"; | |||
import { SessionReceiver, SessionReceiverImpl } from "./receivers/sessionReceiver"; | |||
import { ReceivedMessageWithLock, ReceivedMessage } from "./serviceBusMessage"; | |||
import { openLink } from "./shared/openLink"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to import openLink
here and pass to the constructors of ReceiverImpl
and SenderImpl
? These constructors can import and use it directly right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They could - this is to make unit testing possible by always injecting openLink.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is injecting the only way to ensure unit testability? My concern is that this model is not intuitive and adds complexity that passing around a function brings.
If openLink
was meant to be a common shared code between MessageSender
and MessageReceiver
, then I would expect it to be part of LinkEntity
which is the base class for both. An independent helper method makes sense when it is being used from multiple unrelated places & is stateless. Given that this helper is only ever going to be used by classes that inherit from LinkEntity
, the base class is a much better fit
Seeing the unit tests, I mainly see the below categories
- Ensure abort signal is plumbed through to open()
- Ensure open() is not called when abort signal is already fired
If this common code was on LinkEntity
, then for the above unit tests, we can override the method for open(). I see that the plumbing needs to be tested from SenderImpl -> MessageSender -> LinkEntity. Would perhaps combining SenderImpl & MessageSender help?
export class SenderImpl extends LinkEntity implements Sender
?
Then you test code would be
class TestAbortSignalSender extends SenderImpl {
async openLink(..) {
if (options.abortSignal != null) {
throw new Error("abortSignal was properly passed to open()!");
} else {
throw new Error("No abortSignal was passed to open()!");
}
}
}
instead of
sender = new SenderImpl(clientEntityContext, {}, async (openArgs) => {
if (openArgs.abortSignal != null) {
throw new Error("abortSignal was properly passed to open()!");
} else {
throw new Error("No abortSignal was passed to open()!");
}
});
cc @chradek
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely not the only way. There are so many ways!
Inheritance for testing is also a valid approach and I don't think it's bad. In the code as it is I think inheritance is making it difficult to see where the lines are between what's mutable, and what's not.
Also, this is definitely not this code's final form but it is a step in the direction that I think helps with making the code easier to maintain. Some state is ultimately required (so we might want to consider how to do that) though but I didn't want to do too much in one PR.
export function openLink<T extends RheaLink>(args: OpenArgs<T>): Promise<T | undefined> { | ||
const checkAborted = (): void => { | ||
if (args.abortSignal?.aborted) { | ||
throw new AbortError("open() has been cancelled by user."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The user would have no idea what open()
is, so this error message would not make much sense to them
checkAborted(); | ||
|
||
if (args.isOpen() || args.isConnecting() || args.getCloseInitiated()) { | ||
return Promise.resolve(undefined); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If openLink
is declared with the async
keyword, then this can simply be return
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another benefit to making openLink
async!
|
||
try { | ||
if (args.isOpen()) { | ||
return Promise.resolve(undefined); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this callback passed to lock acquire is declared as async, this can simply be return
* we can stop passing in some of these functions. | ||
* | ||
* For instance | ||
* - negotiateClaim + ensureTokenRenewal can be lifted out of LinkEntity |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
out of LinkEntity
and into where?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Originally I was thinking just moving the methods into into openLink.ts since they're all pretty related to that activity.
However, I've been thinking about that some more. My main issue with them being here is that inheritance can make it difficult to see the proper separation between LinkEntity and its child classes. So when I was tracing through this it wasn't entirely obvious which parts were "internal" state to LinkEntity, etc...
Having been through it more I could see it staying but it'd be nice to make that first-time read a bit easier.
// these are more candidates to move out from LinkEntity. They don't really depend on anything special | ||
// in LinkEntity and are just clutter. | ||
negotiateClaim(): Promise<void>; | ||
ensureTokenRenewal(): Promise<void>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After looking into _ensureTokenRenewal
in LinkEntity
, its not clear to me why this would be an async function. All it does is set a timer to negotiate claim
// TODO: do we _need_ this lock at the moment? If we're not trying to get | ||
// open and close to be mutually exclusive? | ||
return defaultLock.acquire(args.openLock, async () => { | ||
checkAborted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a lot of calls to checkAborted()
being done in here after acquiring the lock. I understand the need to use it so that we can exit as soon as possible instead of going through the different async operations, but we don't need it to throw the AbortError itself.
We can use checkAndRegisterWithAbortSignal()
before calling openLink()
. This way, in here, we only need to add early exits before every async operation. And then when these async operations can support abort signal in the future, these checks can go away.
- Removing unnecessary clearTimeout
- Adding in some commentary - Adding in a missing test - when the action throws an error.
Created PR#9138 for the abort helpers. |
- Fixing some broken test
…e promise. - Also, make sure that finalActionHandler doesn't get called more than once.
… can run the same tests in the browser.
6d68bce
to
1905b3a
Compare
- Fixing up the logging so we still dump the options for created senders and receivers.
…sb-track2-abortsignal-v2
…sb-track2-abortsignal-v2
…sb-track2-abortsignal-v2
…sb-track2-abortsignal-v2
@@ -184,7 +184,7 @@ export class LinkEntity { | |||
* Ensures that the token is renewed within the predefined renewal margin. | |||
* @returns {void} | |||
*/ | |||
protected async _ensureTokenRenewal(): Promise<void> { | |||
protected _ensureTokenRenewal(): void { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are few places from where this is called. All those places need to be updated to remove the await
Resolved all my comments on the |
…sb-track2-abortsignal-v2
try { | ||
await Promise.race([this.open(), initTimeoutPromise]); | ||
await waitForTimeoutOrAbortOrResolve({ | ||
actionFn: () => this.open({ abortSignal: options?.abortSignal }), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: You already got the abortSignal
so can do this:
actionFn: () => this.open({ abortSignal: options?.abortSignal }), | |
actionFn: () => this.open({ abortSignal }), |
import { AbortSignalLike } from "@azure/core-http"; | ||
import { AbortError } from "@azure/abort-controller"; | ||
|
||
export type RheaLink = AwaitableSender | RheaReceiver; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: My obligatory "Add @internal
and @ignores
to anything exported that shouldn't be public!"
export function openLink<T extends RheaLink>(args: OpenArgs<T>): Promise<T | undefined> { | ||
const checkAborted = (): void => { | ||
if (args.abortSignal?.aborted) { | ||
throw new AbortError("open() has been cancelled by user."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We just happened to talk about this in another PR! openLink
is not currently an async method, so I'd expect that if this function gets called outside of the promise that openLink
returns, it would return a Promise.reject(new AbortError(""))
.
This gets a bit complicated though since you re-use this function inside the promise that's returned, and I don't think (not certain) that returning a reject promise would trigger the catch
block that's used to close a link.
Ultimately it might just be easier to make openLink
an async function so you don't have to worry about this.
checkAborted(); | ||
|
||
if (args.isOpen() || args.isConnecting() || args.getCloseInitiated()) { | ||
return Promise.resolve(undefined); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another benefit to making openLink
async!
throwErrorIfConnectionClosed(this._context.namespace); | ||
this._onMessage = onMessage; | ||
this._onError = onError; | ||
|
||
this._cleanupAbortHandler = checkAndRegisterWithAbortSignal( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't allow calling receive multiple times do we? Is there any risk that an existing _cleanupAbortHandler
might get overwritten with a new one? (Should we be pushing the cleanup methods to a list and iterate through them when we call close?)
@@ -173,6 +186,12 @@ export class BatchingReceiver extends MessageReceiver { | |||
} | |||
}); | |||
|
|||
cleanupAbortSignal = checkAndRegisterWithAbortSignal( | |||
() => finalAction(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's an interesting point...I could see abort
meaning "I don't care anymore, just get me out of here!" or "Just give me what you have now!". Since it could mean potentially losing messages if in receiveAndDelete mode it seems safer to return those messages if you've got them, but also curious to hear @bterlson 's opinion.
// Final action to be performed after | ||
// - maxMessageCount is reached or | ||
// - maxWaitTime is passed or | ||
// - newMessageWaitTimeoutInSeconds is passed since the last message was received | ||
const finalAction = (): void => { | ||
if (finalActionCalled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like we used to have something like this before...
isConnecting: () => this.isConnecting, | ||
setIsConnecting: (value: boolean) => (this.isConnecting = value), | ||
getCloseInitiated: () => this.wasCloseInitiated, | ||
create: async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Slight readability improvement:
create: async () => { | |
create: async createReceiver() => { |
throw new Error("abortSignal was properly passed to open()!"); | ||
} else { | ||
throw new Error("No abortSignal was passed to open()!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Might not be worth doing now, but it might be slightly easier to maintain/read if these messages were in variables like abortSignalPresent
and abortSignalMissing
. Then it'd be easy to tell in the asserts what's supposed to happen without having to parse the text (because my brain now parses variable names better than english words)
Adding in abortSignal support to:
There were some refactors to make this more consistent (testing is still happening, so this is just a draft):