Skip to content

Commit

Permalink
added region to new Lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
thetif committed Jan 6, 2025
1 parent 081dfd7 commit 8ce0a74
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 24 deletions.
28 changes: 10 additions & 18 deletions lib/lambda/checkConsumerLag.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import { Handler } from "aws-lambda";
import { Kafka } from "kafkajs";
import {
LambdaClient,
ListEventSourceMappingsCommand,
} from "@aws-sdk/client-lambda";
import { LambdaClient, ListEventSourceMappingsCommand } from "@aws-sdk/client-lambda";

export const handler: Handler = async (event, _, callback) => {
const response = {
Expand All @@ -15,12 +12,12 @@ export const handler: Handler = async (event, _, callback) => {
let errorResponse = null;
try {
const triggerInfo: any[] = [];
const lambdaClient = new LambdaClient({});
const lambdaClient = new LambdaClient({
region: process.env.region,
});
for (const trigger of event.triggers) {
for (const topic of [...new Set(trigger.topics)]) {
console.log(
`Getting consumer groups for function: ${trigger.function} and topic ${topic}`,
);
console.log(`Getting consumer groups for function: ${trigger.function} and topic ${topic}`);
const lambdaResponse = await lambdaClient.send(
new ListEventSourceMappingsCommand({
FunctionName: trigger.function,
Expand All @@ -34,11 +31,9 @@ export const handler: Handler = async (event, _, callback) => {
`ERROR: No event source mapping found for function ${trigger.function} and topic ${topic}`,
);
}
const mappingForCurrentTopic =
lambdaResponse.EventSourceMappings.filter(
(mapping) =>
mapping.Topics && mapping.Topics.includes(topic as string),
);
const mappingForCurrentTopic = lambdaResponse.EventSourceMappings.filter(
(mapping) => mapping.Topics && mapping.Topics.includes(topic as string),
);
if (!mappingForCurrentTopic || mappingForCurrentTopic.length === 0) {
throw new Error(
`ERROR: No event source mapping found for function ${trigger.function} and topic ${topic}`,
Expand All @@ -50,8 +45,7 @@ export const handler: Handler = async (event, _, callback) => {
);
}
const groupId =
mappingForCurrentTopic[0]?.SelfManagedKafkaEventSourceConfig
?.ConsumerGroupId;
mappingForCurrentTopic[0]?.SelfManagedKafkaEventSourceConfig?.ConsumerGroupId;
if (!groupId) {
throw new Error(
`ERROR: No ConsumerGroupId found for function ${trigger.function} and topic ${topic}`,
Expand Down Expand Up @@ -98,9 +92,7 @@ export const handler: Handler = async (event, _, callback) => {
}
await admin.disconnect();
response.stable = statuses.every((status) => status === "Stable");
response.current = Object.values(offsets).every(
(o) => o.latestOffset === o.currentOffset,
);
response.current = Object.values(offsets).every((o) => o.latestOffset === o.currentOffset);
response.ready = response.stable && response.current;
} catch (error: any) {
response.statusCode = 500;
Expand Down
4 changes: 3 additions & 1 deletion lib/lambda/createTriggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ export const handler: Handler = async (event, _, callback) => {
};
let errorResponse = null;
try {
const lambdaClient = new LambdaClient({});
const lambdaClient = new LambdaClient({
region: process.env.region,
});
const uuidsToCheck = [];
for (const trigger of event.triggers) {
for (const topic of [...new Set(trigger.topics)]) {
Expand Down
8 changes: 4 additions & 4 deletions lib/lambda/deleteTriggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ export const handler: Handler = async (event, _, callback) => {
};

export const deleteAllTriggersForFunctions = async (functions: string[]) => {
const lambdaClient = new LambdaClient({});
const lambdaClient = new LambdaClient({
region: process.env.region,
});
const uuidsToCheck = [];
for (const functionName of functions) {
const response = await lambdaClient.send(
new ListEventSourceMappingsCommand({ FunctionName: functionName }),
);
for (const eventSourceMapping of response.EventSourceMappings || []) {
if (eventSourceMapping.SelfManagedKafkaEventSourceConfig) {
console.log(
`Disabling all Kafka triggers for function: ${functionName}`,
);
console.log(`Disabling all Kafka triggers for function: ${functionName}`);
await lambdaClient.send(
new DeleteEventSourceMappingCommand({
UUID: eventSourceMapping.UUID,
Expand Down
2 changes: 1 addition & 1 deletion lib/lambda/runReindex.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it, expect, vi, afterEach } from "vitest";
import { Context } from "aws-lambda";
import { handler } from "./runReindex";
import { Context } from "aws-lambda";
import { CLOUDFORMATION_NOTIFICATION_DOMAIN } from "mocks";
import { SFNClient } from "@aws-sdk/client-sfn";
import * as cfn from "cfn-response-async";
Expand Down

0 comments on commit 8ce0a74

Please sign in to comment.