-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcheckConsumerLag.ts
103 lines (101 loc) · 3.71 KB
/
checkConsumerLag.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import { Handler } from "aws-lambda";
import { Kafka } from "kafkajs";
import { LambdaClient, ListEventSourceMappingsCommand } from "@aws-sdk/client-lambda";
export const handler: Handler = async (event, _, callback) => {
const response = {
statusCode: 200,
stable: false,
current: false,
ready: false,
};
let errorResponse = null;
try {
const triggerInfo: any[] = [];
const lambdaClient = new LambdaClient({
region: process.env.region,
});
for (const trigger of event.triggers) {
for (const topic of [...new Set(trigger.topics)]) {
console.log(`Getting consumer groups for function: ${trigger.function} and topic ${topic}`);
const lambdaResponse = await lambdaClient.send(
new ListEventSourceMappingsCommand({
FunctionName: trigger.function,
}),
);
if (
!lambdaResponse.EventSourceMappings ||
lambdaResponse.EventSourceMappings.length === 0
) {
throw new Error(
`ERROR: No event source mapping found for function ${trigger.function} and topic ${topic}`,
);
}
const mappingForCurrentTopic = lambdaResponse.EventSourceMappings.filter(
(mapping) => mapping.Topics && mapping.Topics.includes(topic as string),
);
if (!mappingForCurrentTopic || mappingForCurrentTopic.length === 0) {
throw new Error(
`ERROR: No event source mapping found for function ${trigger.function} and topic ${topic}`,
);
}
if (mappingForCurrentTopic.length > 1) {
throw new Error(
`ERROR: Multiple event source mappings found for function ${trigger.function} and topic ${topic}`,
);
}
const groupId =
mappingForCurrentTopic[0]?.SelfManagedKafkaEventSourceConfig?.ConsumerGroupId;
if (!groupId) {
throw new Error(
`ERROR: No ConsumerGroupId found for function ${trigger.function} and topic ${topic}`,
);
}
triggerInfo.push({
groupId,
topics: [topic],
});
}
}
const kafka = new Kafka({
clientId: "consumerGroupResetter",
brokers: event.brokerString?.split(",") || [],
ssl: true,
});
const admin = kafka.admin();
await admin.connect();
// Get status for each consumer group
const info = await admin.describeGroups(triggerInfo.map((a) => a.groupId));
const statuses = info.groups.map((a) => a.state.toString());
// Get topic and group offset for each consumer group
const offsets: { [key: string]: any } = {};
for (const trigger of triggerInfo) {
for (const topic of trigger.topics) {
const groupId: string = trigger.groupId;
const topicOffsets = await admin.fetchTopicOffsets(topic);
const groupOffsets = await admin.fetchOffsets({
groupId,
topics: [topic],
});
// Assuming there's a single partition for simplicity.
const latestOffset = topicOffsets[0].offset;
const currentOffset = groupOffsets[0].partitions[0].offset;
offsets[groupId] = {
latestOffset,
currentOffset,
};
console.log(
`Topic: ${topic}, Group: ${groupId}, Latest Offset: ${latestOffset}, Current Offset: ${currentOffset}`,
);
}
}
await admin.disconnect();
response.stable = statuses.every((status) => status === "Stable");
response.current = Object.values(offsets).every((o) => o.latestOffset === o.currentOffset);
response.ready = response.stable && response.current;
} catch (error: any) {
response.statusCode = 500;
errorResponse = error;
} finally {
callback(errorResponse, response);
}
};