diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index 112b0e96c185..476bf6db75fc 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -246,7 +246,8 @@ export class BatchingReceiverLite { clientEntityContext, context.message!, context.delivery!, - true + true, + this._receiveMode ); }; diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index fd83a4786b30..8617946cfa9a 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -845,7 +845,8 @@ export class ManagementClient extends LinkEntity { this._context, decodedMessage as any, { tag: msg["lock-token"] } as any, - false + false, + receiveMode ); if (message.lockToken && message.lockedUntilUtc) { this._context.requestResponseLockedMessages.set( diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 9eb61fef1753..56aa8ffba178 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -285,7 +285,8 @@ export class StreamingReceiver extends MessageReceiver { this._context, context.message!, context.delivery!, - true + true, + this.receiveMode ); if (this.autoRenewLock && bMessage.lockToken) { diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index df9d9ba452b1..6582732ed797 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -899,9 +899,15 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock { context: ClientEntityContext, msg: AmqpMessage, delivery: Delivery, - shouldReorderLockToken: boolean + shouldReorderLockToken: boolean, + receiveMode: ReceiveMode ) { Object.assign(this, fromAmqpMessage(msg, delivery, shouldReorderLockToken)); + // Lock on a message is applicable only in peekLock mode, but the service sets + // the lock token even in receiveAndDelete mode if the entity in question is partitioned. + if (receiveMode === ReceiveMode.receiveAndDelete) { + this.lockToken = undefined; + } this._context = context; if (msg.body) { this.body = this._context.namespace.dataTransformer.decode(msg.body); @@ -996,21 +1002,12 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock { description: `Invalid operation on the message, message lock doesn't exist when dealing with sessions`, condition: ErrorNameConditionMapper.InvalidOperationError }); - } else if (!this._context.requestResponseLockedMessages.has(this.lockToken!)) { - // In case the message wasn't from a deferred queue, - // 1. We have the access to the receiver which can be used to throw error in case of the ReceiveAndDelete mode - // 2. We can additionally verify the remote_settled flag on the delivery - // - If the flag is true, throw an error since the message has been settled (Specifically, with a receive link) - // - If the flag is false, we can't say that the message has not been settled - // since settling with the management link won't update the delivery (In this case, service would throw an error) - const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId); - if (receiver && receiver.receiveMode !== ReceiveMode.peekLock) { - error = new Error( - getErrorMessageNotSupportedInReceiveAndDeleteMode(`renew the lock on the message`) - ); - } else if (this.delivery.remote_settled) { - error = new Error(`Failed to renew the lock as this message is already settled.`); - } + } else if (!this.lockToken) { + error = new Error( + getErrorMessageNotSupportedInReceiveAndDeleteMode(`renew the lock on the message`) + ); + } else if (this.delivery.remote_settled) { + error = new Error(`Failed to renew the lock as this message is already settled.`); } if (error) { log.error( @@ -1066,26 +1063,33 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock { operation: DispositionType, options?: DispositionStatusOptions ): Promise { - const isDeferredMessage = this._context.requestResponseLockedMessages.has(this.lockToken!); + if (!this.lockToken) { + const error = new Error( + getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`) + ); + log.error( + "[%s] An error occurred when settling a message with id '%s': %O", + this._context.namespace.connectionId, + this.messageId, + error + ); + throw error; + } + const isDeferredMessage = this._context.requestResponseLockedMessages.has(this.lockToken); const receiver = isDeferredMessage ? undefined : this._context.getReceiver(this.delivery.link.name, this.sessionId); if (!isDeferredMessage) { // In case the message wasn't from a deferred queue, - // 1. We have the access to the receiver which can be used to throw error in case of the ReceiveAndDelete mode - // 2. We can additionally verify the remote_settled flag on the delivery + // 1. We can verify the remote_settled flag on the delivery // - If the flag is true, throw an error since the message has been settled (Specifically, with a receive link) // - If the flag is false, we can't say that the message has not been settled // since settling with the management link won't update the delivery (In this case, service would throw an error) - // 3. If the message has a session-id and if the associated receiver link is unavailable, + // 2. If the message has a session-id and if the associated receiver link is unavailable, // then throw an error since we need a lock on the session to settle the message. let error: Error | undefined; - if (receiver && receiver.receiveMode !== ReceiveMode.peekLock) { - error = new Error( - getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`) - ); - } else if (this.delivery.remote_settled) { + if (this.delivery.remote_settled) { error = new Error(`Failed to ${operation} the message as this message is already settled.`); } else if ((!receiver || !receiver.isOpen()) && this.sessionId != undefined) { error = translate({ @@ -1110,13 +1114,13 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock { // 1. If the received message is deferred as such messages can only be settled using managementLink // 2. If the associated receiver link is not available. This does not apply to messages from sessions as we need a lock on the session to do so. if (isDeferredMessage || ((!receiver || !receiver.isOpen()) && this.sessionId == undefined)) { - await this._context.managementClient!.updateDispositionStatus(this.lockToken!, operation, { + await this._context.managementClient!.updateDispositionStatus(this.lockToken, operation, { ...options, sessionId: this.sessionId }); if (isDeferredMessage) { // Remove the message from the internal map of deferred messages - this._context.requestResponseLockedMessages.delete(this.lockToken!); + this._context.requestResponseLockedMessages.delete(this.lockToken); } return; } diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index f9408e653447..60b00247c1da 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -719,7 +719,8 @@ export class MessageSession extends LinkEntity { this._context, context.message!, context.delivery!, - true + true, + this.receiveMode ); try { diff --git a/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts b/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts index f1cbbe3809b9..35d73574f635 100644 --- a/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts +++ b/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts @@ -328,13 +328,16 @@ describe("receive and delete", () => { }); describe("Receive Deferred messages in ReceiveAndDelete mode", function(): void { - let sequenceNumber: Long; + let entityNames: EntityName; afterEach(async () => { await afterEachTest(); }); - async function deferMessage(useSessions?: boolean): Promise { - const testMessages = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); + async function deferMessage(testClientType: TestClientType): Promise { + entityNames = await beforeEachTest(testClientType, "peekLock"); + const testMessages = entityNames.usesSessions + ? TestMessage.getSessionSample() + : TestMessage.getSample(); await sender.sendMessages(testMessages); const batch = await receiver.receiveMessages(1); const msgs = batch; @@ -349,11 +352,15 @@ describe("receive and delete", () => { ); should.equal(msgs[0].deliveryCount, 0, "DeliveryCount is different than expected"); - sequenceNumber = msgs[0].sequenceNumber!; await (msgs[0] as ReceivedMessageWithLock).defer(); + return msgs[0].sequenceNumber!; } - async function receiveDeferredMessage(): Promise { + async function testDeferredMessage(testClientType: TestClientType): Promise { + const sequenceNumber = await deferMessage(testClientType); + await receiver.close(); + receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames); + const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber); if (!deferredMsg) { throw `No message received for sequence number ${sequenceNumber}`; @@ -361,14 +368,8 @@ describe("receive and delete", () => { should.equal(deferredMsg!.deliveryCount, 1, "DeliveryCount is different than expected"); await testPeekMsgsLength(receiver, 0); - } - async function deferAndReceiveMessage(testClientType: TestClientType) { - const entityNames = await beforeEachTest(testClientType, "peekLock"); - await deferMessage(entityNames.usesSessions); - await receiver.close(); - receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames); - await receiveDeferredMessage(); + return deferredMsg; } /* @@ -376,33 +377,155 @@ describe("receive and delete", () => { it("Partitioned Queue: No settlement of the message removes message", async function(): Promise< void > { - await deferAndReceiveMessage(TestClientType.PartitionedQueue); + await testDeferredMessage(TestClientType.PartitionedQueue); }); it("Partitioned Subscription: No settlement of the message removes message", async function(): Promise< void > { - await deferAndReceiveMessage(TestClientType.PartitionedSubscription); + await testDeferredMessage(TestClientType.PartitionedSubscription); }); */ it("Unpartitioned Queue: No settlement of the message removes message", async function(): Promise< void > { - await deferAndReceiveMessage(TestClientType.UnpartitionedQueue); + await testDeferredMessage(TestClientType.UnpartitionedQueue); }); it("Unpartitioned Subscription: No settlement of the message removes message", async function(): Promise< void > { - await deferAndReceiveMessage(TestClientType.UnpartitionedSubscription); + await testDeferredMessage(TestClientType.UnpartitionedSubscription); }); it( withSessionTestClientType + ": No settlement of the message removes message", async function(): Promise { - await deferAndReceiveMessage(withSessionTestClientType); + await testDeferredMessage(withSessionTestClientType); } ); }); + + describe("Settlement of deferred msg in ReceiveAndDelete mode", () => { + afterEach(async () => { + await afterEachTest(); + }); + + let entityNames: EntityName; + + async function testDeferredMessage(testClientType: TestClientType): Promise { + entityNames = await beforeEachTest(testClientType, "peekLock"); + + // send message + const testMessage = entityNames.usesSessions + ? TestMessage.getSessionSample() + : TestMessage.getSample(); + await sender.sendMessages(testMessage); + + // receive and defer the message + const [msg] = await receiver.receiveMessages(1); + await (msg as ReceivedMessageWithLock).defer(); + const sequenceNumber = msg.sequenceNumber!; + await receiver.close(); + + // Receive the deferred message in ReceiveAndDelete mode + receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames); + const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber); + if (!deferredMsg) { + throw `No message received for sequence number ${sequenceNumber}`; + } + + return deferredMsg; + } + + const testError = (err: Error, operation: DispositionType): void => { + expect(err.message.toLowerCase(), "ErrorMessage is different than expected").includes( + `failed to ${operation} the message as the operation is only supported in \'peeklock\' receive mode.` + ); + }; + + async function testSettlement( + testClienttype: TestClientType, + operation: DispositionType + ): Promise { + const deferredMsg = await testDeferredMessage(testClienttype); + // we have to force this cast - the type system doesn't allow this if you've chosen receiveAndDelete + // as your lock mode. + const msg = deferredMsg as ReceivedMessageWithLock; + + try { + if (operation === DispositionType.complete) { + await msg.complete(); + } else if (operation === DispositionType.abandon) { + await msg.abandon(); + } else if (operation === DispositionType.deadletter) { + await msg.deadLetter(); + } else if (operation === DispositionType.defer) { + await msg.defer(); + } + } catch (err) { + errorWasThrown = true; + testError(err, operation); + } + + should.equal(errorWasThrown, true, "Error thrown flag must be true"); + } + + it(noSessionTestClientType + ": complete() throws error", async function(): Promise { + await testSettlement(noSessionTestClientType, DispositionType.complete); + }); + + it(withSessionTestClientType + ": complete() throws error", async function(): Promise { + await testSettlement(withSessionTestClientType, DispositionType.complete); + }); + + it(noSessionTestClientType + ": abandon() throws error", async function(): Promise { + await testSettlement(noSessionTestClientType, DispositionType.abandon); + }); + + it(withSessionTestClientType + ": abandon() throws error", async function(): Promise { + await testSettlement(withSessionTestClientType, DispositionType.abandon); + }); + + it(noSessionTestClientType + ": defer() throws error", async function(): Promise { + await testSettlement(noSessionTestClientType, DispositionType.defer); + }); + + it(withSessionTestClientType + ": defer() throws error", async function(): Promise { + await testSettlement(withSessionTestClientType, DispositionType.defer); + }); + + it(noSessionTestClientType + ": deadLetter() throws error", async function(): Promise { + await testSettlement(noSessionTestClientType, DispositionType.deadletter); + }); + + it(withSessionTestClientType + ": deadLetter() throws error", async function(): Promise { + await testSettlement(withSessionTestClientType, DispositionType.deadletter); + }); + + async function testRenewLock(testClienttype: TestClientType): Promise { + const deferredMsg = await testDeferredMessage(testClienttype); + // we have to force this cast - the type system doesn't allow this if you've chosen receiveAndDelete + // as your lock mode. + + // have to cast it - the type system doesn't allow us to call into this method otherwise. + await (deferredMsg as ReceivedMessageWithLock).renewLock().catch((err) => { + should.equal( + err.message, + getErrorMessageNotSupportedInReceiveAndDeleteMode("renew the lock on the message"), + "ErrorMessage is different than expected" + ); + errorWasThrown = true; + }); + + should.equal(errorWasThrown, true, "Error thrown flag must be true"); + } + + it(noSessionTestClientType + ": Renew message lock throws error", async function(): Promise< + void + > { + await testRenewLock(noSessionTestClientType); + }); + }); });