Skip to content

Commit

Permalink
fix(os) helpers for getting domain and index for opensearch (#992)
Browse files Browse the repository at this point in the history
* created helpers for getting domain and index for opensearch
* fixed import
  • Loading branch information
thetif authored Jan 8, 2025
1 parent ea3e350 commit cdfae2a
Show file tree
Hide file tree
Showing 26 changed files with 238 additions and 218 deletions.
19 changes: 10 additions & 9 deletions lib/lambda/deleteIndex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ export const handler: Handler = async (event, __, callback) => {
};
let errorResponse = null;
try {
if (!event.osDomain) throw "process.env.osDomain cannot be undefined";
const { osDomain, indexNamespace = "" } = event;
if (!osDomain) throw "osDomain cannot be undefined";

const indices: Index[] = [
`${event.indexNamespace}main`,
`${event.indexNamespace}changelog`,
`${event.indexNamespace}insights`,
`${event.indexNamespace}types`,
`${event.indexNamespace}subtypes`,
`${event.indexNamespace}legacyinsights`,
`${event.indexNamespace}cpocs`,
`${indexNamespace}main`,
`${indexNamespace}changelog`,
`${indexNamespace}insights`,
`${indexNamespace}types`,
`${indexNamespace}subtypes`,
`${indexNamespace}legacyinsights`,
`${indexNamespace}cpocs`,
];
for (const index of indices) {
await os.deleteIndex(event.osDomain, index);
await os.deleteIndex(osDomain, index);
}
} catch (error: any) {
response.statusCode = 500;
Expand Down
2 changes: 1 addition & 1 deletion lib/lambda/getAttachmentUrl.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ describe("Lambda Handler", () => {

expect(response).toHaveBeenCalledWith({
statusCode: 500,
body: { message: "ERROR: osDomain env variable is required" },
body: { message: "ERROR: process.env.osDomain must be defined" },
});
});

Expand Down
21 changes: 7 additions & 14 deletions lib/lambda/getAttachmentUrl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,25 @@ import { getSignedUrl } from "@aws-sdk/s3-request-presigner";

import { getStateFilter } from "../libs/api/auth/user";
import { getPackage, getPackageChangelog } from "../libs/api/package";
import { getDomain } from "libs/utils";

// Handler function to get Seatool data
export const handler = async (event: APIGatewayEvent) => {
if (!process.env.osDomain) {
try {
getDomain();
} catch (error) {
return response({
statusCode: 500,
body: { message: "ERROR: osDomain env variable is required" },
body: { message: `ERROR: ${error?.message || error}` },
});
}

if (!event.body) {
return response({
statusCode: 400,
body: { message: "Event body required" },
});
}
if (!process.env.osDomain) {
return response({
statusCode: 500,
body: { message: "Handler is missing process.env.osDomain env var" },
});
}

try {
const body = JSON.parse(event.body);
Expand Down Expand Up @@ -72,12 +70,7 @@ export const handler = async (event: APIGatewayEvent) => {
}

// Now we can generate the presigned url
const url = await generatePresignedUrl(
body.bucket,
body.key,
body.filename,
60,
);
const url = await generatePresignedUrl(body.bucket, body.key, body.filename, 60);

return response<unknown>({
statusCode: 200,
Expand Down
13 changes: 4 additions & 9 deletions lib/lambda/getCpocs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ import { handleOpensearchError } from "./utils";
import { APIGatewayEvent } from "aws-lambda";
import * as os from "libs/opensearch-lib";
import { response } from "libs/handler-lib";
import { getDomainAndNamespace } from "libs/utils";

// type GetCpocsBody = object;

export const queryCpocs = async () => {
if (!process.env.osDomain) {
throw new Error("process.env.osDomain must be defined");
}
const { index, domain } = getDomainAndNamespace("cpocs");

const query = {
size: 1000,
Expand All @@ -20,11 +19,7 @@ export const queryCpocs = async () => {
},
],
};
return await os.search(
process.env.osDomain,
`${process.env.indexNamespace}cpocs`,
query,
);
return await os.search(domain, index, query);
};

export const getCpocs = async (event: APIGatewayEvent) => {
Expand All @@ -47,7 +42,7 @@ export const getCpocs = async (event: APIGatewayEvent) => {
body: result,
});
} catch (err) {
return response(handleOpensearchError(err))
return response(handleOpensearchError(err));
}
};

Expand Down
11 changes: 5 additions & 6 deletions lib/lambda/getSubTypes.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import { handleOpensearchError } from "./utils";
import { handleOpensearchError } from "./utils";
import { APIGatewayEvent } from "aws-lambda";
import { response } from "libs/handler-lib";
import * as os from "libs/opensearch-lib";
import { getDomainAndNamespace } from "libs/utils";

type GetSubTypesBody = {
authorityId: string;
typeIds: string[];
};

export const querySubTypes = async (authorityId: string, typeIds: string[]) => {
if (!process.env.osDomain) {
throw new Error("process.env.osDomain must be defined");
}
const { index, domain } = getDomainAndNamespace("subtypes");

const query = {
size: 200,
Expand Down Expand Up @@ -49,7 +48,7 @@ export const querySubTypes = async (authorityId: string, typeIds: string[]) => {
],
};

return await os.search(process.env.osDomain, `${process.env.indexNamespace}subtypes`, query);
return await os.search(domain, index, query);
};

export const getSubTypes = async (event: APIGatewayEvent) => {
Expand All @@ -74,7 +73,7 @@ export const getSubTypes = async (event: APIGatewayEvent) => {
body: result,
});
} catch (err) {
return response(handleOpensearchError(err))
return response(handleOpensearchError(err));
}
};

Expand Down
11 changes: 3 additions & 8 deletions lib/lambda/getTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ import { handleOpensearchError } from "./utils";
import { APIGatewayEvent } from "aws-lambda";
import * as os from "libs/opensearch-lib";
import { response } from "libs/handler-lib";
import { getDomainAndNamespace } from "libs/utils";

type GetTypesBody = {
authorityId: string;
};

export const queryTypes = async (authorityId: string) => {
if (!process.env.osDomain) {
throw new Error("process.env.osDomain must be defined");
}
const { index, domain } = getDomainAndNamespace("types");

const query = {
size: 200,
Expand Down Expand Up @@ -42,11 +41,7 @@ export const queryTypes = async (authorityId: string) => {
},
],
};
return await os.search(
process.env.osDomain,
`${process.env.indexNamespace}types`,
query,
);
return await os.search(domain, index, query);
};

export const getTypes = async (event: APIGatewayEvent) => {
Expand Down
6 changes: 3 additions & 3 deletions lib/lambda/itemExists.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@ describe("Handler for checking if record exists", () => {
);
});

it("should return 500 error occurs during processing", async () => {
it("should return 200 and exists: false if an error occurs during processing", async () => {
const event = {
body: JSON.stringify({ id: GET_ERROR_ITEM_ID }),
} as APIGatewayEvent;

const res = await handler(event);

expect(res).toBeTruthy();
expect(res.statusCode).toEqual(500);
expect(res.statusCode).toEqual(200);
expect(res.body).toEqual(
JSON.stringify({ error: "Internal server error", message: "Response Error" }),
JSON.stringify({ message: "No record found for the given id", exists: false }),
);
});
});
2 changes: 1 addition & 1 deletion lib/lambda/search.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe("getSearchData Handler", () => {
const body = JSON.parse(res.body);
expect(body).toBeTruthy();
expect(body?.hits?.hits).toBeTruthy();
expect(body?.hits?.hits?.length).toEqual(12);
expect(body?.hits?.hits?.length).toEqual(13);
});

it("should handle errors during processing", async () => {
Expand Down
13 changes: 7 additions & 6 deletions lib/lambda/search.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
import { handleOpensearchError } from "./utils";
import { APIGatewayEvent } from "aws-lambda";
import { response } from "libs/handler-lib";
import { Index } from "shared-types/opensearch";
import { BaseIndex } from "shared-types/opensearch";
import { validateEnvVariable } from "shared-utils";
import { getStateFilter } from "../libs/api/auth/user";
import { getAppkChildren } from "../libs/api/package";
import * as os from "../libs/opensearch-lib";
import { getDomainAndNamespace } from "libs/utils";

// Handler function to search index
export const getSearchData = async (event: APIGatewayEvent) => {
validateEnvVariable("osDomain");

if (!event.pathParameters || !event.pathParameters.index) {
return response({
statusCode: 400,
body: { message: "Index path parameter required" },
});
}

const { domain, index } = getDomainAndNamespace(event.pathParameters.index as BaseIndex);

try {
let query: any = {};
if (event.body) {
Expand All @@ -42,11 +47,7 @@ export const getSearchData = async (event: APIGatewayEvent) => {
query.from = query.from || 0;
query.size = query.size || 100;

const results = await os.search(
process.env.osDomain as string,
`${process.env.indexNamespace}${event.pathParameters.index}` as Index,
query,
);
const results = await os.search(domain, index, query);

for (let i = 0; i < results?.hits?.hits?.length; i++) {
if (results.hits.hits[i]._source?.appkParent) {
Expand Down
4 changes: 2 additions & 2 deletions lib/lambda/sinkChangelog.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Handler } from "aws-lambda";
import { decodeBase64WithUtf8 } from "shared-utils";
import { KafkaEvent, KafkaRecord, opensearch } from "shared-types";
import { ErrorType, bulkUpdateDataWrapper, getTopic, logError } from "../libs/sink-lib";
import { ErrorType, bulkUpdateDataWrapper, getTopic, logError } from "libs/sink-lib";
import {
transformUpdateValuesSchema,
transformDeleteSchema,
transformedUpdateIdSchema,
} from "./update/adminChangeSchemas";
import { getPackageChangelog } from "lib/libs/api/package";
import { getPackageChangelog } from "libs/api/package";

// One notable difference between this handler and sinkMain's...
// The order in which records are processed for the changelog doesn't matter.
Expand Down
2 changes: 1 addition & 1 deletion lib/lambda/sinkMain.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, it, expect, vi, afterEach } from "vitest";
import { handler } from "./sinkMain";
import * as sinkMainProcessors from "./sinkMainProcessors";
import { KafkaEvent } from "lib/packages/shared-types";
import { KafkaEvent } from "shared-types";

const createKafkaEvent = (records: KafkaEvent["records"]) => ({
eventSource: "SelfManagedKafka",
Expand Down
2 changes: 1 addition & 1 deletion lib/lambda/sinkMainProcessors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import * as sinkLib from "libs";
import { Document, seatool } from "shared-types/opensearch/main";
import { offsetToUtc } from "shared-utils";
import { KafkaRecord } from "lib/packages/shared-types";
import { KafkaRecord } from "shared-types";

const convertObjToBase64 = (obj: object) => Buffer.from(JSON.stringify(obj)).toString("base64");

Expand Down
6 changes: 3 additions & 3 deletions lib/lambda/update/getPackageType.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { response } from "lib/libs/handler-lib";
import { events } from "lib/packages/shared-types";
import { getPackageChangelog } from "lib/libs/api/package";
import { response } from "libs/handler-lib";
import { events } from "shared-types";
import { getPackageChangelog } from "libs/api/package";

export const getPackageType = async (packageId: string) => {
// use event of current package to determine how ID should be formatted
Expand Down
2 changes: 1 addition & 1 deletion lib/lambda/update/updatePackage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { getPackage } from "libs/api/package";
import { produceMessage } from "libs/api/kafka";
import { ItemResult } from "shared-types/opensearch/main";
import { getPackageType } from "./getPackageType";
import { events } from "lib/packages/shared-types";
import { events } from "shared-types";
import { z } from "zod";

const sendDeleteMessage = async (packageId: string) => {
Expand Down
Loading

0 comments on commit cdfae2a

Please sign in to comment.