diff --git a/test/batchReceiver.spec.ts b/test/batchReceiver.spec.ts index 6bb058336a48..bd31625299ce 100644 --- a/test/batchReceiver.spec.ts +++ b/test/batchReceiver.spec.ts @@ -35,7 +35,11 @@ async function testPeekMsgsLength( expectedPeekLength: number ): Promise { const peekedMsgs = await client.peek(expectedPeekLength + 1); - should.equal(peekedMsgs.length, expectedPeekLength); + should.equal( + peekedMsgs.length, + expectedPeekLength, + "Unexpected number of msgs found when peeking" + ); } const maxDeliveryCount = 10; @@ -156,7 +160,7 @@ describe("Complete/Abandon/Defer/Deadletter normal message", () => { await testComplete(queueClient, queueClient); }); - it("Queue: complete() removes message", async function(): Promise { + it("Subscription: complete() removes message", async function(): Promise { await testComplete(topicClient, subscriptionClient); }); @@ -172,13 +176,13 @@ describe("Complete/Abandon/Defer/Deadletter normal message", () => { await completeMessages(receiverClient, 1); } - it("Queue: Abandoned message is retained with incremented deliveryCount", async function(): Promise< + it("Queue: abandon() retains message with incremented deliveryCount", async function(): Promise< void > { await testAbandon(queueClient, queueClient); }); - it("Subscription: Abandoned message is retained with incremented deliveryCount", async function(): Promise< + it("Subscription: abandon() retains message with incremented deliveryCount", async function(): Promise< void > { await testAbandon(topicClient, subscriptionClient); @@ -209,13 +213,11 @@ describe("Complete/Abandon/Defer/Deadletter normal message", () => { await testPeekMsgsLength(receiverClient, 0); } - it("Queue: Receive deferred message from queue/subscription", async function(): Promise { + it("Queue: defer() moves message to deferred queue", async function(): Promise { await testDefer(queueClient, queueClient); }); - it("Subscription: Receive deferred message from queue/subscription", async function(): Promise< - void - > { + it("Subscription: defer() moves message to deferred queue", async function(): Promise { await testDefer(topicClient, subscriptionClient); }); @@ -232,11 +234,11 @@ describe("Complete/Abandon/Defer/Deadletter normal message", () => { await completeMessages(deadLetterClient, 0); } - it("Queue: Receive dead letter message from queue/subscription", async function(): Promise { + it("Queue: deadLetter() moves message to deadletter queue", async function(): Promise { await testDeadletter(queueClient, queueClient, deadletterQueueClient); }); - it("Subscription: Receive dead letter message from queue/subscription", async function(): Promise< + it("Subscription: deadLetter() moves message to deadletter queue", async function(): Promise< void > { await testDeadletter(topicClient, subscriptionClient, deadletterSubscriptionClient); @@ -601,7 +603,7 @@ describe("Multiple ReceiveBatch calls", () => { }); }); -describe("Other ReceiveBatch Tests", function(): void { +describe("Batching Receiver Misc Tests", function(): void { beforeEach(async () => { await beforeEachTest(); }); diff --git a/test/streamingReceiver.spec.ts b/test/streamingReceiver.spec.ts index f13a742aa7d9..39ba851120d0 100644 --- a/test/streamingReceiver.spec.ts +++ b/test/streamingReceiver.spec.ts @@ -31,81 +31,104 @@ const testMessages: SendableMessageInfo[] = [ } ]; -function testReceivedMessages(receivedMsgs: ServiceBusMessage[]): void { - should.equal(receivedMsgs.length, 2); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - should.equal(receivedMsgs[1].body, testMessages[1].body); - should.equal(receivedMsgs[1].messageId, testMessages[1].messageId); -} - async function testPeekMsgsLength( client: QueueClient | SubscriptionClient, expectedPeekLength: number ): Promise { const peekedMsgs = await client.peek(expectedPeekLength + 1); - should.equal(peekedMsgs.length, expectedPeekLength); + should.equal( + peekedMsgs.length, + expectedPeekLength, + "Unexpected number of msgs found when peeking" + ); } const maxDeliveryCount = 10; +let namespace: Namespace; +let queueClient: QueueClient; +let topicClient: TopicClient; +let subscriptionClient: SubscriptionClient; +let deadletterQueueClient: QueueClient; +let deadletterSubscriptionClient: SubscriptionClient; + +async function beforeEachTest(): Promise { + // The tests in this file expect the env variables to contain the connection string and + // the names of empty queue/topic/subscription that are to be tested + + if (!process.env.SERVICEBUS_CONNECTION_STRING) { + throw new Error( + "Define SERVICEBUS_CONNECTION_STRING in your environment before running integration tests." + ); + } + if (!process.env.TOPIC_NAME) { + throw new Error("Define TOPIC_NAME in your environment before running integration tests."); + } + if (!process.env.QUEUE_NAME) { + throw new Error("Define QUEUE_NAME in your environment before running integration tests."); + } + if (!process.env.SUBSCRIPTION_NAME) { + throw new Error( + "Define SUBSCRIPTION_NAME in your environment before running integration tests." + ); + } -describe("Streaming Receiver from Queue/Subscription", function(): void { - let namespace: Namespace; - let queueClient: QueueClient; - let topicClient: TopicClient; - let subscriptionClient: SubscriptionClient; - - beforeEach(async () => { - // The tests in this file expect the env variables to contain the connection string and - // the names of empty queue/topic/subscription that are to be tested + namespace = Namespace.createFromConnectionString(process.env.SERVICEBUS_CONNECTION_STRING); + queueClient = namespace.createQueueClient(process.env.QUEUE_NAME); + topicClient = namespace.createTopicClient(process.env.TOPIC_NAME); + subscriptionClient = namespace.createSubscriptionClient( + process.env.TOPIC_NAME, + process.env.SUBSCRIPTION_NAME + ); + deadletterQueueClient = namespace.createQueueClient( + Namespace.getDeadLetterQueuePathForQueue(queueClient.name) + ); + deadletterSubscriptionClient = namespace.createSubscriptionClient( + Namespace.getDeadLetterSubcriptionPathForSubcription( + topicClient.name, + subscriptionClient.subscriptionName + ), + subscriptionClient.subscriptionName + ); - if (!process.env.SERVICEBUS_CONNECTION_STRING) { - throw new Error( - "Define SERVICEBUS_CONNECTION_STRING in your environment before running integration tests." - ); - } - if (!process.env.TOPIC_NAME) { - throw new Error("Define TOPIC_NAME in your environment before running integration tests."); - } - if (!process.env.QUEUE_NAME) { - throw new Error("Define QUEUE_NAME in your environment before running integration tests."); - } - if (!process.env.SUBSCRIPTION_NAME) { - throw new Error( - "Define SUBSCRIPTION_NAME in your environment before running integration tests." - ); - } + const peekedQueueMsg = await queueClient.peek(); + if (peekedQueueMsg.length) { + throw new Error("Please use an empty queue for integration testing"); + } - namespace = Namespace.createFromConnectionString(process.env.SERVICEBUS_CONNECTION_STRING); - queueClient = namespace.createQueueClient(process.env.QUEUE_NAME); - topicClient = namespace.createTopicClient(process.env.TOPIC_NAME); - subscriptionClient = namespace.createSubscriptionClient( - process.env.TOPIC_NAME, - process.env.SUBSCRIPTION_NAME - ); + const peekedSubscriptionMsg = await subscriptionClient.peek(); + if (peekedSubscriptionMsg.length) { + throw new Error("Please use an empty Subscription for integration testing"); + } +} - const peekedQueueMsg = await queueClient.peek(); - if (peekedQueueMsg.length) { - throw new Error("Please use an empty queue for integration testing"); - } +async function afterEachTest(): Promise { + await namespace.close(); +} - const peekedSubscriptionMsg = await subscriptionClient.peek(); - if (peekedSubscriptionMsg.length) { - throw new Error("Please use an empty Subscription for integration testing"); - } +describe("Streaming Receiver Misc Tests", function(): void { + beforeEach(async () => { + await beforeEachTest(); }); afterEach(async () => { - return namespace.close(); + await afterEachTest(); }); - it("AutoComplete removes the message from Queue", async function(): Promise { - await queueClient.sendBatch(testMessages); + async function testAutoComplete( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + await senderClient.sendBatch(testMessages); + await testPeekMsgsLength(receiverClient, testMessages.length); const receivedMsgs: ServiceBusMessage[] = []; - const receiveListener = queueClient.receive( + const receiveListener = receiverClient.receive( (msg: ServiceBusMessage) => { receivedMsgs.push(msg); + should.equal( + testMessages.some((x) => msg.body === x.body && msg.messageId === x.messageId), + true + ); return Promise.resolve(); }, (err: Error) => { @@ -113,45 +136,39 @@ describe("Streaming Receiver from Queue/Subscription", function(): void { } ); - await delay(1000); - - testReceivedMessages(receivedMsgs); + for (let i = 0; i < 5; i++) { + await delay(1000); + if (receivedMsgs.length === testMessages.length) { + break; + } + } await receiveListener.stop(); - await testPeekMsgsLength(queueClient, 0); + await testPeekMsgsLength(receiverClient, 0); + } + + it("AutoComplete removes the message from Queue", async function(): Promise { + await testAutoComplete(queueClient, queueClient); }); it("AutoComplete removes the message from Subscription", async function(): Promise { - await topicClient.sendBatch(testMessages); - - const receivedMsgs: ServiceBusMessage[] = []; - const receiveListener = subscriptionClient.receive( - (msg: ServiceBusMessage) => { - receivedMsgs.push(msg); - return Promise.resolve(); - }, - (err: Error) => { - should.not.exist(err); - } - ); - - await delay(1000); - - testReceivedMessages(receivedMsgs); - - await receiveListener.stop(); - await testPeekMsgsLength(subscriptionClient, 0); + await testAutoComplete(topicClient, subscriptionClient); }); - it("Disabled autoComplete, no manual complete retains the message in Queue", async function(): Promise< - void - > { - await queueClient.sendBatch(testMessages); + async function testManualComplete( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + await senderClient.sendBatch(testMessages); const receivedMsgs: ServiceBusMessage[] = []; - const receiveListener = queueClient.receive( + const receiveListener = receiverClient.receive( (msg: ServiceBusMessage) => { receivedMsgs.push(msg); + should.equal( + testMessages.some((x) => msg.body === x.body && msg.messageId === x.messageId), + true + ); return Promise.resolve(); }, (err: Error) => { @@ -160,106 +177,43 @@ describe("Streaming Receiver from Queue/Subscription", function(): void { { autoComplete: false } ); - await delay(1000); - - testReceivedMessages(receivedMsgs); + for (let i = 0; i < 5; i++) { + await delay(1000); + if (receivedMsgs.length === testMessages.length) { + break; + } + } - await testPeekMsgsLength(queueClient, 2); + await testPeekMsgsLength(receiverClient, 2); await receivedMsgs[0].complete(); await receivedMsgs[1].complete(); await receiveListener.stop(); - }); + } - it("Disabled autoComplete, no manual complete retains the message in Subscription", async function(): Promise< + it("Disabled autoComplete, no manual complete retains the message in Queue", async function(): Promise< void > { - await topicClient.sendBatch(testMessages); - - const receivedMsgs: ServiceBusMessage[] = []; - const receiveListener = subscriptionClient.receive( - (msg: ServiceBusMessage) => { - receivedMsgs.push(msg); - return Promise.resolve(); - }, - (err: Error) => { - should.not.exist(err); - }, - { autoComplete: false } - ); - - await delay(1000); - - testReceivedMessages(receivedMsgs); - - await testPeekMsgsLength(subscriptionClient, 2); - - await receivedMsgs[0].complete(); - await receivedMsgs[1].complete(); - await receiveListener.stop(); + await testManualComplete(queueClient, queueClient); }); - it("Disabled autoComplete, manual complete removes the message from Queue", async function(): Promise< + it("Disabled autoComplete, no manual complete retains the message in Subscription", async function(): Promise< void > { - await queueClient.sendBatch(testMessages); - - const receivedMsgs: ServiceBusMessage[] = []; - const receiveListener = queueClient.receive( - (msg: ServiceBusMessage) => { - receivedMsgs.push(msg); - return msg.complete(); - }, - (err: Error) => { - should.not.exist(err); - }, - { autoComplete: false } - ); - - await delay(1000); - - testReceivedMessages(receivedMsgs); - - await testPeekMsgsLength(queueClient, 0); - - await receiveListener.stop(); + await testManualComplete(topicClient, subscriptionClient); }); - it("Disabled autoComplete, manual complete removes the message from Subscription", async function(): Promise< - void - > { - await topicClient.sendBatch(testMessages); - - const receivedMsgs: ServiceBusMessage[] = []; - const receiveListener = subscriptionClient.receive( - (msg: ServiceBusMessage) => { - receivedMsgs.push(msg); - return msg.complete(); - }, - (err: Error) => { - should.not.exist(err); - }, - { autoComplete: false } - ); - - await delay(1000); - - testReceivedMessages(receivedMsgs); - - await testPeekMsgsLength(subscriptionClient, 0); - - await receiveListener.stop(); - }); - - it("Abandoned message is retained in the Queue with incremented deliveryCount. After 10 times, you can only get it from the dead letter queue.", async function(): Promise< - void - > { - await queueClient.sendBatch(testMessages); + async function testMultipleAbandons( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + deadletterClient: QueueClient | SubscriptionClient + ): Promise { + await senderClient.sendBatch(testMessages); let checkDeliveryCount0 = 0; let checkDeliveryCount1 = 0; - const receiveListener = await queueClient.receive( + const receiveListener = await receiverClient.receive( (msg: ServiceBusMessage) => { if (msg.messageId === testMessages[0].messageId) { should.equal(msg.deliveryCount, checkDeliveryCount0); @@ -283,155 +237,111 @@ describe("Streaming Receiver from Queue/Subscription", function(): void { should.equal(checkDeliveryCount0, maxDeliveryCount); should.equal(checkDeliveryCount1, maxDeliveryCount); - await testPeekMsgsLength(queueClient, 0); // No messages in the queue + await testPeekMsgsLength(receiverClient, 0); // No messages in the queue - const deadLetterQueuePath = Namespace.getDeadLetterQueuePathForQueue(queueClient.name); - const deadletterQueueClient = namespace.createQueueClient(deadLetterQueuePath); - - const deadLetterMsgs = await deadletterQueueClient.receiveBatch(2); + const deadLetterMsgs = await deadletterClient.receiveBatch(2); should.equal(Array.isArray(deadLetterMsgs), true); - should.equal(deadLetterMsgs.length, 2); + should.equal(deadLetterMsgs.length, testMessages.length); should.equal(deadLetterMsgs[0].deliveryCount, maxDeliveryCount); should.equal(deadLetterMsgs[1].deliveryCount, maxDeliveryCount); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - should.equal(deadLetterMsgs[1].messageId, testMessages[1].messageId); + should.equal(testMessages.some((x) => deadLetterMsgs[0].messageId === x.messageId), true); + should.equal(testMessages.some((x) => deadLetterMsgs[1].messageId === x.messageId), true); await deadLetterMsgs[0].complete(); await deadLetterMsgs[1].complete(); - await testPeekMsgsLength(deadletterQueueClient, 0); - }); + await testPeekMsgsLength(deadletterClient, 0); + } - it("Abandoned message is retained in the Subsrciption with incremented deliveryCount. After 10 times, you can only get it from the dead letter.", async function(): Promise< + it("Abandoned message is retained in the Queue with incremented deliveryCount. After 10 times, you can only get it from the dead letter queue.", async function(): Promise< void > { - await topicClient.sendBatch(testMessages); - - let checkDeliveryCount0 = 0; - let checkDeliveryCount1 = 0; - const receiveListener = await subscriptionClient.receive( - (msg: ServiceBusMessage) => { - if (msg.messageId === testMessages[0].messageId) { - should.equal(msg.deliveryCount, checkDeliveryCount0); - checkDeliveryCount0++; - } else if (msg.messageId === testMessages[1].messageId) { - should.equal(msg.deliveryCount, checkDeliveryCount1); - checkDeliveryCount1++; - } - return msg.abandon(); - }, - (err: Error) => { - should.not.exist(err); - }, - { autoComplete: false } - ); - - await delay(4000); - - await receiveListener.stop(); - - should.equal(checkDeliveryCount0, maxDeliveryCount); - should.equal(checkDeliveryCount1, maxDeliveryCount); - - const peekedMsgs = await subscriptionClient.peek(2); - should.equal(peekedMsgs.length, 0); - - const deadLetterSubscriptionPath = Namespace.getDeadLetterSubcriptionPathForSubcription( - topicClient.name, - subscriptionClient.subscriptionName - ); - - const deadletterSubscriptionClient = namespace.createSubscriptionClient( - deadLetterSubscriptionPath ? deadLetterSubscriptionPath : "", - subscriptionClient.subscriptionName - ); - - await testPeekMsgsLength(deadletterSubscriptionClient, 2); // Two messages in the DL - - const deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(2); - should.equal(Array.isArray(deadLetterMsgs), true); - should.equal(deadLetterMsgs.length, 2); - should.equal(deadLetterMsgs[0].deliveryCount, maxDeliveryCount); - should.equal(deadLetterMsgs[1].deliveryCount, maxDeliveryCount); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - should.equal(deadLetterMsgs[1].messageId, testMessages[1].messageId); - - await deadLetterMsgs[0].complete(); - await deadLetterMsgs[1].complete(); - - await testPeekMsgsLength(deadletterSubscriptionClient, 0); + await testMultipleAbandons(queueClient, queueClient, deadletterQueueClient); }); - it("With auto-complete enabled, manual completion in the Queue by the user should not result in errors", async function(): Promise< + it("Abandoned message is retained in the Subsrciption with incremented deliveryCount. After 10 times, you can only get it from the dead letter.", async function(): Promise< void > { - await queueClient.sendBatch(testMessages); - await testPeekMsgsLength(queueClient, 2); - const receiveListener = await queueClient.receive( - (msg: ServiceBusMessage) => { - return msg.complete(); - }, - (err: Error) => { - should.not.exist(err); - } - ); + await testMultipleAbandons(topicClient, subscriptionClient, deadletterSubscriptionClient); + }); +}); - await delay(4000); - await receiveListener.stop(); +describe("Complete message", function(): void { + beforeEach(async () => { + await beforeEachTest(); + }); - await testPeekMsgsLength(queueClient, 0); + afterEach(async () => { + await afterEachTest(); }); - it("With auto-complete enabled, manual completion in the Subscription by the user should not result in errors", async function(): Promise< - void - > { - await topicClient.sendBatch(testMessages); + async function testComplete( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + autoComplete: boolean + ): Promise { + await senderClient.sendBatch(testMessages); - const receiveListener = await subscriptionClient.receive( + const receivedMsgs: ServiceBusMessage[] = []; + const receiveListener = receiverClient.receive( (msg: ServiceBusMessage) => { + receivedMsgs.push(msg); + should.equal( + testMessages.some((x) => msg.body === x.body && msg.messageId === x.messageId), + true + ); return msg.complete(); }, (err: Error) => { should.not.exist(err); - } + }, + { autoComplete } ); - await delay(4000); + for (let i = 0; i < 5; i++) { + await delay(1000); + if (receivedMsgs.length === testMessages.length) { + break; + } + } + + await testPeekMsgsLength(receiverClient, 0); await receiveListener.stop(); + } + it("Queue: complete() removes message", async function(): Promise { + await testComplete(queueClient, queueClient, false); + }); - await testPeekMsgsLength(subscriptionClient, 0); + it("Subscription: complete() removes message", async function(): Promise { + await testComplete(topicClient, subscriptionClient, false); }); - it("With auto-complete enabled, manual abandon in the Queue by the user should not result in errors", async function(): Promise< - void - > { - await queueClient.send(testMessages[0]); - const receiveListener: ReceiveHandler = await queueClient.receive( - (msg: ServiceBusMessage) => { - return msg.abandon().then(() => { - return receiveListener.stop(); - }); - }, - (err: Error) => { - should.not.exist(err); - }, - { maxAutoRenewDurationInSeconds: 0 } - ); - await delay(4000); + it("Queue with autoComplete: complete() removes message", async function(): Promise { + await testComplete(queueClient, queueClient, true); + }); - const receivedMsgs = await queueClient.receiveBatch(1); - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - await receivedMsgs[0].complete(); - await testPeekMsgsLength(queueClient, 0); + it("Subscription with autoComplete: complete() removes message", async function(): Promise { + await testComplete(topicClient, subscriptionClient, true); }); +}); - it("With auto-complete enabled, manual abandon in the Subscription by the user should not result in errors", async function(): Promise< - void - > { - await topicClient.send(testMessages[0]); - const receiveListener: ReceiveHandler = await subscriptionClient.receive( +describe("Abandon message", function(): void { + beforeEach(async () => { + await beforeEachTest(); + }); + + afterEach(async () => { + await afterEachTest(); + }); + + async function testAbandon( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + autoComplete: boolean + ): Promise { + await senderClient.send(testMessages[0]); + const receiveListener: ReceiveHandler = await receiverClient.receive( (msg: ServiceBusMessage) => { return msg.abandon().then(() => { return receiveListener.stop(); @@ -440,190 +350,61 @@ describe("Streaming Receiver from Queue/Subscription", function(): void { (err: Error) => { should.not.exist(err); }, - { maxAutoRenewDurationInSeconds: 0 } + { maxAutoRenewDurationInSeconds: 0, autoComplete } ); - await delay(4000); - const receivedMsgs = await subscriptionClient.receiveBatch(1); + const receivedMsgs = await receiverClient.receiveBatch(1); should.equal(receivedMsgs.length, 1); should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); + // should.equal(receivedMsgs[0].deliveryCount, 1); await receivedMsgs[0].complete(); - await testPeekMsgsLength(subscriptionClient, 0); - }); - - it("With auto-complete enabled, manual deadletter in the Queue by the user should not result in errors", async function(): Promise< + await testPeekMsgsLength(receiverClient, 0); + } + it("Queue: abandon() retains message with incremented deliveryCount", async function(): Promise< void > { - await queueClient.sendBatch(testMessages); - await testPeekMsgsLength(queueClient, 2); - const receiveListener = await queueClient.receive( - (msg: ServiceBusMessage) => { - return msg.deadLetter(); - }, - (err: Error) => { - should.not.exist(err); - } - ); - - await delay(4000); - await receiveListener.stop(); - - await testPeekMsgsLength(queueClient, 0); - - const deadLetterQueuePath = Namespace.getDeadLetterQueuePathForQueue(queueClient.name); - const deadletterQueueClient = namespace.createQueueClient(deadLetterQueuePath); - - const deadLetterMsgs = await deadletterQueueClient.receiveBatch(2); - should.equal(Array.isArray(deadLetterMsgs), true); - should.equal(deadLetterMsgs.length, 2); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - should.equal(deadLetterMsgs[1].messageId, testMessages[1].messageId); - - await deadLetterMsgs[0].complete(); - await deadLetterMsgs[1].complete(); - - await testPeekMsgsLength(deadletterQueueClient, 0); + await testAbandon(queueClient, queueClient, false); }); - it("With auto-complete enabled, manual deadletter in the Subscription by the user should not result in errors", async function(): Promise< + it("Subscription: abandon() retains message with incremented deliveryCount", async function(): Promise< void > { - await topicClient.sendBatch(testMessages); - - const receiveListener = await subscriptionClient.receive( - (msg: ServiceBusMessage) => { - return msg.deadLetter(); - }, - (err: Error) => { - should.not.exist(err); - } - ); - - await delay(4000); - - await receiveListener.stop(); - - await testPeekMsgsLength(subscriptionClient, 0); - const deadLetterSubscriptionPath = Namespace.getDeadLetterSubcriptionPathForSubcription( - topicClient.name, - subscriptionClient.subscriptionName - ); - - const deadletterSubscriptionClient = namespace.createSubscriptionClient( - deadLetterSubscriptionPath ? deadLetterSubscriptionPath : "", - subscriptionClient.subscriptionName - ); - - await testPeekMsgsLength(deadletterSubscriptionClient, 2); // Two messages in the DL - - const deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(2); - should.equal(Array.isArray(deadLetterMsgs), true); - should.equal(deadLetterMsgs.length, 2); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - should.equal(deadLetterMsgs[1].messageId, testMessages[1].messageId); - - await deadLetterMsgs[0].complete(); - await deadLetterMsgs[1].complete(); - - await testPeekMsgsLength(deadletterSubscriptionClient, 0); + await testAbandon(topicClient, subscriptionClient, false); }); - it("With auto-complete enabled, manual defer in the Queue by the user should not result in errors", async function(): Promise< + it("Queue with autoComplete: abandon() retains message with incremented deliveryCount", async function(): Promise< void > { - await queueClient.sendBatch(testMessages); - - let seq0: any = 0; - let seq1: any = 0; - const receiveListener = await queueClient.receive( - (msg: ServiceBusMessage) => { - if (msg.messageId === testMessages[0].messageId) { - seq0 = msg.sequenceNumber; - } else if (msg.messageId === testMessages[1].messageId) { - seq1 = msg.sequenceNumber; - } - return msg.defer(); - }, - (err: Error) => { - should.not.exist(err); - } - ); - - await delay(4000); - - await receiveListener.stop(); - const deferredMsg0 = await queueClient.receiveDeferredMessage(seq0); - const deferredMsg1 = await queueClient.receiveDeferredMessage(seq1); - if (!deferredMsg0) { - throw "No message received for sequence number"; - } - if (!deferredMsg1) { - throw "No message received for sequence number"; - } - should.equal(deferredMsg0.body, testMessages[0].body); - should.equal(deferredMsg0.messageId, testMessages[0].messageId); - - should.equal(deferredMsg1.body, testMessages[1].body); - should.equal(deferredMsg1.messageId, testMessages[1].messageId); - await deferredMsg0.complete(); - await deferredMsg1.complete(); - - await testPeekMsgsLength(queueClient, 0); + await testAbandon(queueClient, queueClient, true); }); - it("With auto-complete enabled, manual defer in the Subscription by the user should not result in errors", async function(): Promise< + it("Subscription with autoComplete: abandon() retains message with incremented deliveryCount", async function(): Promise< void > { - await topicClient.sendBatch(testMessages); - - let seq0: any = 0; - let seq1: any = 0; - const receiveListener = await subscriptionClient.receive( - (msg: ServiceBusMessage) => { - if (msg.messageId === testMessages[0].messageId) { - seq0 = msg.sequenceNumber; - } else if (msg.messageId === testMessages[1].messageId) { - seq1 = msg.sequenceNumber; - } - return msg.defer(); - }, - (err: Error) => { - should.not.exist(err); - } - ); - - await delay(4000); - - await receiveListener.stop(); - - const deferredMsg0 = await subscriptionClient.receiveDeferredMessage(seq0); - const deferredMsg1 = await subscriptionClient.receiveDeferredMessage(seq1); - if (!deferredMsg0) { - throw "No message received for sequence number"; - } - if (!deferredMsg1) { - throw "No message received for sequence number"; - } - should.equal(deferredMsg0.body, testMessages[0].body); - should.equal(deferredMsg0.messageId, testMessages[0].messageId); + await testAbandon(topicClient, subscriptionClient, true); + }); +}); - should.equal(deferredMsg1.body, testMessages[1].body); - should.equal(deferredMsg1.messageId, testMessages[1].messageId); - await deferredMsg0.complete(); - await deferredMsg1.complete(); +describe("Defer message", function(): void { + beforeEach(async () => { + await beforeEachTest(); + }); - await testPeekMsgsLength(subscriptionClient, 0); + afterEach(async () => { + await afterEachTest(); }); - it("With auto-complete disabled, deferring a message results in not getting the same message again from queue. The message is then gotten using receiveDefferedMessages", async function(): Promise< - void - > { - await queueClient.sendBatch(testMessages); + async function testDefer( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + autoComplete: boolean + ): Promise { + await senderClient.sendBatch(testMessages); let seq0: any = 0; let seq1: any = 0; - const receiveListener = await queueClient.receive( + const receiveListener = await receiverClient.receive( (msg: ServiceBusMessage) => { if (msg.messageId === testMessages[0].messageId) { seq0 = msg.sequenceNumber; @@ -635,14 +416,14 @@ describe("Streaming Receiver from Queue/Subscription", function(): void { (err: Error) => { should.not.exist(err); }, - { autoComplete: false } + { autoComplete } ); await delay(4000); await receiveListener.stop(); - const deferredMsg0 = await queueClient.receiveDeferredMessage(seq0); - const deferredMsg1 = await queueClient.receiveDeferredMessage(seq1); + const deferredMsg0 = await receiverClient.receiveDeferredMessage(seq0); + const deferredMsg1 = await receiverClient.receiveDeferredMessage(seq1); if (!deferredMsg0) { throw "No message received for sequence number"; } @@ -657,137 +438,109 @@ describe("Streaming Receiver from Queue/Subscription", function(): void { await deferredMsg0.complete(); await deferredMsg1.complete(); - await testPeekMsgsLength(queueClient, 0); + await testPeekMsgsLength(receiverClient, 0); + } + it("Queue: defer() moves message to deferred queue", async function(): Promise { + await testDefer(queueClient, queueClient, false); + }); + + it("Subscription: defer() moves message to deferred queue", async function(): Promise { + await testDefer(topicClient, subscriptionClient, false); }); - it("With auto-complete disabled, deferring a message results in not getting the same message again from subscription. The message is then gotten using receiveDefferedMessages", async function(): Promise< + it("Queue with autoComplete: defer() moves message to deferred queue", async function(): Promise< void > { - await topicClient.sendBatch(testMessages); - - let seq0: any = 0; - let seq1: any = 0; - const receiveListener = await subscriptionClient.receive( - (msg: ServiceBusMessage) => { - if (msg.messageId === testMessages[0].messageId) { - seq0 = msg.sequenceNumber; - } else if (msg.messageId === testMessages[1].messageId) { - seq1 = msg.sequenceNumber; - } - return msg.defer(); - }, - (err: Error) => { - should.not.exist(err); - }, - { autoComplete: false } - ); - - await delay(4000); - - await receiveListener.stop(); + await testDefer(queueClient, queueClient, true); + }); - const deferredMsg0 = await subscriptionClient.receiveDeferredMessage(seq0); - const deferredMsg1 = await subscriptionClient.receiveDeferredMessage(seq1); - if (!deferredMsg0) { - throw "No message received for sequence number"; - } - if (!deferredMsg1) { - throw "No message received for sequence number"; - } - should.equal(deferredMsg0.body, testMessages[0].body); - should.equal(deferredMsg0.messageId, testMessages[0].messageId); + it("Subscription with autoComplete: defer() moves message to deferred queue", async function(): Promise< + void + > { + await testDefer(topicClient, subscriptionClient, true); + }); +}); - should.equal(deferredMsg1.body, testMessages[1].body); - should.equal(deferredMsg1.messageId, testMessages[1].messageId); - await deferredMsg0.complete(); - await deferredMsg1.complete(); +describe("Deadletter message", function(): void { + beforeEach(async () => { + await beforeEachTest(); + }); - await testPeekMsgsLength(subscriptionClient, 0); + afterEach(async () => { + await afterEachTest(); }); - it("With auto-complete disabled, dead lettering the message results in not getting the same message again from queue. The message is then gotten only from the dead letter", async function(): Promise< - void - > { - await queueClient.sendBatch(testMessages); - await testPeekMsgsLength(queueClient, 2); - const receiveListener = await queueClient.receive( + async function testDeadletter( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + deadletterClient: QueueClient | SubscriptionClient, + autoComplete: boolean + ): Promise { + await senderClient.sendBatch(testMessages); + await testPeekMsgsLength(receiverClient, 2); + const receiveListener = await receiverClient.receive( (msg: ServiceBusMessage) => { return msg.deadLetter(); }, (err: Error) => { should.not.exist(err); }, - { autoComplete: false } + { autoComplete } ); await delay(4000); await receiveListener.stop(); - await testPeekMsgsLength(queueClient, 0); + await testPeekMsgsLength(receiverClient, 0); - const deadLetterQueuePath = Namespace.getDeadLetterQueuePathForQueue(queueClient.name); - const deadletterQueueClient = namespace.createQueueClient(deadLetterQueuePath); - - const deadLetterMsgs = await deadletterQueueClient.receiveBatch(2); + const deadLetterMsgs = await deadletterClient.receiveBatch(2); should.equal(Array.isArray(deadLetterMsgs), true); - should.equal(deadLetterMsgs.length, 2); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - should.equal(deadLetterMsgs[1].messageId, testMessages[1].messageId); + should.equal(deadLetterMsgs.length, testMessages.length); + should.equal(testMessages.some((x) => deadLetterMsgs[0].messageId === x.messageId), true); + should.equal(testMessages.some((x) => deadLetterMsgs[1].messageId === x.messageId), true); await deadLetterMsgs[0].complete(); await deadLetterMsgs[1].complete(); - await testPeekMsgsLength(deadletterQueueClient, 0); + await testPeekMsgsLength(deadletterClient, 0); + } + + it("Queue: deadLetter() moves message to deadletter queue", async function(): Promise { + await testDeadletter(queueClient, queueClient, deadletterQueueClient, false); }); - it("With auto-complete disabled, dead lettering the message results in not getting the same message again from subscription. The message is then gotten only from the dead letter", async function(): Promise< + it("Subscription: deadLetter() moves message to deadletter queue", async function(): Promise< void > { - await topicClient.sendBatch(testMessages); - - const receiveListener = await subscriptionClient.receive( - (msg: ServiceBusMessage) => { - return msg.deadLetter(); - }, - (err: Error) => { - should.not.exist(err); - }, - { autoComplete: false } - ); - - await delay(4000); - - await receiveListener.stop(); - - await testPeekMsgsLength(subscriptionClient, 0); - const deadLetterSubscriptionPath = Namespace.getDeadLetterSubcriptionPathForSubcription( - topicClient.name, - subscriptionClient.subscriptionName - ); - - const deadletterSubscriptionClient = namespace.createSubscriptionClient( - deadLetterSubscriptionPath ? deadLetterSubscriptionPath : "", - subscriptionClient.subscriptionName - ); + await testDeadletter(topicClient, subscriptionClient, deadletterSubscriptionClient, false); + }); - await testPeekMsgsLength(deadletterSubscriptionClient, 2); // Two messages in the DL + it("Queue with autoComplete: deadLetter() moves message to deadletter queue", async function(): Promise< + void + > { + await testDeadletter(queueClient, queueClient, deadletterQueueClient, false); + }); - const deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(2); - should.equal(Array.isArray(deadLetterMsgs), true); - should.equal(deadLetterMsgs.length, 2); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - should.equal(deadLetterMsgs[1].messageId, testMessages[1].messageId); + it("Subscription with autoComplete: deadLetter() moves message to deadletter queue", async function(): Promise< + void + > { + await testDeadletter(topicClient, subscriptionClient, deadletterSubscriptionClient, false); + }); +}); - await deadLetterMsgs[0].complete(); - await deadLetterMsgs[1].complete(); +describe("Multiple Streaming Receivers", function(): void { + beforeEach(async () => { + await beforeEachTest(); + }); - await testPeekMsgsLength(deadletterSubscriptionClient, 0); + afterEach(async () => { + await afterEachTest(); }); - it("Second Streaming Receiver call should fail if the first one is not stopped for Queues", async function(): Promise< - void - > { - const receiveListener: ReceiveHandler = await queueClient.receive( + async function testMultipleReceiveCalls( + receiverClient: QueueClient | SubscriptionClient + ): Promise { + const receiveListener: ReceiveHandler = await receiverClient.receive( (msg: ServiceBusMessage) => { return msg.complete(); }, @@ -797,7 +550,7 @@ describe("Streaming Receiver from Queue/Subscription", function(): void { ); await delay(5000); try { - const receiveListener2 = await queueClient.receive( + const receiveListener2 = await receiverClient.receive( (msg: ServiceBusMessage) => { return Promise.resolve(); }, @@ -811,35 +564,17 @@ describe("Streaming Receiver from Queue/Subscription", function(): void { } await receiveListener.stop(); + } + + it("Second Streaming Receiver call should fail if the first one is not stopped for Queues", async function(): Promise< + void + > { + await testMultipleReceiveCalls(queueClient); }); it("Second Streaming Receiver call should fail if the first one is not stopped for Subscriptions", async function(): Promise< void > { - const receiveListener: ReceiveHandler = await subscriptionClient.receive( - (msg: ServiceBusMessage) => { - return msg.complete(); - }, - (err: Error) => { - should.not.exist(err); - } - ); - await delay(5000); - - try { - const receiveListener2 = await subscriptionClient.receive( - (msg: ServiceBusMessage) => { - return Promise.resolve(); - }, - (err: Error) => { - should.exist(err); - } - ); - await receiveListener2.stop(); - } catch (err) { - should.equal(!err.message.search("has already been created for the Subscription"), false); - } - - await receiveListener.stop(); + await testMultipleReceiveCalls(subscriptionClient); }); });