Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding tests to insight and legacyInsight #1030

Merged
merged 18 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions lib/lambda/getPackageActions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ describe("getPackageActions Handler", () => {
expect(res.statusCode).toEqual(400);
});

it("should return 500 if event body is invalid", async () => {
const event = {
body: {},
requestContext: getRequestContext(),
} as APIGatewayEvent;

const res = await handler(event);

expect(res).toBeTruthy();
expect(res.statusCode).toEqual(500);
expect(res.body).toEqual(JSON.stringify({ message: "Internal server error" }));
});

it("should return 401 if not authorized to view resources from the state", async () => {
const event = {
body: JSON.stringify({ id: HI_TEST_ITEM_ID }),
Expand Down
5 changes: 5 additions & 0 deletions lib/lambda/processEmails.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ export async function processAndSendEmails(record: any, id: string, config: Proc
const sec = await getSecret(config.emailAddressLookupSecretName);

const item = await os.getItem(config.osDomain, getNamespace("main"), id);
if (!item?.found || !item?._source) {
console.log(`The package was not found for id: ${id}. Doing nothing.`);
return;
}

const cpocEmail = [...getCpocEmail(item)];
const srtEmails = [...getSrtEmails(item)];

Expand Down
68 changes: 68 additions & 0 deletions lib/lambda/sinkCpocs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ import {
createKafkaRecord,
OPENSEARCH_DOMAIN,
OPENSEARCH_INDEX_NAMESPACE,
rateLimitBulkUpdateDataHandler,
errorBulkUpdateDataHandler,
} from "mocks";
import { mockedServiceServer as mockedServer } from "mocks/server";
import cpocs, { MUHAMMAD_BASHAR_ID } from "mocks/data/cpocs";
import { Client } from "@opensearch-project/opensearch";

const OPENSEARCH_INDEX = `${OPENSEARCH_INDEX_NAMESPACE}cpocs`;
const TOPIC = "--mako--branch-name--aws.seatool.debezium.cdc.SEA.dbo.Officers";
Expand Down Expand Up @@ -221,4 +225,68 @@ describe("test sync cpoc", () => {
},
]);
});

it("should succeed after receiving a rate limit exceeded error", async () => {
const osBulkSpy = vi.spyOn(Client.prototype, "bulk");
mockedServer.use(rateLimitBulkUpdateDataHandler);

await handler(
createKafkaEvent({
[`${TOPIC}-xyz`]: [
createKafkaRecord({
topic: `${TOPIC}-xyz`,
key: MUHAMMAD_BASHAR_KEY,
value: convertObjToBase64({
payload: {
after: {
Officer_ID: MUHAMMAD_BASHAR_ID,
First_Name: MUHAMMAD_BASHAR._source?.firstName,
Last_Name: MUHAMMAD_BASHAR._source?.lastName,
Email: MUHAMMAD_BASHAR._source?.email,
},
},
}),
}),
],
}),
{} as Context,
vi.fn(),
);

expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
{
...MUHAMMAD_BASHAR._source,
},
]);
expect(osBulkSpy).toHaveBeenCalledTimes(2);
});

it("should succeed after receiving a rate limit exceeded error", async () => {
mockedServer.use(errorBulkUpdateDataHandler);

await expect(() =>
handler(
createKafkaEvent({
[`${TOPIC}-xyz`]: [
createKafkaRecord({
topic: `${TOPIC}-xyz`,
key: MUHAMMAD_BASHAR_KEY,
value: convertObjToBase64({
payload: {
after: {
Officer_ID: MUHAMMAD_BASHAR_ID,
First_Name: MUHAMMAD_BASHAR._source?.firstName,
Last_Name: MUHAMMAD_BASHAR._source?.lastName,
Email: MUHAMMAD_BASHAR._source?.email,
},
},
}),
}),
],
}),
{} as Context,
vi.fn(),
),
).rejects.toThrowError("Response Error");
});
});
136 changes: 136 additions & 0 deletions lib/lambda/sinkInsights.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import { describe, expect, it, vi, afterEach } from "vitest";
import { handler } from "./sinkInsights";
import { Context } from "aws-lambda";
import * as os from "libs/opensearch-lib";
import * as sink from "../libs/sink-lib";
import {
convertObjToBase64,
createKafkaEvent,
createKafkaRecord,
OPENSEARCH_DOMAIN,
OPENSEARCH_INDEX_NAMESPACE,
} from "mocks";

const OPENSEARCH_INDEX = `${OPENSEARCH_INDEX_NAMESPACE}insights`;
const TEST_INSIGHT_ID = "42";
const TEST_INSIGHT_KEY = Buffer.from(TEST_INSIGHT_ID).toString("base64");
const TOPIC = `--mako--branch-name--aws.seatool.ksql.onemac.three.agg.State_Plan-${TEST_INSIGHT_ID}`;

describe("test sync types", () => {
const bulkUpdateDataSpy = vi.spyOn(os, "bulkUpdateData");
const logErrorSpy = vi.spyOn(sink, "logError");

afterEach(() => {
vi.clearAllMocks();
});

it("should throw an error if the topic is undefined", async () => {
await expect(() =>
handler(
createKafkaEvent({
undefined: [],
}),
{} as Context,
vi.fn(),
),
).rejects.toThrowError("topic (undefined) is invalid");

expect(logErrorSpy).toHaveBeenCalledWith({ type: sink.ErrorType.BADTOPIC });
expect(logErrorSpy).toHaveBeenCalledWith(
expect.objectContaining({
type: sink.ErrorType.UNKNOWN,
}),
);
});

it("should throw an error if the topic is invalid", async () => {
await expect(() =>
handler(
createKafkaEvent({
"invalid-topic": [],
}),
{} as Context,
vi.fn(),
),
).rejects.toThrowError("topic (invalid-topic) is invalid");

expect(logErrorSpy).toHaveBeenCalledWith({ type: sink.ErrorType.BADTOPIC });
expect(logErrorSpy).toHaveBeenCalledWith(
expect.objectContaining({
type: sink.ErrorType.UNKNOWN,
}),
);
});

it("should skip if the key is invalid", async () => {
await handler(
createKafkaEvent({
[TOPIC]: [
createKafkaRecord({
topic: TOPIC,
// @ts-expect-error
key: undefined,
value: convertObjToBase64({
test: "value",
}),
}),
],
}),
{} as Context,
vi.fn(),
);

expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, []);
expect(logErrorSpy).toHaveBeenCalledWith(
expect.objectContaining({
type: sink.ErrorType.BADPARSE,
}),
);
});

it("should skip if the record has no value", async () => {
await handler(
createKafkaEvent({
[TOPIC]: [
createKafkaRecord({
topic: TOPIC,
key: TEST_INSIGHT_KEY,
// @ts-expect-error needs to be undefined for test
value: undefined,
}),
],
}),
{} as Context,
vi.fn(),
);

expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, []);
expect(logErrorSpy).not.toHaveBeenCalled();
});

it("should handle a valid record", async () => {
await handler(
createKafkaEvent({
[TOPIC]: [
createKafkaRecord({
topic: TOPIC,
key: TEST_INSIGHT_KEY,
value: convertObjToBase64({
test: "value",
}),
}),
],
}),
{} as Context,
vi.fn(),
);

expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
{
id: TEST_INSIGHT_ID,
test: "value",
},
]);
expect(logErrorSpy).not.toHaveBeenCalled();
});
});
8 changes: 4 additions & 4 deletions lib/lambda/sinkInsights.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ export const handler: Handler<KafkaEvent> = async (event) => {
for (const topicPartition of Object.keys(event.records)) {
const topic = getTopic(topicPartition);
switch (topic) {
case undefined:
logError({ type: ErrorType.BADTOPIC });
throw new Error();
case "aws.seatool.ksql.onemac.three.agg.State_Plan":
await ksql(event.records[topicPartition], topicPartition);
break;
default:
logError({ type: ErrorType.BADTOPIC });
throw new Error(`topic (${topicPartition}) is invalid`);
}
}
} catch (error) {
Expand All @@ -30,7 +30,7 @@ const ksql = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
try {
if (!value) continue;

const id: string = JSON.parse(decodeBase64WithUtf8(key));
const id: string = decodeBase64WithUtf8(key);
const record = JSON.parse(decodeBase64WithUtf8(value));
docs.push({ ...record, id });
} catch (error) {
Expand Down
Loading
Loading