Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Better errors for settling deferred msg in ReceiveAndDelete mode #10396

Merged
merged 4 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ export class BatchingReceiverLite {
clientEntityContext,
context.message!,
context.delivery!,
true
true,
this._receiveMode
);
};

Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,8 @@ export class ManagementClient extends LinkEntity {
this._context,
decodedMessage as any,
{ tag: msg["lock-token"] } as any,
false
false,
receiveMode
);
if (message.lockToken && message.lockedUntilUtc) {
this._context.requestResponseLockedMessages.set(
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ export class StreamingReceiver extends MessageReceiver {
this._context,
context.message!,
context.delivery!,
true
true,
this.receiveMode
);

if (this.autoRenewLock && bMessage.lockToken) {
Expand Down
56 changes: 29 additions & 27 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -899,9 +899,13 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
context: ClientEntityContext,
msg: AmqpMessage,
delivery: Delivery,
shouldReorderLockToken: boolean
shouldReorderLockToken: boolean,
receiveMode: ReceiveMode
) {
Object.assign(this, fromAmqpMessage(msg, delivery, shouldReorderLockToken));
if (receiveMode === ReceiveMode.receiveAndDelete) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just pop a comment in here saying this is a workaround (because eventually we can get rid of this, right?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment in 6fec199

this.lockToken = undefined;
}
this._context = context;
if (msg.body) {
this.body = this._context.namespace.dataTransformer.decode(msg.body);
Expand Down Expand Up @@ -996,21 +1000,12 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
description: `Invalid operation on the message, message lock doesn't exist when dealing with sessions`,
condition: ErrorNameConditionMapper.InvalidOperationError
});
} else if (!this._context.requestResponseLockedMessages.has(this.lockToken!)) {
// In case the message wasn't from a deferred queue,
// 1. We have the access to the receiver which can be used to throw error in case of the ReceiveAndDelete mode
// 2. We can additionally verify the remote_settled flag on the delivery
// - If the flag is true, throw an error since the message has been settled (Specifically, with a receive link)
// - If the flag is false, we can't say that the message has not been settled
// since settling with the management link won't update the delivery (In this case, service would throw an error)
const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId);
if (receiver && receiver.receiveMode !== ReceiveMode.peekLock) {
error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`renew the lock on the message`)
);
} else if (this.delivery.remote_settled) {
error = new Error(`Failed to renew the lock as this message is already settled.`);
}
} else if (!this.lockToken) {
error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`renew the lock on the message`)
);
} else if (this.delivery.remote_settled) {
error = new Error(`Failed to renew the lock as this message is already settled.`);
}
if (error) {
log.error(
Expand Down Expand Up @@ -1066,26 +1061,33 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
operation: DispositionType,
options?: DispositionStatusOptions
): Promise<void> {
const isDeferredMessage = this._context.requestResponseLockedMessages.has(this.lockToken!);
if (!this.lockToken) {
const error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`)
);
log.error(
"[%s] An error occurred when settling a message with id '%s': %O",
this._context.namespace.connectionId,
this.messageId,
error
);
throw error;
}
const isDeferredMessage = this._context.requestResponseLockedMessages.has(this.lockToken);
const receiver = isDeferredMessage
? undefined
: this._context.getReceiver(this.delivery.link.name, this.sessionId);

if (!isDeferredMessage) {
// In case the message wasn't from a deferred queue,
// 1. We have the access to the receiver which can be used to throw error in case of the ReceiveAndDelete mode
// 2. We can additionally verify the remote_settled flag on the delivery
// 1. We can verify the remote_settled flag on the delivery
// - If the flag is true, throw an error since the message has been settled (Specifically, with a receive link)
// - If the flag is false, we can't say that the message has not been settled
// since settling with the management link won't update the delivery (In this case, service would throw an error)
// 3. If the message has a session-id and if the associated receiver link is unavailable,
// 2. If the message has a session-id and if the associated receiver link is unavailable,
// then throw an error since we need a lock on the session to settle the message.
let error: Error | undefined;
if (receiver && receiver.receiveMode !== ReceiveMode.peekLock) {
error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`)
);
} else if (this.delivery.remote_settled) {
if (this.delivery.remote_settled) {
error = new Error(`Failed to ${operation} the message as this message is already settled.`);
} else if ((!receiver || !receiver.isOpen()) && this.sessionId != undefined) {
error = translate({
Expand All @@ -1110,13 +1112,13 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
// 1. If the received message is deferred as such messages can only be settled using managementLink
// 2. If the associated receiver link is not available. This does not apply to messages from sessions as we need a lock on the session to do so.
if (isDeferredMessage || ((!receiver || !receiver.isOpen()) && this.sessionId == undefined)) {
await this._context.managementClient!.updateDispositionStatus(this.lockToken!, operation, {
await this._context.managementClient!.updateDispositionStatus(this.lockToken, operation, {
...options,
sessionId: this.sessionId
});
if (isDeferredMessage) {
// Remove the message from the internal map of deferred messages
this._context.requestResponseLockedMessages.delete(this.lockToken!);
this._context.requestResponseLockedMessages.delete(this.lockToken);
}
return;
}
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,8 @@ export class MessageSession extends LinkEntity {
this._context,
context.message!,
context.delivery!,
true
true,
this.receiveMode
);

try {
Expand Down
157 changes: 140 additions & 17 deletions sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,16 @@ describe("receive and delete", () => {
});

describe("Receive Deferred messages in ReceiveAndDelete mode", function(): void {
let sequenceNumber: Long;
let entityNames: EntityName;

afterEach(async () => {
await afterEachTest();
});
async function deferMessage(useSessions?: boolean): Promise<void> {
const testMessages = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample();
async function deferMessage(testClientType: TestClientType): Promise<Long> {
entityNames = await beforeEachTest(testClientType, "peekLock");
const testMessages = entityNames.usesSessions
? TestMessage.getSessionSample()
: TestMessage.getSample();
await sender.sendMessages(testMessages);
const batch = await receiver.receiveMessages(1);
const msgs = batch;
Expand All @@ -349,60 +352,180 @@ describe("receive and delete", () => {
);
should.equal(msgs[0].deliveryCount, 0, "DeliveryCount is different than expected");

sequenceNumber = msgs[0].sequenceNumber!;
await (msgs[0] as ReceivedMessageWithLock).defer();
return msgs[0].sequenceNumber!;
}

async function receiveDeferredMessage(): Promise<void> {
async function testDeferredMessage(testClientType: TestClientType): Promise<ReceivedMessage> {
const sequenceNumber = await deferMessage(testClientType);
await receiver.close();
receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames);

const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber);
if (!deferredMsg) {
throw `No message received for sequence number ${sequenceNumber}`;
}

should.equal(deferredMsg!.deliveryCount, 1, "DeliveryCount is different than expected");
await testPeekMsgsLength(receiver, 0);
}

async function deferAndReceiveMessage(testClientType: TestClientType) {
const entityNames = await beforeEachTest(testClientType, "peekLock");
await deferMessage(entityNames.usesSessions);
await receiver.close();
receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames);
await receiveDeferredMessage();
return deferredMsg;
}

/*
// The below are commented due to service bug described in https://github.com/Azure/azure-sdk-for-js/issues/2268
it("Partitioned Queue: No settlement of the message removes message", async function(): Promise<
void
> {
await deferAndReceiveMessage(TestClientType.PartitionedQueue);
await testDeferredMessage(TestClientType.PartitionedQueue);
});

it("Partitioned Subscription: No settlement of the message removes message", async function(): Promise<
void
> {
await deferAndReceiveMessage(TestClientType.PartitionedSubscription);
await testDeferredMessage(TestClientType.PartitionedSubscription);
});
*/

it("Unpartitioned Queue: No settlement of the message removes message", async function(): Promise<
void
> {
await deferAndReceiveMessage(TestClientType.UnpartitionedQueue);
await testDeferredMessage(TestClientType.UnpartitionedQueue);
});

it("Unpartitioned Subscription: No settlement of the message removes message", async function(): Promise<
void
> {
await deferAndReceiveMessage(TestClientType.UnpartitionedSubscription);
await testDeferredMessage(TestClientType.UnpartitionedSubscription);
});

it(
withSessionTestClientType + ": No settlement of the message removes message",
async function(): Promise<void> {
await deferAndReceiveMessage(withSessionTestClientType);
await testDeferredMessage(withSessionTestClientType);
}
);
});

describe("Settlement of deferred msg in ReceiveAndDelete mode", () => {
afterEach(async () => {
await afterEachTest();
});

let entityNames: EntityName;

async function testDeferredMessage(testClientType: TestClientType): Promise<ReceivedMessage> {
entityNames = await beforeEachTest(testClientType, "peekLock");

// send message
const testMessage = entityNames.usesSessions
? TestMessage.getSessionSample()
: TestMessage.getSample();
await sender.sendMessages(testMessage);

// receive and defer the message
const [msg] = await receiver.receiveMessages(1);
await (msg as ReceivedMessageWithLock).defer();
const sequenceNumber = msg.sequenceNumber!;
await receiver.close();

// Receive the deferred message in ReceiveAndDelete mode
receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames);
const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber);
if (!deferredMsg) {
throw `No message received for sequence number ${sequenceNumber}`;
}

return deferredMsg;
}

const testError = (err: Error, operation: DispositionType): void => {
expect(err.message.toLowerCase(), "ErrorMessage is different than expected").includes(
`failed to ${operation} the message as the operation is only supported in \'peeklock\' receive mode.`
);
};

async function testSettlement(
testClienttype: TestClientType,
operation: DispositionType
): Promise<void> {
const deferredMsg = await testDeferredMessage(testClienttype);
// we have to force this cast - the type system doesn't allow this if you've chosen receiveAndDelete
// as your lock mode.
const msg = deferredMsg as ReceivedMessageWithLock;

try {
if (operation === DispositionType.complete) {
await msg.complete();
} else if (operation === DispositionType.abandon) {
await msg.abandon();
} else if (operation === DispositionType.deadletter) {
await msg.deadLetter();
} else if (operation === DispositionType.defer) {
await msg.defer();
}
} catch (err) {
errorWasThrown = true;
testError(err, operation);
}

should.equal(errorWasThrown, true, "Error thrown flag must be true");
}

it(noSessionTestClientType + ": complete() throws error", async function(): Promise<void> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be simpler to just do the [].forEach(() => {}) trick instead?

Like this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await testSettlement(noSessionTestClientType, DispositionType.complete);
});

it(withSessionTestClientType + ": complete() throws error", async function(): Promise<void> {
await testSettlement(withSessionTestClientType, DispositionType.complete);
});

it(noSessionTestClientType + ": abandon() throws error", async function(): Promise<void> {
await testSettlement(noSessionTestClientType, DispositionType.abandon);
});

it(withSessionTestClientType + ": abandon() throws error", async function(): Promise<void> {
await testSettlement(withSessionTestClientType, DispositionType.abandon);
});

it(noSessionTestClientType + ": defer() throws error", async function(): Promise<void> {
await testSettlement(noSessionTestClientType, DispositionType.defer);
});

it(withSessionTestClientType + ": defer() throws error", async function(): Promise<void> {
await testSettlement(withSessionTestClientType, DispositionType.defer);
});

it(noSessionTestClientType + ": deadLetter() throws error", async function(): Promise<void> {
await testSettlement(noSessionTestClientType, DispositionType.deadletter);
});

it(withSessionTestClientType + ": deadLetter() throws error", async function(): Promise<void> {
await testSettlement(withSessionTestClientType, DispositionType.deadletter);
});

async function testRenewLock(testClienttype: TestClientType): Promise<void> {
const deferredMsg = await testDeferredMessage(testClienttype);
// we have to force this cast - the type system doesn't allow this if you've chosen receiveAndDelete
// as your lock mode.

// have to cast it - the type system doesn't allow us to call into this method otherwise.
Comment on lines +509 to +512
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repetitive?

Suggested change
// we have to force this cast - the type system doesn't allow this if you've chosen receiveAndDelete
// as your lock mode.
// have to cast it - the type system doesn't allow us to call into this method otherwise.
// we have to force this cast - the type system doesn't allow us to call into this method otherwise.

Copy link
Contributor Author

@ramya-rao-a ramya-rao-a Aug 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no idea, I copied the entire test set from the ones we have for "normal" messages in this file :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the problem is that the Receiver isn't being typed more specifically.

We should fix this at some point to allow the receiver to be scoped properly but I think (grossly) this is the best we can do for now.

await (deferredMsg as ReceivedMessageWithLock).renewLock().catch((err) => {
should.equal(
err.message,
getErrorMessageNotSupportedInReceiveAndDeleteMode("renew the lock on the message"),
"ErrorMessage is different than expected"
);
errorWasThrown = true;
});

should.equal(errorWasThrown, true, "Error thrown flag must be true");
}

it(noSessionTestClientType + ": Renew message lock throws error", async function(): Promise<
void
> {
await testRenewLock(noSessionTestClientType);
});
});
});