diff --git a/lib/lambda/getPackageActions.test.ts b/lib/lambda/getPackageActions.test.ts
index 3d77dbdfc..373d58b1b 100644
--- a/lib/lambda/getPackageActions.test.ts
+++ b/lib/lambda/getPackageActions.test.ts
@@ -19,6 +19,19 @@ describe("getPackageActions Handler", () => {
     expect(res.statusCode).toEqual(400);
   });
 
+  it("should return 500 if event body is invalid", async () => {
+    const event = {
+      body: {},
+      requestContext: getRequestContext(),
+    } as APIGatewayEvent;
+
+    const res = await handler(event);
+
+    expect(res).toBeTruthy();
+    expect(res.statusCode).toEqual(500);
+    expect(res.body).toEqual(JSON.stringify({ message: "Internal server error" }));
+  });
+
   it("should return 401 if not authorized to view resources from the state", async () => {
     const event = {
       body: JSON.stringify({ id: HI_TEST_ITEM_ID }),
diff --git a/lib/lambda/processEmails.ts b/lib/lambda/processEmails.ts
index 0d62cd5e0..29a86325a 100644
--- a/lib/lambda/processEmails.ts
+++ b/lib/lambda/processEmails.ts
@@ -175,6 +175,11 @@ export async function processAndSendEmails(record: any, id: string, config: Proc
   const sec = await getSecret(config.emailAddressLookupSecretName);
   const item = await os.getItem(config.osDomain, getNamespace("main"), id);
 
+  if (!item?.found || !item?._source) {
+    console.log(`The package was not found for id: ${id}. Doing nothing.`);
+    return;
+  }
+
   const cpocEmail = [...getCpocEmail(item)];
   const srtEmails = [...getSrtEmails(item)];
 
diff --git a/lib/lambda/sinkCpocs.test.ts b/lib/lambda/sinkCpocs.test.ts
index f03453de0..147ef783c 100644
--- a/lib/lambda/sinkCpocs.test.ts
+++ b/lib/lambda/sinkCpocs.test.ts
@@ -9,8 +9,12 @@ import {
   createKafkaRecord,
   OPENSEARCH_DOMAIN,
   OPENSEARCH_INDEX_NAMESPACE,
+  rateLimitBulkUpdateDataHandler,
+  errorBulkUpdateDataHandler,
 } from "mocks";
+import { mockedServiceServer as mockedServer } from "mocks/server";
 import cpocs, { MUHAMMAD_BASHAR_ID } from "mocks/data/cpocs";
+import { Client } from "@opensearch-project/opensearch";
 
 const OPENSEARCH_INDEX = `${OPENSEARCH_INDEX_NAMESPACE}cpocs`;
 const TOPIC = "--mako--branch-name--aws.seatool.debezium.cdc.SEA.dbo.Officers";
@@ -221,4 +225,68 @@ describe("test sync cpoc", () => {
       },
     ]);
   });
+
+  it("should succeed after receiving a rate limit exceeded error", async () => {
+    const osBulkSpy = vi.spyOn(Client.prototype, "bulk");
+    mockedServer.use(rateLimitBulkUpdateDataHandler);
+
+    await handler(
+      createKafkaEvent({
+        [`${TOPIC}-xyz`]: [
+          createKafkaRecord({
+            topic: `${TOPIC}-xyz`,
+            key: MUHAMMAD_BASHAR_KEY,
+            value: convertObjToBase64({
+              payload: {
+                after: {
+                  Officer_ID: MUHAMMAD_BASHAR_ID,
+                  First_Name: MUHAMMAD_BASHAR._source?.firstName,
+                  Last_Name: MUHAMMAD_BASHAR._source?.lastName,
+                  Email: MUHAMMAD_BASHAR._source?.email,
+                },
+              },
+            }),
+          }),
+        ],
+      }),
+      {} as Context,
+      vi.fn(),
+    );
+
+    expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
+      {
+        ...MUHAMMAD_BASHAR._source,
+      },
+    ]);
+    expect(osBulkSpy).toHaveBeenCalledTimes(2);
+  });
+
+  it("should throw an error after receiving a server error", async () => {
+    mockedServer.use(errorBulkUpdateDataHandler);
+
+    await expect(() =>
+      handler(
+        createKafkaEvent({
+          [`${TOPIC}-xyz`]: [
+            createKafkaRecord({
+              topic: `${TOPIC}-xyz`,
+              key: MUHAMMAD_BASHAR_KEY,
+              value: convertObjToBase64({
+                payload: {
+                  after: {
+                    Officer_ID: MUHAMMAD_BASHAR_ID,
+                    First_Name: MUHAMMAD_BASHAR._source?.firstName,
+                    Last_Name: MUHAMMAD_BASHAR._source?.lastName,
+                    Email: MUHAMMAD_BASHAR._source?.email,
+                  },
+                },
+              }),
+            }),
+          ],
+        }),
+        {} as Context,
+        vi.fn(),
+      ),
+    ).rejects.toThrowError("Response Error");
+  });
 });
diff --git a/lib/lambda/sinkInsights.test.ts b/lib/lambda/sinkInsights.test.ts
new file mode 100644
index 000000000..fa2eac396
--- /dev/null
+++ b/lib/lambda/sinkInsights.test.ts
@@ -0,0 +1,136 @@
+import { describe, expect, it, vi, afterEach } from "vitest";
+import { handler } from "./sinkInsights";
+import { Context } from "aws-lambda";
+import * as os from "libs/opensearch-lib";
+import * as sink from "../libs/sink-lib";
+import {
+  convertObjToBase64,
+  createKafkaEvent,
+  createKafkaRecord,
+  OPENSEARCH_DOMAIN,
+  OPENSEARCH_INDEX_NAMESPACE,
+} from "mocks";
+
+const OPENSEARCH_INDEX = `${OPENSEARCH_INDEX_NAMESPACE}insights`;
+const TEST_INSIGHT_ID = "42";
+const TEST_INSIGHT_KEY = Buffer.from(TEST_INSIGHT_ID).toString("base64");
+const TOPIC = `--mako--branch-name--aws.seatool.ksql.onemac.three.agg.State_Plan-${TEST_INSIGHT_ID}`;
+
+describe("test sync insights", () => {
+  const bulkUpdateDataSpy = vi.spyOn(os, "bulkUpdateData");
+  const logErrorSpy = vi.spyOn(sink, "logError");
+
+  afterEach(() => {
+    vi.clearAllMocks();
+  });
+
+  it("should throw an error if the topic is undefined", async () => {
+    await expect(() =>
+      handler(
+        createKafkaEvent({
+          undefined: [],
+        }),
+        {} as Context,
+        vi.fn(),
+      ),
+    ).rejects.toThrowError("topic (undefined) is invalid");
+
+    expect(logErrorSpy).toHaveBeenCalledWith({ type: sink.ErrorType.BADTOPIC });
+    expect(logErrorSpy).toHaveBeenCalledWith(
+      expect.objectContaining({
+        type: sink.ErrorType.UNKNOWN,
+      }),
+    );
+  });
+
+  it("should throw an error if the topic is invalid", async () => {
+    await expect(() =>
+      handler(
+        createKafkaEvent({
+          "invalid-topic": [],
+        }),
+        {} as Context,
+        vi.fn(),
+      ),
+    ).rejects.toThrowError("topic (invalid-topic) is invalid");
+
+    expect(logErrorSpy).toHaveBeenCalledWith({ type: sink.ErrorType.BADTOPIC });
+    expect(logErrorSpy).toHaveBeenCalledWith(
+      expect.objectContaining({
+        type: sink.ErrorType.UNKNOWN,
+      }),
+    );
+  });
+
+  it("should skip if the key is invalid", async () => {
+    await handler(
+      createKafkaEvent({
+        [TOPIC]: [
+          createKafkaRecord({
+            topic: TOPIC,
+            // @ts-expect-error
+            key: undefined,
+            value: convertObjToBase64({
+              test: "value",
+            }),
+          }),
+        ],
+      }),
+      {} as Context,
+      vi.fn(),
+    );
+
+    expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, []);
+    expect(logErrorSpy).toHaveBeenCalledWith(
+      expect.objectContaining({
+        type: sink.ErrorType.BADPARSE,
+      }),
+    );
+  });
+
+  it("should skip if the record has no value", async () => {
+    await handler(
+      createKafkaEvent({
+        [TOPIC]: [
+          createKafkaRecord({
+            topic: TOPIC,
+            key: TEST_INSIGHT_KEY,
+            // @ts-expect-error needs to be undefined for test
+            value: undefined,
+          }),
+        ],
+      }),
+      {} as Context,
+      vi.fn(),
+    );
+
+    expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, []);
+    expect(logErrorSpy).not.toHaveBeenCalled();
+  });
+
+  it("should handle a valid record", async () => {
+    await handler(
+      createKafkaEvent({
+        [TOPIC]: [
+          createKafkaRecord({
+            topic: TOPIC,
+            key: TEST_INSIGHT_KEY,
+            value: convertObjToBase64({
+              test: "value",
+            }),
+          }),
+        ],
+      }),
+      {} as Context,
+      vi.fn(),
+    );
+
+    expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
+      {
+        id: TEST_INSIGHT_ID,
+        test: "value",
+      },
+    ]);
+    expect(logErrorSpy).not.toHaveBeenCalled();
+  });
+});
diff --git a/lib/lambda/sinkInsights.ts b/lib/lambda/sinkInsights.ts
index 9b3e0c27e..54726c64a 100644
--- a/lib/lambda/sinkInsights.ts
+++ b/lib/lambda/sinkInsights.ts
@@ -9,12 +9,12 @@ export const handler: Handler = async (event) => {
     for (const topicPartition of Object.keys(event.records)) {
       const topic = getTopic(topicPartition);
       switch (topic) {
-        case undefined:
-          logError({ type: ErrorType.BADTOPIC });
-          throw new Error();
         case "aws.seatool.ksql.onemac.three.agg.State_Plan":
           await ksql(event.records[topicPartition], topicPartition);
           break;
+        default:
+          logError({ type: ErrorType.BADTOPIC });
+          throw new Error(`topic (${topicPartition}) is invalid`);
       }
     }
   } catch (error) {
@@ -30,7 +30,7 @@ const ksql = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
     try {
       if (!value) continue;
 
-      const id: string = JSON.parse(decodeBase64WithUtf8(key));
+      const id: string = decodeBase64WithUtf8(key);
       const record = JSON.parse(decodeBase64WithUtf8(value));
       docs.push({ ...record, id });
     } catch (error) {
diff --git a/lib/lambda/sinkLegacyInsights.test.ts b/lib/lambda/sinkLegacyInsights.test.ts
new file mode 100644
index 000000000..ce223ee34
--- /dev/null
+++ b/lib/lambda/sinkLegacyInsights.test.ts
@@ -0,0 +1,210 @@
+import { describe, expect, it, vi, afterEach } from "vitest";
+import { handler } from "./sinkLegacyInsights";
+import { Context } from "aws-lambda";
+import * as os from "libs/opensearch-lib";
+import * as sink from "../libs/sink-lib";
+import {
+  convertObjToBase64,
+  createKafkaEvent,
+  createKafkaRecord,
+  OPENSEARCH_DOMAIN,
+  OPENSEARCH_INDEX_NAMESPACE,
+} from "mocks";
+
+const OPENSEARCH_INDEX = `${OPENSEARCH_INDEX_NAMESPACE}legacyinsights`;
+const TEST_INSIGHT_ID = "42";
+const TEST_INSIGHT_KEY = Buffer.from(TEST_INSIGHT_ID).toString("base64");
+const TOPIC = `--mako--branch-name--aws.onemac.migration.cdc-${TEST_INSIGHT_ID}`;
+
+describe("test sync legacy insights", () => {
+  const bulkUpdateDataSpy = vi.spyOn(os, "bulkUpdateData");
+  const logErrorSpy = vi.spyOn(sink, "logError");
+
+  afterEach(() => {
+    vi.clearAllMocks();
+  });
+
+  it("should throw an error if the topic is undefined", async () => {
+    await expect(() =>
+      handler(
+        createKafkaEvent({
+          undefined: [],
+        }),
+        {} as Context,
+        vi.fn(),
+      ),
+    ).rejects.toThrowError("topic (undefined) is invalid");
+
+    expect(logErrorSpy).toHaveBeenCalledWith({ type: sink.ErrorType.BADTOPIC });
+    expect(logErrorSpy).toHaveBeenCalledWith(
+      expect.objectContaining({
+        type: sink.ErrorType.UNKNOWN,
+      }),
+    );
+  });
+
+  it("should throw an error if the topic is invalid", async () => {
+    await expect(() =>
+      handler(
+        createKafkaEvent({
+          "invalid-topic": [],
+        }),
+        {} as Context,
+        vi.fn(),
+      ),
+    ).rejects.toThrowError("topic (invalid-topic) is invalid");
+
+    expect(logErrorSpy).toHaveBeenCalledWith({ type: sink.ErrorType.BADTOPIC });
+    expect(logErrorSpy).toHaveBeenCalledWith(
+      expect.objectContaining({
+        type: sink.ErrorType.UNKNOWN,
+      }),
+    );
+  });
+
+  it("should skip if the key is invalid", async () => {
+    await handler(
+      createKafkaEvent({
+        [TOPIC]: [
+          createKafkaRecord({
+            topic: TOPIC,
+            // @ts-expect-error
+            key: undefined,
+            value: convertObjToBase64({
+              test: "value",
+            }),
+          }),
+        ],
+      }),
+      {} as Context,
+      vi.fn(),
+    );
+
+    expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, []);
+    expect(logErrorSpy).toHaveBeenCalledWith(
+      expect.objectContaining({
+        type: sink.ErrorType.BADPARSE,
+      }),
+    );
+  });
+
+  it("should delete the record if the value is undefined", async () => {
+    await handler(
+      createKafkaEvent({
+        [TOPIC]: [
+          createKafkaRecord({
+            topic: TOPIC,
+            key: TEST_INSIGHT_KEY,
+            // @ts-expect-error needs to be undefined for test
+            value: undefined,
+          }),
+        ],
+      }),
+      {} as Context,
+      vi.fn(),
+    );
+
+    expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
+      {
+        id: TEST_INSIGHT_ID,
+        hardDeletedFromLegacy: true,
+      },
+    ]);
+    expect(logErrorSpy).not.toHaveBeenCalled();
+  });
+
+  it("should skip if the record does not have sk field", async () => {
+    await handler(
+      createKafkaEvent({
+        [TOPIC]: [
+          createKafkaRecord({
+            topic: TOPIC,
+            key: TEST_INSIGHT_KEY,
+            value: convertObjToBase64({
+              test: "value",
+            }),
+          }),
+        ],
+      }),
+      {} as Context,
+      vi.fn(),
+    );
+
+    expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, []);
+    expect(logErrorSpy).not.toHaveBeenCalled();
+  });
+
+  it("should handle a valid record for package change", async () => {
+    await handler(
+      createKafkaEvent({
+        [TOPIC]: [
+          createKafkaRecord({
+            topic: TOPIC,
+            key: TEST_INSIGHT_KEY,
+            value: convertObjToBase64({
+              test: "value",
+              sk: "Package",
+            }),
+            offset: 0,
+          }),
+        ],
+      }),
+      {} as Context,
+      vi.fn(),
+    );
+
+    expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
+      {
+        id: TEST_INSIGHT_ID,
+        test: "value",
+        sk: "Package",
+        approvedEffectiveDate: null,
+        changedDate: null,
+        finalDispositionDate: null,
+        proposedDate: null,
+        proposedEffectiveDate: null,
+        statusDate: null,
+        submissionDate: null,
+        hardDeletedFromLegacy: null,
+      },
+    ]);
+    expect(logErrorSpy).not.toHaveBeenCalled();
+  });
+
+  it("should handle a valid record for offset change", async () => {
+    await handler(
+      createKafkaEvent({
+        [TOPIC]: [
+          createKafkaRecord({
+            topic: TOPIC,
+            key: TEST_INSIGHT_KEY,
+            value: convertObjToBase64({
+              test: "value",
+              sk: "Offset",
+            }),
+            offset: 3,
+          }),
+        ],
+      }),
+      {} as Context,
+      vi.fn(),
+    );
+
+    expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
+      {
+        id: "3",
+        test: "value",
+        sk: "Offset",
+        approvedEffectiveDate: null,
+        changedDate: null,
+        finalDispositionDate: null,
+        proposedDate: null,
+        proposedEffectiveDate: null,
+        statusDate: null,
+        submissionDate: null,
+        hardDeletedFromLegacy: null,
+      },
+    ]);
+    expect(logErrorSpy).not.toHaveBeenCalled();
+  });
+});
diff --git a/lib/lambda/sinkLegacyInsights.ts b/lib/lambda/sinkLegacyInsights.ts
index 023238f32..913908b99 100644
--- a/lib/lambda/sinkLegacyInsights.ts
+++ b/lib/lambda/sinkLegacyInsights.ts
@@ -9,12 +9,12 @@ export const handler: Handler = async (event) => {
     for (const topicPartition of Object.keys(event.records)) {
       const topic = getTopic(topicPartition);
       switch (topic) {
-        case undefined:
-          logError({ type: ErrorType.BADTOPIC });
-          throw new Error();
         case "aws.onemac.migration.cdc":
           await onemac(event.records[topicPartition], topicPartition);
           break;
+        default:
+          logError({ type: ErrorType.BADTOPIC });
+          throw new Error(`topic (${topicPartition}) is invalid`);
       }
     }
   } catch (error) {
diff --git a/lib/lambda/sinkMainProcessors.test.ts b/lib/lambda/sinkMainProcessors.test.ts
index 2f0dae253..a51070692 100644
--- a/lib/lambda/sinkMainProcessors.test.ts
+++ b/lib/lambda/sinkMainProcessors.test.ts
@@ -14,9 +14,12 @@ import {
   OPENSEARCH_INDEX_NAMESPACE,
   TEST_ITEM_ID,
   EXISTING_ITEM_TEMPORARY_EXTENSION_ID,
+  NOT_FOUND_ITEM_ID,
   convertObjToBase64,
   createKafkaRecord,
+  errorMainMultiDocumentHandler,
 } from "mocks";
+import { mockedServiceServer as mockedServer } from "mocks/server";
 import {
   appkBase,
   capitatedInitial,
@@ -613,6 +616,119 @@ describe("insertNewSeatoolRecordsFromKafkaIntoMako", () => {
     ]);
   });
 
+  it("outputs kafka records into mako records if mako record is not found", async () => {
+    await insertNewSeatoolRecordsFromKafkaIntoMako(
+      [
+        createKafkaRecord({
+          topic: TOPIC,
+          key: Buffer.from(NOT_FOUND_ITEM_ID).toString("base64"),
+          value: convertObjToBase64({
+            id: NOT_FOUND_ITEM_ID,
+            ACTION_OFFICERS: [
+              {
+                FIRST_NAME: "John",
+                LAST_NAME: "Doe",
+                EMAIL: "john.doe@medicaid.gov",
+                OFFICER_ID: 12345,
+                DEPARTMENT: "State Plan Review",
+                PHONE: "202-555-1234",
+              },
+              {
+                FIRST_NAME: "Emily",
+                LAST_NAME: "Rodriguez",
+                EMAIL: "emily.rodriguez@medicaid.gov",
+                OFFICER_ID: 12346,
+                DEPARTMENT: "Compliance Division",
+                PHONE: "202-555-5678",
+              },
+            ],
+            LEAD_ANALYST: [
+              {
+                FIRST_NAME: "Michael",
+                LAST_NAME: "Chen",
+                EMAIL: "michael.chen@cms.hhs.gov",
+                OFFICER_ID: 67890,
+                DEPARTMENT: "Medicaid Innovation Center",
+                PHONE: "202-555-9012",
+              },
+            ],
+            STATE_PLAN: {
+              PLAN_TYPE: 123,
+              SPW_STATUS_ID: 4,
+              APPROVED_EFFECTIVE_DATE: TIMESTAMP,
+              CHANGED_DATE: EARLIER_TIMESTAMP,
+              SUMMARY_MEMO: "Sample summary",
+              TITLE_NAME: "Sample Title",
+              STATUS_DATE: EARLIER_TIMESTAMP,
+              SUBMISSION_DATE: TIMESTAMP,
+              LEAD_ANALYST_ID: 67890,
+              ACTUAL_EFFECTIVE_DATE: null,
+              PROPOSED_DATE: null,
+              STATE_CODE: "10",
+            },
+            RAI: [],
+            ACTIONTYPES: [{ ACTION_NAME: "Initial Review", ACTION_ID: 1, PLAN_TYPE_ID: 123 }],
+            STATE_PLAN_SERVICETYPES: [{ SPA_TYPE_ID: 1, SPA_TYPE_NAME: "Type A" }],
+            STATE_PLAN_SERVICE_SUBTYPES: [{ TYPE_ID: 1, TYPE_NAME: "SubType X" }],
+          }),
+        }),
+      ],
+      TOPIC,
+    );
+
+    expect(bulkUpdateDataSpy).toBeCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
+      {
+        actionType: "Initial Review",
+        approvedEffectiveDate: ISO_DATETIME,
+        authority: "1915(c)",
+        changed_date: EARLIER_TIMESTAMP,
+        cmsStatus: "Approved",
+        description: "Sample summary",
+        finalDispositionDate: EARLIER_ISO_DATETIME,
+        id: NOT_FOUND_ITEM_ID,
+        initialIntakeNeeded: false,
+        leadAnalystEmail: "michael.chen@cms.hhs.gov",
+        leadAnalystName: "Michael Chen",
+        leadAnalystOfficerId: 67890,
+        locked: false,
+        proposedDate: null,
+        raiReceivedDate: null,
+        raiRequestedDate: null,
+        raiWithdrawEnabled: false,
+        raiWithdrawnDate: null,
+        reviewTeam: [
+          {
+            email: "john.doe@medicaid.gov",
+            name: "John Doe",
+          },
+          {
+            email: "emily.rodriguez@medicaid.gov",
+            name: "Emily Rodriguez",
+          },
+        ],
+        seatoolStatus: "Approved",
+        secondClock: false,
+        state: "10",
+        stateStatus: "Approved",
+        statusDate: EARLIER_ISO_DATETIME,
+        subTypes: [
+          {
+            TYPE_ID: 1,
+            TYPE_NAME: "SubType X",
+          },
+        ],
+        subject: "Sample Title",
+        submissionDate: ISO_DATETIME,
+        types: [
+          {
+            SPA_TYPE_ID: 1,
+            SPA_TYPE_NAME: "Type A",
+          },
+        ],
+      },
+    ]);
+  });
+
   it("outputs kafka records into mako records without changedDates", async () => {
     await insertNewSeatoolRecordsFromKafkaIntoMako(
       [
@@ -726,6 +842,121 @@ describe("insertNewSeatoolRecordsFromKafkaIntoMako", () => {
     ]);
   });
 
+  it("handles errors in getting mako timestamps", async () => {
+    mockedServer.use(errorMainMultiDocumentHandler);
+
+    await insertNewSeatoolRecordsFromKafkaIntoMako(
+      [
+        createKafkaRecord({
+          topic: TOPIC,
+          key: TEST_ITEM_KEY,
+          value: convertObjToBase64({
+            id: TEST_ITEM_ID,
+            ACTION_OFFICERS: [
+              {
+                FIRST_NAME: "John",
+                LAST_NAME: "Doe",
+                EMAIL: "john.doe@medicaid.gov",
+                OFFICER_ID: 12345,
+                DEPARTMENT: "State Plan Review",
+                PHONE: "202-555-1234",
+              },
+              {
+                FIRST_NAME: "Emily",
+                LAST_NAME: "Rodriguez",
+                EMAIL: "emily.rodriguez@medicaid.gov",
+                OFFICER_ID: 12346,
+                DEPARTMENT: "Compliance Division",
+                PHONE: "202-555-5678",
+              },
+            ],
+            LEAD_ANALYST: [
+              {
+                FIRST_NAME: "Michael",
+                LAST_NAME: "Chen",
+                EMAIL: "michael.chen@cms.hhs.gov",
+                OFFICER_ID: 67890,
+                DEPARTMENT: "Medicaid Innovation Center",
+                PHONE: "202-555-9012",
+              },
+            ],
+            STATE_PLAN: {
+              PLAN_TYPE: 123,
+              SPW_STATUS_ID: 4,
+              APPROVED_EFFECTIVE_DATE: TIMESTAMP,
+              CHANGED_DATE: EARLIER_TIMESTAMP,
+              SUMMARY_MEMO: "Sample summary",
+              TITLE_NAME: "Sample Title",
+              STATUS_DATE: EARLIER_TIMESTAMP,
+              SUBMISSION_DATE: TIMESTAMP,
+              LEAD_ANALYST_ID: 67890,
+              ACTUAL_EFFECTIVE_DATE: null,
+              PROPOSED_DATE: null,
+              STATE_CODE: "10",
+            },
+            RAI: [],
+            ACTIONTYPES: [{ ACTION_NAME: "Initial Review", ACTION_ID: 1, PLAN_TYPE_ID: 123 }],
+            STATE_PLAN_SERVICETYPES: [{ SPA_TYPE_ID: 1, SPA_TYPE_NAME: "Type A" }],
+            STATE_PLAN_SERVICE_SUBTYPES: [{ TYPE_ID: 1, TYPE_NAME: "SubType X" }],
+          }),
+        }),
+      ],
+      TOPIC,
+    );
+
+    expect(bulkUpdateDataSpy).toBeCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
+      {
+        actionType: "Initial Review",
+        approvedEffectiveDate: ISO_DATETIME,
+        authority: "1915(c)",
+        changed_date: EARLIER_TIMESTAMP,
+        cmsStatus: "Approved",
+        description: "Sample summary",
+        finalDispositionDate: EARLIER_ISO_DATETIME,
+        id: TEST_ITEM_ID,
+        initialIntakeNeeded: false,
+        leadAnalystEmail: "michael.chen@cms.hhs.gov",
+        leadAnalystName: "Michael Chen",
+        leadAnalystOfficerId: 67890,
+        locked: false,
+        proposedDate: null,
+        raiReceivedDate: null,
+        raiRequestedDate: null,
+        raiWithdrawEnabled: false,
+        raiWithdrawnDate: null,
+        reviewTeam: [
+          {
+            email: "john.doe@medicaid.gov",
+            name: "John Doe",
+          },
+          {
+            email: "emily.rodriguez@medicaid.gov",
+            name: "Emily Rodriguez",
+          },
+        ],
+        seatoolStatus: "Approved",
+        secondClock: false,
+        state: "10",
+        stateStatus: "Approved",
+        statusDate: EARLIER_ISO_DATETIME,
+        subTypes: [
+          {
+            TYPE_ID: 1,
+            TYPE_NAME: "SubType X",
+          },
+        ],
+        subject: "Sample Title",
+        submissionDate: ISO_DATETIME,
+        types: [
+          {
+            SPA_TYPE_ID: 1,
+            SPA_TYPE_NAME: "Type A",
+          },
+        ],
+      },
+    ]);
+  });
+
   it("skips newer mako records", async () => {
     await insertNewSeatoolRecordsFromKafkaIntoMako(
       [
diff --git a/lib/lambda/submit/submissionPayloads/respond-to-rai.ts b/lib/lambda/submit/submissionPayloads/respond-to-rai.ts
index 0d1827e53..3fd5ca4a4 100644
--- a/lib/lambda/submit/submissionPayloads/respond-to-rai.ts
+++ b/lib/lambda/submit/submissionPayloads/respond-to-rai.ts
@@ -31,7 +31,7 @@ export const respondToRai = async (event: APIGatewayEvent) => {
 
   const transformedData = events["respond-to-rai"].schema.parse({
     ...parsedResult.data,
-    authority: item?._source.authority,
+    authority: item?._source?.authority,
     submitterName,
     submitterEmail,
     timestamp: Date.now(),
diff --git a/lib/lambda/update/updatePackage.test.ts b/lib/lambda/update/updatePackage.test.ts
index 89e4514a6..df208f9ae 100644
--- a/lib/lambda/update/updatePackage.test.ts
+++ b/lib/lambda/update/updatePackage.test.ts
@@ -212,10 +212,13 @@ describe("handler", () => {
     } as APIGatewayEvent;
 
     const result = await handler(noActionevent);
-    const expectedResult = "Cannot read properties of undefined (reading 'baseSchema')";
+
     expect(result?.statusCode).toStrictEqual(500);
-    expect(result?.body.message).toStrictEqual(expectedResult);
+    expect(result?.body).toStrictEqual({
+      message: "Cannot read properties of undefined (reading 'baseSchema')",
+    });
   });
+
   it("should fail to update a package - no topic name ", async () => {
     process.env.topicName = "";
     const noActionevent = {
diff --git a/lib/libs/api/package/itemExists.ts b/lib/libs/api/package/itemExists.ts
index c5172f161..0610657f7 100644
--- a/lib/libs/api/package/itemExists.ts
+++ b/lib/libs/api/package/itemExists.ts
@@ -14,7 +14,7 @@ export async function itemExists(params: {
       : getNamespace("main");
 
     const packageResult = await os.getItem(domain, index, params.id);
-    return !!packageResult?._source;
+    return packageResult?._source !== undefined && packageResult?._source !== null;
   } catch (error) {
     console.error(error);
     return false;
diff --git a/lib/libs/handler-lib.ts b/lib/libs/handler-lib.ts
index 8abfce1ad..7a13363ca 100644
--- a/lib/libs/handler-lib.ts
+++ b/lib/libs/handler-lib.ts
@@ -1,28 +1,3 @@
-import type {
-  APIGatewayEvent,
-  APIGatewayProxyResult,
-  Context,
-} from "aws-lambda";
-
-export const handler = async (
-  handler: (
-    event?: APIGatewayEvent,
-    context?: Context,
-  ) => Promise<APIGatewayProxyResult>,
-) => {
-  const handlerResponse = await handler();
-
-  const response: APIGatewayProxyResult = {
-    headers: {
-      "Access-Control-Allow-Headers": "Content-Type",
-      "Access-Control-Allow-Origin": "*",
-      "Access-Control-Allow-Methods": "OPTIONS,POST,GET,PUT,DELETE",
-    },
-    ...handlerResponse,
-  };
-  return () => response;
-};
-
 export function response(
   currentResponse: {
     statusCode?: number;
diff --git a/lib/libs/opensearch-lib.test.ts b/lib/libs/opensearch-lib.test.ts
new file mode 100644
index 000000000..791caccbc
--- /dev/null
+++ b/lib/libs/opensearch-lib.test.ts
@@ -0,0 +1,38 @@
+import { describe, it, expect, vi } from "vitest";
+import { updateFieldMapping, decodeUtf8 } from "./opensearch-lib";
+import {
+  OPENSEARCH_DOMAIN,
+  OPENSEARCH_INDEX_NAMESPACE,
+  errorUpdateFieldMappingHandler,
+} from "mocks";
+import { mockedServiceServer as mockedServer } from "mocks/server";
+
+const OPENSEARCH_INDEX = `${OPENSEARCH_INDEX_NAMESPACE}main`;
+
+describe("opensearch-lib tests", () => {
+  describe("updateFieldMapping tests", () => {
+    it("should handle a server error when updating a field mapping", async () => {
+      mockedServer.use(errorUpdateFieldMappingHandler);
+
+      await expect(() =>
+        updateFieldMapping(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, {
+          throw: "error",
+        }),
+      ).rejects.toThrowError("Response Error");
+    });
+  });
+
+  describe("decodeUtf8 tests", () => {
+    it("should handle decoding an invalid string", () => {
+      vi.stubGlobal("decodeURIComponent", () => {
+        throw new Error("Bad format");
+      });
+
+      const value = "test%20value%%";
+      const decoded = decodeUtf8(value);
+      expect(decoded).toEqual(value);
+
+      vi.unstubAllGlobals();
+    });
+  });
+});
diff --git a/lib/libs/opensearch-lib.ts b/lib/libs/opensearch-lib.ts
index 521b17d96..397172bb3 100644
--- a/lib/libs/opensearch-lib.ts
+++ b/lib/libs/opensearch-lib.ts
@@ -180,20 +180,13 @@ export async function getItem(
   index: opensearch.Index,
   id: string,
 ): Promise {
-  try {
-    client = client || (await getClient(host));
-    const response = await client.get({ id, index });
-    return decodeUtf8(response).body;
-  } catch (error) {
-    if (
-      (error instanceof OpensearchErrors.ResponseError && error.statusCode === 404) ||
-      error.meta?.statusCode === 404
-    ) {
-      console.log("Error (404) retrieving in OpenSearch:", error);
-      return undefined;
-    }
-    throw error;
+  client = client || (await getClient(host));
+  const response = await client.get({ id, index });
+  const item = decodeUtf8(response).body;
+  if (item.found === false || !item._source) {
+    return undefined;
   }
+  return item;
 }
 
 export async function getItems(ids: string[]): Promise {
@@ -210,18 +203,12 @@ export async function getItems(ids: string[]): Promise {
     });
 
     return response.body.docs.reduce((acc, doc) => {
-      if (doc.found && doc._source) {
-        try {
-          return acc.concat(doc._source);
-        } catch (e) {
-          console.error(`Failed to parse JSON for document with ID ${doc._id}:`, e);
-          return acc;
-        }
+      if (doc && doc.found && doc._source) {
+        return acc.concat(doc._source);
       } else {
         console.error(`Document with ID ${doc._id} not found.`);
+        return acc;
       }
-
-      return acc;
     }, []);
   } catch (e) {
     console.log({ e });
@@ -264,7 +251,7 @@ export async function updateFieldMapping(
   }
 }
 
-function decodeUtf8(data: any): any {
+export function decodeUtf8(data: any): any {
   if (typeof data === "string") {
     try {
       return decodeURIComponent(escape(data));
diff --git a/mocks/handlers/opensearch/index.ts b/mocks/handlers/opensearch/index.ts
index 5d0c78199..48e8c921e 100644
--- a/mocks/handlers/opensearch/index.ts
+++ b/mocks/handlers/opensearch/index.ts
@@ -21,8 +21,10 @@ export {
   errorCreateIndexHandler,
   errorUpdateFieldMappingHandler,
   errorBulkUpdateDataHandler,
+  rateLimitBulkUpdateDataHandler,
   errorDeleteIndexHandler,
 } from "./indices";
+export { errorMainMultiDocumentHandler } from "./main";
 export { errorSecurityRolesMappingHandler } from "./security";
 export { errorSubtypeSearchHandler } from "./subtypes";
 export { errorTypeSearchHandler } from "./types";
diff --git a/mocks/handlers/opensearch/indices.ts b/mocks/handlers/opensearch/indices.ts
index 6a40734f8..ea306e18d 100644
--- a/mocks/handlers/opensearch/indices.ts
+++ b/mocks/handlers/opensearch/indices.ts
@@ -30,6 +30,12 @@ const defaultBulkUpdateDataHandler = http.post(
   () => new HttpResponse(null, { status: 200 }),
 );
 
+export const rateLimitBulkUpdateDataHandler = http.post(
+  "https://vpc-opensearchdomain-mock-domain.us-east-1.es.amazonaws.com/_bulk",
+  () => new HttpResponse("Rate limit exceeded", { status: 429 }),
+  { once: true },
+);
+
 export const errorBulkUpdateDataHandler = http.post(
   "https://vpc-opensearchdomain-mock-domain.us-east-1.es.amazonaws.com/_bulk",
   () => new HttpResponse("Internal server error", { status: 500 }),
diff --git a/mocks/handlers/opensearch/main.ts b/mocks/handlers/opensearch/main.ts
index fdcd6efb7..eae507d28 100644
--- a/mocks/handlers/opensearch/main.ts
+++ b/mocks/handlers/opensearch/main.ts
@@ -42,6 +42,11 @@ const defaultMainMultiDocumentHandler = http.post(
   },
 );
 
+export const errorMainMultiDocumentHandler = http.post(
+  "https://vpc-opensearchdomain-mock-domain.us-east-1.es.amazonaws.com/test-namespace-main/_mget",
+  () => new HttpResponse("Internal server error", { status: 500 }),
+);
+
 const defaultMainSearchHandler = http.post(
   "https://vpc-opensearchdomain-mock-domain.us-east-1.es.amazonaws.com/test-namespace-main/_search",
   async ({ request }) => {
diff --git a/vitest.config.ts b/vitest.config.ts
index b1518d5c6..18ce87308 100644
--- a/vitest.config.ts
+++ b/vitest.config.ts
@@ -16,6 +16,7 @@ export default defineConfig({
       "build_run",
       ".cdk",
       "docs/**",
+      "**/vitest.setup.ts",
       "lib/libs/webforms/**",
       "lib/libs/email/mock-data/**",
       "react-app/src/features/webforms/**",