-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsinkMain.test.ts
74 lines (61 loc) · 2.28 KB
/
sinkMain.test.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import { describe, it, expect, vi, afterEach } from "vitest";
import { handler } from "./sinkMain";
import * as sinkMainProcessors from "./sinkMainProcessors";
import { KafkaEvent } from "shared-types";
/**
 * Builds the event object passed to the handler in these tests.
 *
 * @param records - The `records` payload to embed in the event.
 * @returns An object with fixed `eventSource` ("SelfManagedKafka") and
 *   `bootstrapServers` ("kafka") values plus the supplied `records`.
 */
const createKafkaEvent = (records: KafkaEvent["records"]) => {
  const kafkaEvent = {
    eventSource: "SelfManagedKafka",
    bootstrapServers: "kafka",
    records,
  };
  return kafkaEvent;
};
// Checks that the handler forwards each known topic-partition key to the
// matching processor export, and rejects a key it does not recognise.
describe("sinkMain handler", () => {
  // Environment values are stubbed once, when the suite body is evaluated.
  vi.stubEnv("osDomain", "os-domain");
  vi.stubEnv("indexNamespace", "index-namespace");

  // Calls the handler with the given records, a placeholder context and a mock
  // callback. Deliberately not `async`, so whatever `handler` returns (or
  // throws) reaches the caller unchanged.
  const invokeHandler = (records: KafkaEvent["records"]) =>
    handler(createKafkaEvent(records), expect.anything(), vi.fn());

  afterEach(() => {
    vi.restoreAllMocks();
    vi.resetModules();
  });

  it("handles aws.onemac.migration.cdc topic successfully", async () => {
    const topicPartition = "aws.onemac.migration.cdc-0";
    const processorSpy = vi
      .spyOn(sinkMainProcessors, "insertOneMacRecordsFromKafkaIntoMako")
      .mockImplementation(vi.fn());

    await invokeHandler({ [topicPartition]: [] });

    expect(processorSpy).toHaveBeenCalledWith([], topicPartition);
  });

  it("handles aws.seatool.ksql.onemac.three.agg.State_Plan topic successfully", async () => {
    const topicPartition = "aws.seatool.ksql.onemac.three.agg.State_Plan-0";
    const processorSpy = vi
      .spyOn(sinkMainProcessors, "insertNewSeatoolRecordsFromKafkaIntoMako")
      .mockImplementation(vi.fn());

    await invokeHandler({ [topicPartition]: [] });

    expect(processorSpy).toHaveBeenCalledWith([], topicPartition);
  });

  it("handles aws.seatool.debezium.changed_date.SEA.dbo.State_Plan topic successfully", async () => {
    const topicPartition = "aws.seatool.debezium.changed_date.SEA.dbo.State_Plan-0";
    const processorSpy = vi
      .spyOn(sinkMainProcessors, "syncSeatoolRecordDatesFromKafkaWithMako")
      .mockImplementation(vi.fn());

    await invokeHandler({ [topicPartition]: [] });

    expect(processorSpy).toHaveBeenCalledWith([], topicPartition);
  });

  it("throws error with invalid topic partition", async () => {
    const topicPartition = "invalid-topic-partition";

    await expect(invokeHandler({ [topicPartition]: [] })).rejects.toThrowError(
      "topic (invalid-topic-partition) is invalid",
    );
  });
});