WIP: [service-bus] Add in abortSignal support in createBatch, subscribe and receiveMessages #8994

richardpark-msft
Member

Adding in abortSignal support to:

  • subscribe
  • receiveMessages
  • createBatch

There were some refactors to make this more consistent (testing is still happening, so this is just a draft):

  • Made the openLink (equivalent of the old .open()/_init() calls in MessageSender and MessageReceiver) a common function with no LinkEntity dependency (easier for testing)
  • Added in intermediate aborts into that function
  • Made a few abort signal helpers (in utils.ts) to make it easier to register with an abort signal and properly clean up (fixing a bug I left in MessageSender where I didn't clean up my abort signal handlers).
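A minimal sketch of what such a register-and-clean-up helper might look like. The name checkAndRegisterWithAbortSignal appears in the diff in this PR, but its real signature is not shown in this conversation, so the parameter order, the AbortSignalLike shape, and the AbortError class below are simplified stand-ins rather than the actual utils.ts code.

```typescript
// Minimal stand-in for the signal shape (assumption; the real type comes from the SDK).
interface AbortSignalLike {
  readonly aborted: boolean;
  addEventListener(type: "abort", listener: () => void): void;
  removeEventListener(type: "abort", listener: () => void): void;
}

// Simplified stand-in for the AbortError exported by @azure/abort-controller.
class AbortError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "AbortError";
  }
}

/**
 * Throws if the signal is already aborted. Otherwise registers `onAbort`
 * and returns a function that removes the listener again.
 */
function checkAndRegisterWithAbortSignal(
  onAbort: () => void,
  abortSignal?: AbortSignalLike
): () => void {
  if (abortSignal == null) {
    // Nothing to register, so there is nothing to clean up either.
    return () => {};
  }
  if (abortSignal.aborted) {
    throw new AbortError("The operation was aborted.");
  }
  const listener = (): void => {
    // Remove ourselves first so the handler cannot linger after firing.
    abortSignal.removeEventListener("abort", listener);
    onAbort();
  };
  abortSignal.addEventListener("abort", listener);
  return () => abortSignal.removeEventListener("abort", listener);
}
```

Returning the cleanup function keeps the caller responsible for calling it on every exit path, which is the bug class the last bullet refers to.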

@@ -173,6 +186,12 @@ export class BatchingReceiver extends MessageReceiver {
}
});

cleanupAbortSignal = checkAndRegisterWithAbortSignal(
() => finalAction(),
Contributor

Why are we calling finalAction() when the abort signal is fired instead of throwing the AbortError?

Member Author

I was keeping with the sentiment that we've got in the connectionErrorHandler and returning the messages we've got so far, rather than just error out. In receiveAndDelete mode this is particularly important. Less so in peekLock.

TBH, it's inconsistent but it seems like it's favorable in this spot.

@bterlson - curious what your arch take on this is. Would it be better to be consistent and throw an AbortError here or would it be okay to treat abort as a soft cancel of the current operation when you have partial results?

Contributor

That's an interesting point... I could see abort meaning "I don't care anymore, just get me out of here!" or "Just give me what you have now!". Since it could mean potentially losing messages if in receiveAndDelete mode, it seems safer to return those messages if you've got them, but I'm also curious to hear @bterlson's opinion.
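To make the "soft cancel" option concrete, here is a rough sketch of a batch receive that resolves with whatever has arrived when the signal fires, instead of rejecting. Everything here is hypothetical: the Subscribe type, the receiveBatch name, and the AbortSignalLike stand-in are illustrations and not the BatchingReceiver code, and this sketch also resolves with an empty batch when the signal is already aborted, which the real code may treat differently.

```typescript
// Minimal stand-in for the signal shape (assumption).
interface AbortSignalLike {
  readonly aborted: boolean;
  addEventListener(type: "abort", listener: () => void): void;
  removeEventListener(type: "abort", listener: () => void): void;
}

// Hypothetical message source: delivers messages until the returned function is called.
type Subscribe = (onMessage: (msg: string) => void) => () => void;

function receiveBatch(
  subscribe: Subscribe,
  maxMessageCount: number,
  abortSignal?: AbortSignalLike
): Promise<string[]> {
  return new Promise<string[]>((resolve) => {
    const received: string[] = [];
    let done = false;
    let unsubscribe: () => void = () => {};

    // Resolve with the messages collected so far; safe to call more than once.
    const finalAction = (): void => {
      if (done) return;
      done = true;
      unsubscribe();
      abortSignal?.removeEventListener("abort", finalAction);
      resolve(received);
    };

    if (abortSignal?.aborted) {
      finalAction();
      return;
    }
    // Abort is treated as "give me what you have", not as an error.
    abortSignal?.addEventListener("abort", finalAction);

    unsubscribe = subscribe((msg) => {
      if (done) return;
      received.push(msg);
      if (received.length >= maxMessageCount) finalAction();
    });
    // Covers a source that delivered synchronously and finished before
    // `unsubscribe` was assigned.
    if (done) unsubscribe();
  });
}
```

The strict alternative would reject with an AbortError in the abort listener, which in receiveAndDelete mode would drop the messages already held in `received`.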


if (cleanupAbortSignal) {
cleanupAbortSignal();
cleanupAbortSignal = undefined;
Contributor

Having the clean up being called from finalAction() ensures clean up on the happy path. What about cases when the promise returned from receive() is rejected?

Member Author

Great point, will push a fix, just waiting for the test run to complete.

I'm just going to chain two promises together, rather than trying to intercept each exit from this particular promise.
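One way to read "chain two promises together", sketched under assumptions: wrap the receive promise in an outer one whose only job is to run the cleanup on both the resolve and the reject path. The helper name withCleanup is hypothetical and does not appear in the PR.

```typescript
/**
 * Runs `action` and guarantees `cleanup` is called once afterwards,
 * whether the promise resolves or rejects. Hypothetical helper name.
 */
async function withCleanup<T>(
  action: () => Promise<T>,
  cleanup: () => void
): Promise<T> {
  try {
    return await action();
  } finally {
    // Runs on the happy path and on rejection alike, so individual
    // exits inside `action` do not each need to remember the cleanup.
    cleanup();
  }
}
```

With this shape, the inner receive logic no longer needs to call the cleanup from finalAction() or from each error branch.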

@richardpark-msft
Member Author

/azp run js - servicebus - tests

@azure-pipelines

Pull request contains merge conflicts.

@richardpark-msft richardpark-msft force-pushed the richardpark-sb-track2-abortsignal-v2 branch from 0b17a16 to 0869c5a Compare May 21, 2020 01:52
@richardpark-msft
Member Author

/azp run js - servicebus - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

@richardpark-msft
Member Author

/azp run js - servicebus - tests

@azure-pipelines

Pull request contains merge conflicts.

@richardpark-msft richardpark-msft force-pushed the richardpark-sb-track2-abortsignal-v2 branch from cc6bab1 to 19ed1f1 Compare May 21, 2020 22:00
@richardpark-msft
Member Author

/azp run js - servicebus - tests

@azure-pipelines

Azure Pipelines successfully started running 1 pipeline(s).

Contributor

@ramya-rao-a left a comment

Hey @richardpark-msft,
I have reviewed and left my first round of comments, please take a look.

If it is possible, then I'd much prefer the work being done here to be done in phases in two separate PRs. Phase 1 would involve the utilities that you added for timeout and abort signal with them being used from link creation code path, Phase 2 would involve introducing openLink to consolidate the opening of AMQP links across senders and receivers.

I haven't looked at the streaming and batching receiver yet.

@@ -15,6 +15,7 @@ import { CreateSessionReceiverOptions, CreateSenderOptions } from "./models";
import { Receiver, ReceiverImpl } from "./receivers/receiver";
import { SessionReceiver, SessionReceiverImpl } from "./receivers/sessionReceiver";
import { ReceivedMessageWithLock, ReceivedMessage } from "./serviceBusMessage";
import { openLink } from "./shared/openLink";
Contributor

Do we need to import openLink here and pass to the constructors of ReceiverImpl and SenderImpl? These constructors can import and use it directly right?

Member Author

They could - this is to make unit testing possible by always injecting openLink.

Contributor

Is injecting the only way to ensure unit testability? My concern is that this model is not intuitive and adds complexity that passing around a function brings.

If openLink was meant to be common shared code between MessageSender and MessageReceiver, then I would expect it to be part of LinkEntity, which is the base class for both. An independent helper method makes sense when it is being used from multiple unrelated places and is stateless. Given that this helper is only ever going to be used by classes that inherit from LinkEntity, the base class is a much better fit.

Seeing the unit tests, I mainly see the below categories

  • Ensure abort signal is plumbed through to open()
  • Ensure open() is not called when abort signal is already fired

If this common code was on LinkEntity, then for the above unit tests, we can override the method for open(). I see that the plumbing needs to be tested from SenderImpl -> MessageSender -> LinkEntity. Would perhaps combining SenderImpl & MessageSender help?

export class SenderImpl extends LinkEntity implements Sender ?

Then your test code would be

class TestAbortSignalSender extends SenderImpl {
  async openLink(..) {
    if (options.abortSignal != null) {
      throw new Error("abortSignal was properly passed to open()!");
    } else {
      throw new Error("No abortSignal was passed to open()!");
    }
  }
}

instead of

sender = new SenderImpl(clientEntityContext, {}, async (openArgs) => {
  if (openArgs.abortSignal != null) {
    throw new Error("abortSignal was properly passed to open()!");
  } else {
    throw new Error("No abortSignal was passed to open()!");
  }
});

cc @chradek

Member Author

Definitely not the only way. There are so many ways!

Inheritance for testing is also a valid approach and I don't think it's bad. In the code as it is I think inheritance is making it difficult to see where the lines are between what's mutable, and what's not.

Also, this is definitely not this code's final form, but it is a step in a direction that I think makes the code easier to maintain. Some state is ultimately required though (so we might want to consider how to do that), but I didn't want to do too much in one PR.

export function openLink<T extends RheaLink>(args: OpenArgs<T>): Promise<T | undefined> {
const checkAborted = (): void => {
if (args.abortSignal?.aborted) {
throw new AbortError("open() has been cancelled by user.");
Contributor

The user would have no idea what open() is, so this error message would not make much sense to them

checkAborted();

if (args.isOpen() || args.isConnecting() || args.getCloseInitiated()) {
return Promise.resolve(undefined);
Contributor

If openLink is declared with the async keyword, then this can simply be return

Contributor

Another benefit to making openLink async!


try {
if (args.isOpen()) {
return Promise.resolve(undefined);
Contributor

Since this callback passed to lock acquire is declared as async, this can simply be return

* we can stop passing in some of these functions.
*
* For instance
* - negotiateClaim + ensureTokenRenewal can be lifted out of LinkEntity
Contributor

out of LinkEntity and into where?

Member Author

Originally I was thinking of just moving the methods into openLink.ts since they're all pretty related to that activity.

However, I've been thinking about that some more. My main issue with them being here is that inheritance can make it difficult to see the proper separation between LinkEntity and its child classes. So when I was tracing through this it wasn't entirely obvious which parts were "internal" state to LinkEntity, etc...

Having been through it more I could see it staying but it'd be nice to make that first-time read a bit easier.

// these are more candidates to move out from LinkEntity. They don't really depend on anything special
// in LinkEntity and are just clutter.
negotiateClaim(): Promise<void>;
ensureTokenRenewal(): Promise<void>;
Contributor

After looking into _ensureTokenRenewal in LinkEntity, it's not clear to me why this would be an async function. All it does is set a timer to negotiate the claim.

// TODO: do we _need_ this lock at the moment? If we're not trying to get
// open and close to be mutually exclusive?
return defaultLock.acquire(args.openLock, async () => {
checkAborted();
Contributor

There are a lot of calls to checkAborted() being done in here after acquiring the lock. I understand the need to use it so that we can exit as soon as possible instead of going through the different async operations, but we don't need it to throw the AbortError itself.

We can use checkAndRegisterWithAbortSignal() before calling openLink(). This way, in here, we only need to add early exits before every async operation. And then when these async operations can support abort signal in the future, these checks can go away.

- Removing unnecessary clearTimeout
- Adding in some commentary
- Adding in a missing test - when the action throws an error.
@richardpark-msft
Member Author

> Hey @richardpark-msft,
> I have reviewed and left my first round of comments, please take a look.
>
> If it is possible, then I'd much prefer the work being done here to be done in phases in two separate PRs. Phase 1 would involve the utilities that you added for timeout and abort signal with them being used from link creation code path, Phase 2 would involve introducing openLink to consolidate the opening of AMQP links across senders and receivers.
>
> I haven't looked at the streaming and batching receiver yet.

Created PR#9138 for the abort helpers.

@richardpark-msft richardpark-msft force-pushed the richardpark-sb-track2-abortsignal-v2 branch from 6d68bce to 1905b3a Compare May 28, 2020 00:10
@@ -184,7 +184,7 @@ export class LinkEntity {
* Ensures that the token is renewed within the predefined renewal margin.
* @returns {void}
*/
- protected async _ensureTokenRenewal(): Promise<void> {
+ protected _ensureTokenRenewal(): void {
Contributor

There are a few places from which this is called. All of those places need to be updated to remove the await.

@ramya-rao-a
Contributor

Resolved all my comments on the utils.ts file as they have either been addressed in #9138 or I have copied the conversation over to that PR

try {
- await Promise.race([this.open(), initTimeoutPromise]);
+ await waitForTimeoutOrAbortOrResolve({
+ actionFn: () => this.open({ abortSignal: options?.abortSignal }),
Contributor

Nit: You already got the abortSignal so can do this:

Suggested change
actionFn: () => this.open({ abortSignal: options?.abortSignal }),
actionFn: () => this.open({ abortSignal }),

import { AbortSignalLike } from "@azure/core-http";
import { AbortError } from "@azure/abort-controller";

export type RheaLink = AwaitableSender | RheaReceiver;
Contributor

Nit: My obligatory "Add @internal and @ignore to anything exported that shouldn't be public!"

export function openLink<T extends RheaLink>(args: OpenArgs<T>): Promise<T | undefined> {
const checkAborted = (): void => {
if (args.abortSignal?.aborted) {
throw new AbortError("open() has been cancelled by user.");
Contributor

We just happened to talk about this in another PR! openLink is not currently an async method, so I'd expect that if this function gets called outside of the promise that openLink returns, it would return a Promise.reject(new AbortError("")).

This gets a bit complicated though since you re-use this function inside the promise that's returned, and I don't think (not certain) that returning a rejected promise would trigger the catch block that's used to close a link.

Ultimately it might just be easier to make openLink an async function so you don't have to worry about this.
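A small sketch of the difference being described, using hypothetical function names and a simplified AbortError stand-in (neither is the PR's openLink). In a non-async function that returns a Promise, a throw before the promise exists escapes synchronously to the caller, while the same throw inside an async function surfaces as a rejected promise.

```typescript
// Simplified stand-in for the AbortError exported by @azure/abort-controller.
class AbortError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "AbortError";
  }
}

// Non-async: the throw happens before any promise is returned, so a caller
// that only attaches .catch() never sees it. It needs try/catch as well.
function openLinkNonAsync(aborted: boolean): Promise<void> {
  if (aborted) {
    throw new AbortError("The operation was aborted.");
  }
  return Promise.resolve();
}

// Async: the same throw becomes a rejection of the returned promise,
// and a bare `return` is enough to resolve with undefined.
async function openLinkAsync(aborted: boolean): Promise<void> {
  if (aborted) {
    throw new AbortError("The operation was aborted.");
  }
  return;
}
```

The async version gives callers a single error channel, which is the simplification the comment above is pointing at.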

throwErrorIfConnectionClosed(this._context.namespace);
this._onMessage = onMessage;
this._onError = onError;

this._cleanupAbortHandler = checkAndRegisterWithAbortSignal(
Contributor

We don't allow calling receive multiple times do we? Is there any risk that an existing _cleanupAbortHandler might get overwritten with a new one? (Should we be pushing the cleanup methods to a list and iterate through them when we call close?)
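The list-based alternative raised in this comment could look roughly like the following. This is a sketch only: CleanupRegistry is a hypothetical name, and nothing in the conversation says the PR adopted this approach.

```typescript
/**
 * Collects cleanup functions so that a later registration cannot
 * overwrite an earlier one. Hypothetical helper, not from the PR.
 */
class CleanupRegistry {
  private _cleanups: Array<() => void> = [];

  add(cleanup: () => void): void {
    this._cleanups.push(cleanup);
  }

  /** Runs every registered cleanup once, then forgets them. */
  runAll(): void {
    const pending = this._cleanups;
    // Swap the list first so a cleanup that registers another one
    // does not get run (or lost) in the middle of this loop.
    this._cleanups = [];
    for (const cleanup of pending) {
      cleanup();
    }
  }
}
```

A close() implementation would call runAll() once; calling it again is harmless because the list is empty by then.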

// Final action to be performed after
// - maxMessageCount is reached or
// - maxWaitTime is passed or
// - newMessageWaitTimeoutInSeconds is passed since the last message was received
const finalAction = (): void => {
if (finalActionCalled) {
Contributor

I feel like we used to have something like this before...

isConnecting: () => this.isConnecting,
setIsConnecting: (value: boolean) => (this.isConnecting = value),
getCloseInitiated: () => this.wasCloseInitiated,
create: async () => {
Contributor

Slight readability improvement:

Suggested change
create: async () => {
create: async function createReceiver() {

Comment on lines +39 to +41
throw new Error("abortSignal was properly passed to open()!");
} else {
throw new Error("No abortSignal was passed to open()!");
Contributor

Nit: Might not be worth doing now, but it might be slightly easier to maintain and read if these messages were in variables like abortSignalPresent and abortSignalMissing. Then it'd be easy to tell in the asserts what's supposed to happen without having to parse the text (because my brain now parses variable names better than English words).

@richardpark-msft
Member Author

Closing this PR - the work has migrated into two separate PRs:
#9233 - sender
#9284 - receiver
