Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(test): Create mock for calling lambdas #973

Merged
merged 11 commits into from
Jan 7, 2025
53 changes: 22 additions & 31 deletions lib/lambda/cfnNotify.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import { handler } from "./cfnNotify";
import { send, SUCCESS, FAILED } from "cfn-response-async";

vi.mock("cfn-response-async", () => ({
send: vi.fn(),
SUCCESS: "SUCCESS",
FAILED: "FAILED",
}));
import { Context } from "aws-lambda";
import { CLOUDFORMATION_NOTIFICATION_DOMAIN } from "mocks";
import * as cfn from "cfn-response-async";

describe("Lambda Handler", () => {
const cfnSpy = vi.spyOn(cfn, "send");
const callback = vi.fn();

beforeEach(() => {
afterEach(() => {
vi.clearAllMocks();
});

Expand All @@ -21,19 +18,21 @@ describe("Lambda Handler", () => {
Context: {
Execution: {
Input: {
cfnEvent: {},
cfnEvent: {
ResponseURL: CLOUDFORMATION_NOTIFICATION_DOMAIN,
},
cfnContext: {},
},
},
},
};

await handler(event, null, callback);
await handler(event, {} as Context, callback);

expect(send).toHaveBeenCalledWith(
expect(cfnSpy).toHaveBeenCalledWith(
event.Context.Execution.Input.cfnEvent,
event.Context.Execution.Input.cfnContext,
SUCCESS,
cfn.SUCCESS,
{},
"static",
);
Expand All @@ -46,19 +45,21 @@ describe("Lambda Handler", () => {
Context: {
Execution: {
Input: {
cfnEvent: {},
cfnEvent: {
ResponseURL: CLOUDFORMATION_NOTIFICATION_DOMAIN,
},
cfnContext: {},
},
},
},
};

await handler(event, null, callback);
await handler(event, {} as Context, callback);

expect(send).toHaveBeenCalledWith(
expect(cfnSpy).toHaveBeenCalledWith(
event.Context.Execution.Input.cfnEvent,
event.Context.Execution.Input.cfnContext,
FAILED,
cfn.FAILED,
{},
"static",
);
Expand All @@ -78,29 +79,19 @@ describe("Lambda Handler", () => {
},
};

await handler(event, null, callback);
await handler(event, {} as Context, callback);

expect(send).not.toHaveBeenCalled();
expect(cfnSpy).not.toHaveBeenCalled();
expect(callback).toHaveBeenCalledWith(null, { statusCode: 200 });
});

it("should handle errors and return statusCode 500", async () => {
const event = {
Success: true,
Context: {
Execution: {
Input: {
cfnEvent: {},
cfnContext: {},
},
},
},
Context: {},
};

// Simulate an error in send function
(send as vi.Mock).mockRejectedValue(new Error("Test error"));

await handler(event, null, callback);
await handler(event, {} as Context, callback);

expect(callback).toHaveBeenCalledWith(expect.any(Error), {
statusCode: 500,
Expand Down
1 change: 1 addition & 0 deletions lib/lambda/cfnNotify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export const handler: Handler = async (event, _, callback) => {
await send(cfnEvent, cfnContext, result, responseData, "static");
}
} catch (error: any) {
console.log({ error });
response.statusCode = 500;
errorResponse = error;
} finally {
Expand Down
202 changes: 60 additions & 142 deletions lib/lambda/checkConsumerLag.test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { describe, it, expect, vi, afterEach } from "vitest";
import { handler } from "./checkConsumerLag";
import { Kafka } from "kafkajs";
import { Context } from "aws-lambda";
import {
TEST_FUNCTION_NAME,
TEST_TOPIC_NAME,
TEST_NONEXISTENT_TOPIC_NAME,
TEST_NONEXISTENT_FUNCTION_NAME,
TEST_MULTIPLE_TOPICS_FUNCTION_NAME,
TEST_MULTIPLE_TOPICS_TOPIC_NAME,
TEST_MISSING_CONSUMER_FUNCTION_NAME,
TEST_MISSING_CONSUMER_TOPIC_NAME,
} from "mocks";

const mockKafkaAdmin = {
connect: vi.fn(),
describeGroups: vi.fn().mockResolvedValue({
groups: [{ state: "Stable" }],
}),
fetchTopicOffsets: vi.fn().mockResolvedValue([{ offset: "100" }]),
fetchOffsets: vi
.fn()
.mockResolvedValue([{ partitions: [{ offset: "100" }] }]),
fetchOffsets: vi.fn().mockResolvedValue([{ partitions: [{ offset: "100" }] }]),
disconnect: vi.fn(),
};

Expand All @@ -20,43 +28,25 @@ vi.mock("kafkajs", () => ({
})),
}));

const mockLambdaClient = {
send: vi.fn(),
};

vi.mock("@aws-sdk/client-lambda", () => ({
LambdaClient: vi.fn().mockImplementation(() => mockLambdaClient),
ListEventSourceMappingsCommand: vi.fn(),
}));

describe("Lambda Handler", () => {
const callback = vi.fn();

beforeEach(() => {
afterEach(() => {
vi.clearAllMocks();
});

it("should handle successful execution with stable and current offsets", async () => {
const event = {
triggers: [
{
function: "test-function",
topics: ["test-topic"],
function: TEST_FUNCTION_NAME,
topics: [TEST_TOPIC_NAME],
},
],
brokerString: "broker1,broker2",
};

mockLambdaClient.send.mockResolvedValueOnce({
EventSourceMappings: [
{
Topics: ["test-topic"],
SelfManagedKafkaEventSourceConfig: { ConsumerGroupId: "test-group" },
},
],
});

await handler(event, null, callback);
await handler(event, {} as Context, callback);

expect(callback).toHaveBeenCalledWith(null, {
statusCode: 200,
Expand All @@ -66,144 +56,72 @@ describe("Lambda Handler", () => {
});
});

it("should handle missing event source mappings", async () => {
it.each([
[
"should handle missing function",
TEST_NONEXISTENT_FUNCTION_NAME,
TEST_TOPIC_NAME,
`ERROR: No event source mapping found for function ${TEST_NONEXISTENT_FUNCTION_NAME} and topic ${TEST_TOPIC_NAME}`,
],
[
"should handle missing topic",
TEST_FUNCTION_NAME,
TEST_NONEXISTENT_TOPIC_NAME,
`ERROR: No event source mapping found for function ${TEST_FUNCTION_NAME} and topic ${TEST_NONEXISTENT_TOPIC_NAME}`,
],
[
"should handle multiple event source mappings",
TEST_MULTIPLE_TOPICS_FUNCTION_NAME,
TEST_MULTIPLE_TOPICS_TOPIC_NAME,
`ERROR: Multiple event source mappings found for function ${TEST_MULTIPLE_TOPICS_FUNCTION_NAME} and topic ${TEST_MULTIPLE_TOPICS_TOPIC_NAME}`,
],
[
"should handle missing ConsumerGroupId",
TEST_MISSING_CONSUMER_FUNCTION_NAME,
TEST_MISSING_CONSUMER_TOPIC_NAME,
`ERROR: No ConsumerGroupId found for function ${TEST_MISSING_CONSUMER_FUNCTION_NAME} and topic ${TEST_MISSING_CONSUMER_TOPIC_NAME}`,
],
])("%s", async (_, funcName, topicName, errorMessage) => {
const event = {
triggers: [
{
function: "test-function",
topics: ["nonexistent-topic"],
function: funcName,
topics: [topicName],
},
],
brokerString: "broker1,broker2",
};

mockLambdaClient.send.mockResolvedValueOnce({
EventSourceMappings: [],
});

await handler(event, null, callback);
await handler(event, {} as Context, callback);

expect(callback).toHaveBeenCalledWith(
new Error(
"ERROR: No event source mapping found for function test-function and topic nonexistent-topic",
),
{
statusCode: 500,
stable: false,
current: false,
ready: false,
},
);
});

it("should handle multiple event source mappings", async () => {
const event = {
triggers: [
{
function: "test-function",
topics: ["test-topic"],
},
],
brokerString: "broker1,broker2",
};

mockLambdaClient.send.mockResolvedValueOnce({
EventSourceMappings: [
{
Topics: ["test-topic"],
SelfManagedKafkaEventSourceConfig: { ConsumerGroupId: "test-group" },
},
{
Topics: ["test-topic"],
SelfManagedKafkaEventSourceConfig: {
ConsumerGroupId: "test-group-2",
},
},
],
expect(callback).toHaveBeenCalledWith(new Error(errorMessage), {
statusCode: 500,
stable: false,
current: false,
ready: false,
});

await handler(event, null, callback);

expect(callback).toHaveBeenCalledWith(
new Error(
"ERROR: Multiple event source mappings found for function test-function and topic test-topic",
),
{
statusCode: 500,
stable: false,
current: false,
ready: false,
},
);
});

it("should handle kafka admin errors", async () => {
const event = {
triggers: [
{
function: "test-function",
topics: ["test-topic"],
function: TEST_FUNCTION_NAME,
topics: [TEST_TOPIC_NAME],
},
],
brokerString: "broker1,broker2",
};

const kafka = new Kafka({
clientId: "consumerGroupResetter",
brokers: event.brokerString?.split(",") || [],
ssl: true,
});

kafka.admin = vi.fn().mockReturnValueOnce({
connect: vi.fn(),
describeGroups: vi.fn().mockRejectedValue(new Error("Kafka admin error")),
fetchTopicOffsets: vi.fn(),
fetchOffsets: vi.fn(),
disconnect: vi.fn(),
});

await handler(event, null, callback);
mockKafkaAdmin.describeGroups.mockRejectedValueOnce(new Error("Kafka admin error"));

expect(callback).toHaveBeenCalledWith(
expect.any(Error),
expect.objectContaining({
statusCode: 500,
}),
);
});
await handler(event, {} as Context, callback);

it("should handle missing ConsumerGroupId", async () => {
const event = {
triggers: [
{
function: "test-function",
topics: ["test-topic"],
},
],
brokerString: "broker1,broker2",
};

mockLambdaClient.send.mockResolvedValueOnce({
EventSourceMappings: [
{
Topics: ["test-topic"],
SelfManagedKafkaEventSourceConfig: null,
},
],
expect(callback).toHaveBeenCalledWith(new Error(`Kafka admin error`), {
statusCode: 500,
stable: false,
current: false,
ready: false,
});

await handler(event, null, callback);

expect(callback).toHaveBeenCalledWith(
new Error(
"ERROR: No ConsumerGroupId found for function test-function and topic test-topic",
),
{
statusCode: 500,
stable: false,
current: false,
ready: false,
},
);
});
});
Loading
Loading