-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage-loss-repro.ts
99 lines (88 loc) · 2.76 KB
/
message-loss-repro.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import {
delay,
ProcessErrorArgs,
ServiceBusAdministrationClient,
ServiceBusClient,
ServiceBusReceivedMessage,
ServiceBusReceiver,
ServiceBusSender,
} from "@azure/service-bus";
// Load the .env file if it exists
import * as dotenv from "dotenv";
dotenv.config();
/**
 * Polls `predicate` until it yields a truthy value or the wait window closes.
 *
 * The deadline is fixed once, before the first evaluation. The clock is
 * consulted before every evaluation, so a window that has already elapsed
 * (for example `0`) returns `false` without ever invoking `predicate`.
 *
 * @param predicate - Condition to evaluate; may be synchronous or asynchronous.
 * @param delayBetweenRetriesInMilliseconds - Pause after each failed evaluation.
 * @param maxWaitTimeInMilliseconds - Length of the wait window, measured from the call.
 * @returns `true` as soon as `predicate` succeeds, `false` once the window has elapsed.
 */
async function checkWithTimeout(
  predicate: () => boolean | Promise<boolean>,
  delayBetweenRetriesInMilliseconds: number = 1000,
  maxWaitTimeInMilliseconds: number = 10000
): Promise<boolean> {
  const deadline = Date.now() + maxWaitTimeInMilliseconds;
  for (;;) {
    const timeRemains = Date.now() < deadline;
    if (!timeRemains) {
      return false;
    }
    const satisfied = await predicate();
    if (satisfied) {
      return true;
    }
    await delay(delayBetweenRetriesInMilliseconds);
  }
}
/**
 * Sends `numberOfMessagesToSend` messages whose bodies are `message-0`,
 * `message-1`, ... using as few batches as the sender allows.
 *
 * A new batch is created each round and filled until `tryAddMessage` reports
 * that the next message was not accepted; that message then becomes the first
 * one of the following batch.
 *
 * @param sender - Sender used to create and send the batches.
 * @param numberOfMessagesToSend - Total number of messages to send; `0` sends nothing.
 * @returns The bodies of all messages that were sent, in send order.
 * @throws Error if a message is rejected by a freshly created (empty) batch,
 *   since retrying with another empty batch could never make progress.
 */
async function sendMessages(
  sender: ServiceBusSender,
  numberOfMessagesToSend: number
): Promise<string[]> {
  const messageBodies: string[] = [];
  let current = 0;
  while (current < numberOfMessagesToSend) {
    const batch = await sender.createMessageBatch();
    let addedToBatch = 0;
    while (current < numberOfMessagesToSend) {
      const body = `message-${current}`;
      if (!batch.tryAddMessage({ body })) {
        // Batch refused the message; send what we have and start a new one.
        break;
      }
      messageBodies.push(body);
      current++;
      addedToBatch++;
    }
    if (addedToBatch === 0) {
      // Without this guard the outer loop would spin forever sending empty
      // batches, because `current` can no longer advance.
      throw new Error(
        `Message "message-${current}" does not fit into an empty batch`
      );
    }
    await sender.sendMessages(batch);
  }
  return messageBodies;
}
/**
 * Keeps pulling from `receiver` until at least `numberOfMessagesToReceive`
 * messages have been gathered.
 *
 * Every round calls `receiver.receiveMessages` with a count of 50 and
 * `maxWaitTimeInMs: 1500`, then logs the running total. The loop only stops
 * once the target is reached, and the result may hold more messages than
 * requested because the per-call count is not reduced near the end.
 *
 * @param receiver - Receiver to pull messages from.
 * @param numberOfMessagesToReceive - Minimum number of messages to gather.
 * @returns All gathered messages in arrival order.
 */
async function receiveMessages(
  receiver: ServiceBusReceiver,
  numberOfMessagesToReceive: number
) {
  const collected: ServiceBusReceivedMessage[] = [];
  while (collected.length < numberOfMessagesToReceive) {
    const latest = await receiver.receiveMessages(50, {
      maxWaitTimeInMs: 1500,
    });
    collected.push(...latest);
    console.log(`...Received ${collected.length} messages in total`);
  }
  console.log("Receiving done!");
  return collected;
}
/**
 * Runs the repro end to end: creates a randomly named partitioned queue,
 * sends 2000 messages to it, receives them back in `receiveAndDelete` mode,
 * then closes the client and deletes the queue.
 *
 * Cleanup runs in `finally` blocks so that a failure while sending or
 * receiving does not leave the client open or the temporary queue behind.
 */
async function main(): Promise<void> {
  // Define connection string and related Service Bus entity names here
  const connectionString =
    process.env.SERVICEBUS_CONNECTION_STRING || "<connection string>";
  const queueName = `qqqq-${Math.floor(Math.random() * 9000 + 1000)}`;
  const numberOfMessagesToSend = 2000;

  const serviceBusAdministrationClient = new ServiceBusAdministrationClient(
    connectionString
  );
  const createQueueResponse = await serviceBusAdministrationClient.createQueue(
    queueName,
    { enablePartitioning: true }
  );
  console.log("Created queue with name - ", createQueueResponse.name);

  try {
    const sbClient = new ServiceBusClient(connectionString);
    try {
      const sender = sbClient.createSender(queueName);
      const receiver = sbClient.createReceiver(queueName, {
        receiveMode: "receiveAndDelete",
      });
      await sendMessages(sender, numberOfMessagesToSend);
      await receiveMessages(receiver, numberOfMessagesToSend);
    } finally {
      // Close the client even when sending or receiving failed.
      await sbClient.close();
    }
  } finally {
    // The queue was created above, so always remove it again.
    await serviceBusAdministrationClient.deleteQueue(queueName);
  }
}
// Entry point: report any failure on stderr and exit with a non-zero status so
// callers (shells, CI) can tell that the repro did not complete.
main().catch((err: unknown) => {
  console.error(err);
  process.exitCode = 1;
});