Skip to content

Commit

Permalink
[Service Bus] Better errors for settling deferred msg in ReceiveAndDe…
Browse files Browse the repository at this point in the history
…lete mode (#10396)
  • Loading branch information
ramya-rao-a authored Aug 5, 2020
1 parent 6512a65 commit 866d723
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 48 deletions.
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ export class BatchingReceiverLite {
clientEntityContext,
context.message!,
context.delivery!,
true
true,
this._receiveMode
);
};

Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,8 @@ export class ManagementClient extends LinkEntity {
this._context,
decodedMessage as any,
{ tag: msg["lock-token"] } as any,
false
false,
receiveMode
);
if (message.lockToken && message.lockedUntilUtc) {
this._context.requestResponseLockedMessages.set(
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ export class StreamingReceiver extends MessageReceiver {
this._context,
context.message!,
context.delivery!,
true
true,
this.receiveMode
);

if (this.autoRenewLock && bMessage.lockToken) {
Expand Down
58 changes: 31 additions & 27 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -899,9 +899,15 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
context: ClientEntityContext,
msg: AmqpMessage,
delivery: Delivery,
shouldReorderLockToken: boolean
shouldReorderLockToken: boolean,
receiveMode: ReceiveMode
) {
Object.assign(this, fromAmqpMessage(msg, delivery, shouldReorderLockToken));
// Lock on a message is applicable only in peekLock mode, but the service sets
// the lock token even in receiveAndDelete mode if the entity in question is partitioned.
if (receiveMode === ReceiveMode.receiveAndDelete) {
this.lockToken = undefined;
}
this._context = context;
if (msg.body) {
this.body = this._context.namespace.dataTransformer.decode(msg.body);
Expand Down Expand Up @@ -996,21 +1002,12 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
description: `Invalid operation on the message, message lock doesn't exist when dealing with sessions`,
condition: ErrorNameConditionMapper.InvalidOperationError
});
} else if (!this._context.requestResponseLockedMessages.has(this.lockToken!)) {
// In case the message wasn't from a deferred queue,
// 1. We have the access to the receiver which can be used to throw error in case of the ReceiveAndDelete mode
// 2. We can additionally verify the remote_settled flag on the delivery
// - If the flag is true, throw an error since the message has been settled (Specifically, with a receive link)
// - If the flag is false, we can't say that the message has not been settled
// since settling with the management link won't update the delivery (In this case, service would throw an error)
const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId);
if (receiver && receiver.receiveMode !== ReceiveMode.peekLock) {
error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`renew the lock on the message`)
);
} else if (this.delivery.remote_settled) {
error = new Error(`Failed to renew the lock as this message is already settled.`);
}
} else if (!this.lockToken) {
error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`renew the lock on the message`)
);
} else if (this.delivery.remote_settled) {
error = new Error(`Failed to renew the lock as this message is already settled.`);
}
if (error) {
log.error(
Expand Down Expand Up @@ -1066,26 +1063,33 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
operation: DispositionType,
options?: DispositionStatusOptions
): Promise<void> {
const isDeferredMessage = this._context.requestResponseLockedMessages.has(this.lockToken!);
if (!this.lockToken) {
const error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`)
);
log.error(
"[%s] An error occurred when settling a message with id '%s': %O",
this._context.namespace.connectionId,
this.messageId,
error
);
throw error;
}
const isDeferredMessage = this._context.requestResponseLockedMessages.has(this.lockToken);
const receiver = isDeferredMessage
? undefined
: this._context.getReceiver(this.delivery.link.name, this.sessionId);

if (!isDeferredMessage) {
// In case the message wasn't from a deferred queue,
// 1. We have the access to the receiver which can be used to throw error in case of the ReceiveAndDelete mode
// 2. We can additionally verify the remote_settled flag on the delivery
// 1. We can verify the remote_settled flag on the delivery
// - If the flag is true, throw an error since the message has been settled (Specifically, with a receive link)
// - If the flag is false, we can't say that the message has not been settled
// since settling with the management link won't update the delivery (In this case, service would throw an error)
// 3. If the message has a session-id and if the associated receiver link is unavailable,
// 2. If the message has a session-id and if the associated receiver link is unavailable,
// then throw an error since we need a lock on the session to settle the message.
let error: Error | undefined;
if (receiver && receiver.receiveMode !== ReceiveMode.peekLock) {
error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`)
);
} else if (this.delivery.remote_settled) {
if (this.delivery.remote_settled) {
error = new Error(`Failed to ${operation} the message as this message is already settled.`);
} else if ((!receiver || !receiver.isOpen()) && this.sessionId != undefined) {
error = translate({
Expand All @@ -1110,13 +1114,13 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
// 1. If the received message is deferred as such messages can only be settled using managementLink
// 2. If the associated receiver link is not available. This does not apply to messages from sessions as we need a lock on the session to do so.
if (isDeferredMessage || ((!receiver || !receiver.isOpen()) && this.sessionId == undefined)) {
await this._context.managementClient!.updateDispositionStatus(this.lockToken!, operation, {
await this._context.managementClient!.updateDispositionStatus(this.lockToken, operation, {
...options,
sessionId: this.sessionId
});
if (isDeferredMessage) {
// Remove the message from the internal map of deferred messages
this._context.requestResponseLockedMessages.delete(this.lockToken!);
this._context.requestResponseLockedMessages.delete(this.lockToken);
}
return;
}
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,8 @@ export class MessageSession extends LinkEntity {
this._context,
context.message!,
context.delivery!,
true
true,
this.receiveMode
);

try {
Expand Down
157 changes: 140 additions & 17 deletions sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,16 @@ describe("receive and delete", () => {
});

describe("Receive Deferred messages in ReceiveAndDelete mode", function(): void {
let sequenceNumber: Long;
let entityNames: EntityName;

afterEach(async () => {
await afterEachTest();
});
async function deferMessage(useSessions?: boolean): Promise<void> {
const testMessages = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample();
async function deferMessage(testClientType: TestClientType): Promise<Long> {
entityNames = await beforeEachTest(testClientType, "peekLock");
const testMessages = entityNames.usesSessions
? TestMessage.getSessionSample()
: TestMessage.getSample();
await sender.sendMessages(testMessages);
const batch = await receiver.receiveMessages(1);
const msgs = batch;
Expand All @@ -349,60 +352,180 @@ describe("receive and delete", () => {
);
should.equal(msgs[0].deliveryCount, 0, "DeliveryCount is different than expected");

sequenceNumber = msgs[0].sequenceNumber!;
await (msgs[0] as ReceivedMessageWithLock).defer();
return msgs[0].sequenceNumber!;
}

async function receiveDeferredMessage(): Promise<void> {
async function testDeferredMessage(testClientType: TestClientType): Promise<ReceivedMessage> {
const sequenceNumber = await deferMessage(testClientType);
await receiver.close();
receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames);

const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber);
if (!deferredMsg) {
throw `No message received for sequence number ${sequenceNumber}`;
}

should.equal(deferredMsg!.deliveryCount, 1, "DeliveryCount is different than expected");
await testPeekMsgsLength(receiver, 0);
}

async function deferAndReceiveMessage(testClientType: TestClientType) {
const entityNames = await beforeEachTest(testClientType, "peekLock");
await deferMessage(entityNames.usesSessions);
await receiver.close();
receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames);
await receiveDeferredMessage();
return deferredMsg;
}

/*
// The below are commented due to service bug described in https://github.com/Azure/azure-sdk-for-js/issues/2268
it("Partitioned Queue: No settlement of the message removes message", async function(): Promise<
void
> {
await deferAndReceiveMessage(TestClientType.PartitionedQueue);
await testDeferredMessage(TestClientType.PartitionedQueue);
});
it("Partitioned Subscription: No settlement of the message removes message", async function(): Promise<
void
> {
await deferAndReceiveMessage(TestClientType.PartitionedSubscription);
await testDeferredMessage(TestClientType.PartitionedSubscription);
});
*/

it("Unpartitioned Queue: No settlement of the message removes message", async function(): Promise<
void
> {
await deferAndReceiveMessage(TestClientType.UnpartitionedQueue);
await testDeferredMessage(TestClientType.UnpartitionedQueue);
});

it("Unpartitioned Subscription: No settlement of the message removes message", async function(): Promise<
void
> {
await deferAndReceiveMessage(TestClientType.UnpartitionedSubscription);
await testDeferredMessage(TestClientType.UnpartitionedSubscription);
});

it(
withSessionTestClientType + ": No settlement of the message removes message",
async function(): Promise<void> {
await deferAndReceiveMessage(withSessionTestClientType);
await testDeferredMessage(withSessionTestClientType);
}
);
});

describe("Settlement of deferred msg in ReceiveAndDelete mode", () => {
afterEach(async () => {
await afterEachTest();
});

let entityNames: EntityName;

async function testDeferredMessage(testClientType: TestClientType): Promise<ReceivedMessage> {
entityNames = await beforeEachTest(testClientType, "peekLock");

// send message
const testMessage = entityNames.usesSessions
? TestMessage.getSessionSample()
: TestMessage.getSample();
await sender.sendMessages(testMessage);

// receive and defer the message
const [msg] = await receiver.receiveMessages(1);
await (msg as ReceivedMessageWithLock).defer();
const sequenceNumber = msg.sequenceNumber!;
await receiver.close();

// Receive the deferred message in ReceiveAndDelete mode
receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames);
const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber);
if (!deferredMsg) {
throw `No message received for sequence number ${sequenceNumber}`;
}

return deferredMsg;
}

const testError = (err: Error, operation: DispositionType): void => {
expect(err.message.toLowerCase(), "ErrorMessage is different than expected").includes(
`failed to ${operation} the message as the operation is only supported in \'peeklock\' receive mode.`
);
};

async function testSettlement(
testClienttype: TestClientType,
operation: DispositionType
): Promise<void> {
const deferredMsg = await testDeferredMessage(testClienttype);
// we have to force this cast - the type system doesn't allow this if you've chosen receiveAndDelete
// as your lock mode.
const msg = deferredMsg as ReceivedMessageWithLock;

try {
if (operation === DispositionType.complete) {
await msg.complete();
} else if (operation === DispositionType.abandon) {
await msg.abandon();
} else if (operation === DispositionType.deadletter) {
await msg.deadLetter();
} else if (operation === DispositionType.defer) {
await msg.defer();
}
} catch (err) {
errorWasThrown = true;
testError(err, operation);
}

should.equal(errorWasThrown, true, "Error thrown flag must be true");
}

it(noSessionTestClientType + ": complete() throws error", async function(): Promise<void> {
await testSettlement(noSessionTestClientType, DispositionType.complete);
});

it(withSessionTestClientType + ": complete() throws error", async function(): Promise<void> {
await testSettlement(withSessionTestClientType, DispositionType.complete);
});

it(noSessionTestClientType + ": abandon() throws error", async function(): Promise<void> {
await testSettlement(noSessionTestClientType, DispositionType.abandon);
});

it(withSessionTestClientType + ": abandon() throws error", async function(): Promise<void> {
await testSettlement(withSessionTestClientType, DispositionType.abandon);
});

it(noSessionTestClientType + ": defer() throws error", async function(): Promise<void> {
await testSettlement(noSessionTestClientType, DispositionType.defer);
});

it(withSessionTestClientType + ": defer() throws error", async function(): Promise<void> {
await testSettlement(withSessionTestClientType, DispositionType.defer);
});

it(noSessionTestClientType + ": deadLetter() throws error", async function(): Promise<void> {
await testSettlement(noSessionTestClientType, DispositionType.deadletter);
});

it(withSessionTestClientType + ": deadLetter() throws error", async function(): Promise<void> {
await testSettlement(withSessionTestClientType, DispositionType.deadletter);
});

async function testRenewLock(testClienttype: TestClientType): Promise<void> {
const deferredMsg = await testDeferredMessage(testClienttype);
// we have to force this cast - the type system doesn't allow this if you've chosen receiveAndDelete
// as your lock mode.

// have to cast it - the type system doesn't allow us to call into this method otherwise.
await (deferredMsg as ReceivedMessageWithLock).renewLock().catch((err) => {
should.equal(
err.message,
getErrorMessageNotSupportedInReceiveAndDeleteMode("renew the lock on the message"),
"ErrorMessage is different than expected"
);
errorWasThrown = true;
});

should.equal(errorWasThrown, true, "Error thrown flag must be true");
}

it(noSessionTestClientType + ": Renew message lock throws error", async function(): Promise<
void
> {
await testRenewLock(noSessionTestClientType);
});
});
});

0 comments on commit 866d723

Please sign in to comment.