-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Service Bus] Better errors for settling deferred msg in ReceiveAndDelete mode #10396
Changes from 3 commits
4288eec
b170f33
fe6f7d7
6fec199
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -328,13 +328,16 @@ describe("receive and delete", () => { | |||||||||||
}); | ||||||||||||
|
||||||||||||
describe("Receive Deferred messages in ReceiveAndDelete mode", function(): void { | ||||||||||||
let sequenceNumber: Long; | ||||||||||||
let entityNames: EntityName; | ||||||||||||
|
||||||||||||
afterEach(async () => { | ||||||||||||
await afterEachTest(); | ||||||||||||
}); | ||||||||||||
async function deferMessage(useSessions?: boolean): Promise<void> { | ||||||||||||
const testMessages = useSessions ? TestMessage.getSessionSample() : TestMessage.getSample(); | ||||||||||||
async function deferMessage(testClientType: TestClientType): Promise<Long> { | ||||||||||||
entityNames = await beforeEachTest(testClientType, "peekLock"); | ||||||||||||
const testMessages = entityNames.usesSessions | ||||||||||||
? TestMessage.getSessionSample() | ||||||||||||
: TestMessage.getSample(); | ||||||||||||
await sender.sendMessages(testMessages); | ||||||||||||
const batch = await receiver.receiveMessages(1); | ||||||||||||
const msgs = batch; | ||||||||||||
|
@@ -349,60 +352,180 @@ describe("receive and delete", () => { | |||||||||||
); | ||||||||||||
should.equal(msgs[0].deliveryCount, 0, "DeliveryCount is different than expected"); | ||||||||||||
|
||||||||||||
sequenceNumber = msgs[0].sequenceNumber!; | ||||||||||||
await (msgs[0] as ReceivedMessageWithLock).defer(); | ||||||||||||
return msgs[0].sequenceNumber!; | ||||||||||||
} | ||||||||||||
|
||||||||||||
async function receiveDeferredMessage(): Promise<void> { | ||||||||||||
async function testDeferredMessage(testClientType: TestClientType): Promise<ReceivedMessage> { | ||||||||||||
const sequenceNumber = await deferMessage(testClientType); | ||||||||||||
await receiver.close(); | ||||||||||||
receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames); | ||||||||||||
|
||||||||||||
const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber); | ||||||||||||
if (!deferredMsg) { | ||||||||||||
throw `No message received for sequence number ${sequenceNumber}`; | ||||||||||||
} | ||||||||||||
|
||||||||||||
should.equal(deferredMsg!.deliveryCount, 1, "DeliveryCount is different than expected"); | ||||||||||||
await testPeekMsgsLength(receiver, 0); | ||||||||||||
} | ||||||||||||
|
||||||||||||
async function deferAndReceiveMessage(testClientType: TestClientType) { | ||||||||||||
const entityNames = await beforeEachTest(testClientType, "peekLock"); | ||||||||||||
await deferMessage(entityNames.usesSessions); | ||||||||||||
await receiver.close(); | ||||||||||||
receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames); | ||||||||||||
await receiveDeferredMessage(); | ||||||||||||
return deferredMsg; | ||||||||||||
} | ||||||||||||
|
||||||||||||
/* | ||||||||||||
// The below are commented due to service bug described in https://github.com/Azure/azure-sdk-for-js/issues/2268 | ||||||||||||
it("Partitioned Queue: No settlement of the message removes message", async function(): Promise< | ||||||||||||
void | ||||||||||||
> { | ||||||||||||
await deferAndReceiveMessage(TestClientType.PartitionedQueue); | ||||||||||||
await testDeferredMessage(TestClientType.PartitionedQueue); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
it("Partitioned Subscription: No settlement of the message removes message", async function(): Promise< | ||||||||||||
void | ||||||||||||
> { | ||||||||||||
await deferAndReceiveMessage(TestClientType.PartitionedSubscription); | ||||||||||||
await testDeferredMessage(TestClientType.PartitionedSubscription); | ||||||||||||
}); | ||||||||||||
*/ | ||||||||||||
|
||||||||||||
it("Unpartitioned Queue: No settlement of the message removes message", async function(): Promise< | ||||||||||||
void | ||||||||||||
> { | ||||||||||||
await deferAndReceiveMessage(TestClientType.UnpartitionedQueue); | ||||||||||||
await testDeferredMessage(TestClientType.UnpartitionedQueue); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
it("Unpartitioned Subscription: No settlement of the message removes message", async function(): Promise< | ||||||||||||
void | ||||||||||||
> { | ||||||||||||
await deferAndReceiveMessage(TestClientType.UnpartitionedSubscription); | ||||||||||||
await testDeferredMessage(TestClientType.UnpartitionedSubscription); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
it( | ||||||||||||
withSessionTestClientType + ": No settlement of the message removes message", | ||||||||||||
async function(): Promise<void> { | ||||||||||||
await deferAndReceiveMessage(withSessionTestClientType); | ||||||||||||
await testDeferredMessage(withSessionTestClientType); | ||||||||||||
} | ||||||||||||
); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
describe("Settlement of deferred msg in ReceiveAndDelete mode", () => { | ||||||||||||
afterEach(async () => { | ||||||||||||
await afterEachTest(); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
let entityNames: EntityName; | ||||||||||||
|
||||||||||||
async function testDeferredMessage(testClientType: TestClientType): Promise<ReceivedMessage> { | ||||||||||||
entityNames = await beforeEachTest(testClientType, "peekLock"); | ||||||||||||
|
||||||||||||
// send message | ||||||||||||
const testMessage = entityNames.usesSessions | ||||||||||||
? TestMessage.getSessionSample() | ||||||||||||
: TestMessage.getSample(); | ||||||||||||
await sender.sendMessages(testMessage); | ||||||||||||
|
||||||||||||
// receive and defer the message | ||||||||||||
const [msg] = await receiver.receiveMessages(1); | ||||||||||||
await (msg as ReceivedMessageWithLock).defer(); | ||||||||||||
const sequenceNumber = msg.sequenceNumber!; | ||||||||||||
await receiver.close(); | ||||||||||||
|
||||||||||||
// Receive the deferred message in ReceiveAndDelete mode | ||||||||||||
receiver = await serviceBusClient.test.getReceiveAndDeleteReceiver(entityNames); | ||||||||||||
const [deferredMsg] = await receiver.receiveDeferredMessages(sequenceNumber); | ||||||||||||
if (!deferredMsg) { | ||||||||||||
throw `No message received for sequence number ${sequenceNumber}`; | ||||||||||||
} | ||||||||||||
|
||||||||||||
return deferredMsg; | ||||||||||||
} | ||||||||||||
|
||||||||||||
const testError = (err: Error, operation: DispositionType): void => { | ||||||||||||
expect(err.message.toLowerCase(), "ErrorMessage is different than expected").includes( | ||||||||||||
`failed to ${operation} the message as the operation is only supported in \'peeklock\' receive mode.` | ||||||||||||
); | ||||||||||||
}; | ||||||||||||
|
||||||||||||
async function testSettlement( | ||||||||||||
testClienttype: TestClientType, | ||||||||||||
operation: DispositionType | ||||||||||||
): Promise<void> { | ||||||||||||
const deferredMsg = await testDeferredMessage(testClienttype); | ||||||||||||
// we have to force this cast - the type system doesn't allow this if you've chosen receiveAndDelete | ||||||||||||
// as your lock mode. | ||||||||||||
const msg = deferredMsg as ReceivedMessageWithLock; | ||||||||||||
|
||||||||||||
try { | ||||||||||||
if (operation === DispositionType.complete) { | ||||||||||||
await msg.complete(); | ||||||||||||
} else if (operation === DispositionType.abandon) { | ||||||||||||
await msg.abandon(); | ||||||||||||
} else if (operation === DispositionType.deadletter) { | ||||||||||||
await msg.deadLetter(); | ||||||||||||
} else if (operation === DispositionType.defer) { | ||||||||||||
await msg.defer(); | ||||||||||||
} | ||||||||||||
} catch (err) { | ||||||||||||
errorWasThrown = true; | ||||||||||||
testError(err, operation); | ||||||||||||
} | ||||||||||||
|
||||||||||||
should.equal(errorWasThrown, true, "Error thrown flag must be true"); | ||||||||||||
} | ||||||||||||
|
||||||||||||
it(noSessionTestClientType + ": complete() throws error", async function(): Promise<void> { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would it be simpler to just do the [].forEach(() => {}) trick instead? Like this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See #9950 (comment) |
||||||||||||
await testSettlement(noSessionTestClientType, DispositionType.complete); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
it(withSessionTestClientType + ": complete() throws error", async function(): Promise<void> { | ||||||||||||
await testSettlement(withSessionTestClientType, DispositionType.complete); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
it(noSessionTestClientType + ": abandon() throws error", async function(): Promise<void> { | ||||||||||||
await testSettlement(noSessionTestClientType, DispositionType.abandon); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
it(withSessionTestClientType + ": abandon() throws error", async function(): Promise<void> { | ||||||||||||
await testSettlement(withSessionTestClientType, DispositionType.abandon); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
it(noSessionTestClientType + ": defer() throws error", async function(): Promise<void> { | ||||||||||||
await testSettlement(noSessionTestClientType, DispositionType.defer); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
it(withSessionTestClientType + ": defer() throws error", async function(): Promise<void> { | ||||||||||||
await testSettlement(withSessionTestClientType, DispositionType.defer); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
it(noSessionTestClientType + ": deadLetter() throws error", async function(): Promise<void> { | ||||||||||||
await testSettlement(noSessionTestClientType, DispositionType.deadletter); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
it(withSessionTestClientType + ": deadLetter() throws error", async function(): Promise<void> { | ||||||||||||
await testSettlement(withSessionTestClientType, DispositionType.deadletter); | ||||||||||||
}); | ||||||||||||
|
||||||||||||
async function testRenewLock(testClienttype: TestClientType): Promise<void> { | ||||||||||||
const deferredMsg = await testDeferredMessage(testClienttype); | ||||||||||||
// we have to force this cast - the type system doesn't allow this if you've chosen receiveAndDelete | ||||||||||||
// as your lock mode. | ||||||||||||
|
||||||||||||
// have to cast it - the type system doesn't allow us to call into this method otherwise. | ||||||||||||
Comment on lines
+509
to
+512
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Repetitive?
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no idea, I copied the entire test set from the ones we have for "normal" messages in this file :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, the problem is that the Receiver isn't being typed more specifically. We should fix this at some point to allow the receiver to be scoped properly but I think (grossly) this is the best we can do for now. |
||||||||||||
await (deferredMsg as ReceivedMessageWithLock).renewLock().catch((err) => { | ||||||||||||
should.equal( | ||||||||||||
err.message, | ||||||||||||
getErrorMessageNotSupportedInReceiveAndDeleteMode("renew the lock on the message"), | ||||||||||||
"ErrorMessage is different than expected" | ||||||||||||
); | ||||||||||||
errorWasThrown = true; | ||||||||||||
}); | ||||||||||||
|
||||||||||||
should.equal(errorWasThrown, true, "Error thrown flag must be true"); | ||||||||||||
} | ||||||||||||
|
||||||||||||
it(noSessionTestClientType + ": Renew message lock throws error", async function(): Promise< | ||||||||||||
void | ||||||||||||
> { | ||||||||||||
await testRenewLock(noSessionTestClientType); | ||||||||||||
}); | ||||||||||||
}); | ||||||||||||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's just pop a comment in here saying this is a workaround (because eventually we can get rid of this, right?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment in 6fec199