Skip to content

Commit

Permalink
fix(sinkSeatool tolerance): Make sinkSeatool more tolerant of process…
Browse files Browse the repository at this point in the history
…ing errors (#368)

* Try/Catch for the sink... essentially ignoring malformed test records, where the key was undefined

* adding try catch for changelog...
  • Loading branch information
mdial89f authored Feb 7, 2024
1 parent d5c9e6a commit decac40
Showing 1 changed file with 94 additions and 85 deletions.
179 changes: 94 additions & 85 deletions src/services/data/handlers/sinkOnemac.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,81 +19,85 @@ export const handler: Handler<KafkaEvent> = async (event) => {
export const onemac_main = async (event: KafkaEvent) => {
const records = Object.values(event.records).reduce((ACC, RECORDS) => {
RECORDS.forEach((REC) => {
const id: string = decode(REC.key);

// Handle deletes and return
if (!REC.value) {
ACC.push({
id,
additionalInformation: null,
raiWithdrawEnabled: null,
attachments: null,
submitterEmail: null,
submitterName: null,
});
return;
}

const record = JSON.parse(decode(REC.value));

// Handle legacy and return
if (record?.origin !== "micro") {
if (
record?.sk === "Package" && // Is a Package View
record?.submitterName && // Is originally from Legacy
record?.submitterName !== "-- --" // Is originally from Legacy
) {
const result = opensearch.main.legacySubmission
.transform(id)
.safeParse(record);

if (!result.success) {
return console.log(
"LEGACY Validation Error. The following record failed to parse: ",
JSON.stringify(record),
"Because of the following Reason(s):",
result.error.message
);
}

ACC.push(result.data);
try {
const id: string = decode(REC.key);

// Handle deletes and return
if (!REC.value) {
ACC.push({
id,
additionalInformation: null,
raiWithdrawEnabled: null,
attachments: null,
submitterEmail: null,
submitterName: null,
});
return;
}
return;
}

// Handle everything else
const result = (() => {
switch (record?.actionType) {
case undefined:
return opensearch.main.newSubmission
.transform(id)
.safeParse(record);
case Action.DISABLE_RAI_WITHDRAW:
case Action.ENABLE_RAI_WITHDRAW:
return opensearch.main.toggleWithdrawEnabled
.transform(id)
.safeParse(record);
case Action.WITHDRAW_RAI:
return opensearch.main.withdrawRai.transform(id).safeParse(record);
case Action.WITHDRAW_PACKAGE:
return opensearch.main.withdrawPackage
const record = JSON.parse(decode(REC.value));
// Handle legacy and return
if (record?.origin !== "micro") {
if (
record?.sk === "Package" && // Is a Package View
record?.submitterName && // Is originally from Legacy
record?.submitterName !== "-- --" // Is originally from Legacy
) {
const result = opensearch.main.legacySubmission
.transform(id)
.safeParse(record);
if (!result.success) {
return console.log(
"LEGACY Validation Error. The following record failed to parse: ",
JSON.stringify(record),
"Because of the following Reason(s):",
result.error.message
);
}

ACC.push(result.data);
}
return;
}
})();

if (!result) return;
// Handle everything else
const result = (() => {
switch (record?.actionType) {
case undefined:
return opensearch.main.newSubmission
.transform(id)
.safeParse(record);
case Action.DISABLE_RAI_WITHDRAW:
case Action.ENABLE_RAI_WITHDRAW:
return opensearch.main.toggleWithdrawEnabled
.transform(id)
.safeParse(record);
case Action.WITHDRAW_RAI:
return opensearch.main.withdrawRai
.transform(id)
.safeParse(record);
case Action.WITHDRAW_PACKAGE:
return opensearch.main.withdrawPackage
.transform(id)
.safeParse(record);
}
})();

if (!result?.success) {
return console.log(
"ONEMAC Validation Error. The following record failed to parse: ",
JSON.stringify(record),
"Because of the following Reason(s):",
result?.error.message
);
}
if (!result) return;

ACC.push(result.data);
if (!result?.success) {
return console.log(
"ONEMAC Validation Error. The following record failed to parse: ",
JSON.stringify(record),
"Because of the following Reason(s):",
result?.error.message
);
}

ACC.push(result.data);
} catch (error) {
console.log("SINK FAILURE: There was an error sinking a record.");
console.log("Unedited event key: " + REC.key);
}
});

return ACC;
Expand All @@ -109,23 +113,28 @@ export const onemac_main = async (event: KafkaEvent) => {
export const onemac_changelog = async (event: KafkaEvent) => {
const data = Object.values(event.records).reduce((ACC, RECORDS) => {
RECORDS.forEach((REC) => {
// Handle deletes and return
if (!REC.value) return;

const record = JSON.parse(decode(REC.value));

// Handle legacy and return
if (record?.origin !== "micro") return;

// Handle everything else
const packageId = decode(REC.key);
ACC.push({
...record,
...(!record?.actionType && { actionType: "new-submission" }), // new-submission custom actionType
timestamp: REC.timestamp,
id: `${packageId}-${REC.offset}`,
packageId,
});
try {
// Handle deletes and return
if (!REC.value) return;

const record = JSON.parse(decode(REC.value));

// Handle legacy and return
if (record?.origin !== "micro") return;

// Handle everything else
const packageId = decode(REC.key);
ACC.push({
...record,
...(!record?.actionType && { actionType: "new-submission" }), // new-submission custom actionType
timestamp: REC.timestamp,
id: `${packageId}-${REC.offset}`,
packageId,
});
} catch (error) {
console.log("SINK FAILURE: There was an error sinking a record.");
console.log("Unedited event key: " + REC.key);
}
});

return ACC;
Expand Down

0 comments on commit decac40

Please sign in to comment.