Commit 12c5cff

adding more tests
thetif committed Jan 15, 2025
1 parent 065087a commit 12c5cff
Showing 18 changed files with 739 additions and 59 deletions.
13 changes: 13 additions & 0 deletions lib/lambda/getPackageActions.test.ts
@@ -19,6 +19,19 @@ describe("getPackageActions Handler", () => {
expect(res.statusCode).toEqual(400);
});

it("should return 500 if event body is invalid", async () => {
const event = {
body: {},
requestContext: getRequestContext(),
} as APIGatewayEvent;

const res = await handler(event);

expect(res).toBeTruthy();
expect(res.statusCode).toEqual(500);
expect(res.body).toEqual(JSON.stringify({ message: "Internal server error" }));
});

it("should return 401 if not authorized to view resources from the state", async () => {
const event = {
body: JSON.stringify({ id: HI_TEST_ITEM_ID }),
5 changes: 5 additions & 0 deletions lib/lambda/processEmails.ts
@@ -175,6 +175,11 @@ export async function processAndSendEmails(record: any, id: string, config: Proc
const sec = await getSecret(config.emailAddressLookupSecretName);

const item = await os.getItem(config.osDomain, getNamespace("main"), id);
if (!item?.found || !item?._source) {
console.log(`The package was not found for id: ${id}. Doing nothing.`);
return;
}

const cpocEmail = [...getCpocEmail(item)];
const srtEmails = [...getSrtEmails(item)];

68 changes: 68 additions & 0 deletions lib/lambda/sinkCpocs.test.ts
@@ -9,8 +9,12 @@ import {
createKafkaRecord,
OPENSEARCH_DOMAIN,
OPENSEARCH_INDEX_NAMESPACE,
rateLimitBulkUpdateDataHandler,
errorBulkUpdateDataHandler,
} from "mocks";
import { mockedServiceServer as mockedServer } from "mocks/server";
import cpocs, { MUHAMMAD_BASHAR_ID } from "mocks/data/cpocs";
import { Client } from "@opensearch-project/opensearch";

const OPENSEARCH_INDEX = `${OPENSEARCH_INDEX_NAMESPACE}cpocs`;
const TOPIC = "--mako--branch-name--aws.seatool.debezium.cdc.SEA.dbo.Officers";
@@ -221,4 +225,68 @@ describe("test sync cpoc", () => {
},
]);
});

it("should succeed after receiving a rate limit exceeded error", async () => {
const osBulkSpy = vi.spyOn(Client.prototype, "bulk");
mockedServer.use(rateLimitBulkUpdateDataHandler);

await handler(
createKafkaEvent({
[`${TOPIC}-xyz`]: [
createKafkaRecord({
topic: `${TOPIC}-xyz`,
key: MUHAMMAD_BASHAR_KEY,
value: convertObjToBase64({
payload: {
after: {
Officer_ID: MUHAMMAD_BASHAR_ID,
First_Name: MUHAMMAD_BASHAR._source?.firstName,
Last_Name: MUHAMMAD_BASHAR._source?.lastName,
Email: MUHAMMAD_BASHAR._source?.email,
},
},
}),
}),
],
}),
{} as Context,
vi.fn(),
);

expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
{
...MUHAMMAD_BASHAR._source,
},
]);
expect(osBulkSpy).toHaveBeenCalledTimes(2);
});

it("should succeed after receiving a rate limit exceeded error", async () => {
mockedServer.use(errorBulkUpdateDataHandler);

await expect(() =>
handler(
createKafkaEvent({
[`${TOPIC}-xyz`]: [
createKafkaRecord({
topic: `${TOPIC}-xyz`,
key: MUHAMMAD_BASHAR_KEY,
value: convertObjToBase64({
payload: {
after: {
Officer_ID: MUHAMMAD_BASHAR_ID,
First_Name: MUHAMMAD_BASHAR._source?.firstName,
Last_Name: MUHAMMAD_BASHAR._source?.lastName,
Email: MUHAMMAD_BASHAR._source?.email,
},
},
}),
}),
],
}),
{} as Context,
vi.fn(),
),
).rejects.toThrowError("Response Error");
});
});
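
For orientation, a minimal sketch of what a rate-limit-then-recover mock handler could look like, assuming the shared mocks package is built on MSW v2 (the mockedServer.use pattern suggests MSW, but that is an assumption); the handler name, URL pattern, and error payload below are illustrative and are not the repo's actual rateLimitBulkUpdateDataHandler:

// Hypothetical sketch: fail the first OpenSearch bulk call with a 429-style error,
// then let the retried request fall through to the default (successful) handler.
import { http, HttpResponse } from "msw";

export const rateLimitOnceBulkHandler = http.post(
  "https://*/_bulk", // assumed bulk endpoint pattern
  () =>
    HttpResponse.json(
      { error: { type: "es_rejected_execution_exception", reason: "rate limit exceeded" } },
      { status: 429 },
    ),
  { once: true }, // only the first matching request is intercepted
);

This matches the first test above, which asserts that the bulk spy is called twice: once for the rejected request and once for the retry that succeeds.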
136 changes: 136 additions & 0 deletions lib/lambda/sinkInsights.test.ts
@@ -0,0 +1,136 @@
import { describe, expect, it, vi, afterEach } from "vitest";
import { handler } from "./sinkInsights";
import { Context } from "aws-lambda";
import * as os from "libs/opensearch-lib";
import * as sink from "../libs/sink-lib";
import {
convertObjToBase64,
createKafkaEvent,
createKafkaRecord,
OPENSEARCH_DOMAIN,
OPENSEARCH_INDEX_NAMESPACE,
} from "mocks";

const OPENSEARCH_INDEX = `${OPENSEARCH_INDEX_NAMESPACE}insights`;
const TEST_INSIGHT_ID = "42";
const TEST_INSIGHT_KEY = Buffer.from(TEST_INSIGHT_ID).toString("base64");
const TOPIC = `--mako--branch-name--aws.seatool.ksql.onemac.three.agg.State_Plan-${TEST_INSIGHT_ID}`;

describe("test sync types", () => {
const bulkUpdateDataSpy = vi.spyOn(os, "bulkUpdateData");
const logErrorSpy = vi.spyOn(sink, "logError");

afterEach(() => {
vi.clearAllMocks();
});

it("should throw an error if the topic is undefined", async () => {
await expect(() =>
handler(
createKafkaEvent({
undefined: [],
}),
{} as Context,
vi.fn(),
),
).rejects.toThrowError("topic (undefined) is invalid");

expect(logErrorSpy).toHaveBeenCalledWith({ type: sink.ErrorType.BADTOPIC });
expect(logErrorSpy).toHaveBeenCalledWith(
expect.objectContaining({
type: sink.ErrorType.UNKNOWN,
}),
);
});

it("should throw an error if the topic is invalid", async () => {
await expect(() =>
handler(
createKafkaEvent({
"invalid-topic": [],
}),
{} as Context,
vi.fn(),
),
).rejects.toThrowError("topic (invalid-topic) is invalid");

expect(logErrorSpy).toHaveBeenCalledWith({ type: sink.ErrorType.BADTOPIC });
expect(logErrorSpy).toHaveBeenCalledWith(
expect.objectContaining({
type: sink.ErrorType.UNKNOWN,
}),
);
});

it("should skip if the key is invalid", async () => {
await handler(
createKafkaEvent({
[TOPIC]: [
createKafkaRecord({
topic: TOPIC,
// @ts-expect-error needs to be undefined for test
key: undefined,
value: convertObjToBase64({
test: "value",
}),
}),
],
}),
{} as Context,
vi.fn(),
);

expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, []);
expect(logErrorSpy).toHaveBeenCalledWith(
expect.objectContaining({
type: sink.ErrorType.BADPARSE,
}),
);
});

it("should skip if the record has no value", async () => {
await handler(
createKafkaEvent({
[TOPIC]: [
createKafkaRecord({
topic: TOPIC,
key: TEST_INSIGHT_KEY,
// @ts-expect-error needs to be undefined for test
value: undefined,
}),
],
}),
{} as Context,
vi.fn(),
);

expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, []);
expect(logErrorSpy).not.toHaveBeenCalled();
});

it("should handle a valid record", async () => {
await handler(
createKafkaEvent({
[TOPIC]: [
createKafkaRecord({
topic: TOPIC,
key: TEST_INSIGHT_KEY,
value: convertObjToBase64({
test: "value",
}),
}),
],
}),
{} as Context,
vi.fn(),
);

expect(bulkUpdateDataSpy).toHaveBeenCalledWith(OPENSEARCH_DOMAIN, OPENSEARCH_INDEX, [
{
id: TEST_INSIGHT_ID,
test: "value",
},
]);
expect(logErrorSpy).not.toHaveBeenCalled();
});
});
8 changes: 4 additions & 4 deletions lib/lambda/sinkInsights.ts
@@ -9,12 +9,12 @@ export const handler: Handler<KafkaEvent> = async (event) => {
for (const topicPartition of Object.keys(event.records)) {
const topic = getTopic(topicPartition);
switch (topic) {
-        case undefined:
-          logError({ type: ErrorType.BADTOPIC });
-          throw new Error();
         case "aws.seatool.ksql.onemac.three.agg.State_Plan":
           await ksql(event.records[topicPartition], topicPartition);
           break;
+        default:
+          logError({ type: ErrorType.BADTOPIC });
+          throw new Error(`topic (${topicPartition}) is invalid`);
}
}
} catch (error) {
@@ -30,7 +30,7 @@ const ksql = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
try {
if (!value) continue;

-      const id: string = JSON.parse(decodeBase64WithUtf8(key));
+      const id: string = decodeBase64WithUtf8(key);
const record = JSON.parse(decodeBase64WithUtf8(value));
docs.push({ ...record, id });
} catch (error) {
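
A short aside on the id change above (an illustrative sketch, not part of the commit): the new sinkInsights test builds its Kafka key from the plain string id "42" and expects the indexed document to carry id: "42" as a string, which the old JSON.parse call would have coerced to the number 42, and which would throw outright for keys that are not valid JSON. A self-contained TypeScript illustration:

// Illustrative only — decodeBase64WithUtf8 is re-declared here so the snippet stands alone.
const decodeBase64WithUtf8 = (encoded: string): string =>
  Buffer.from(encoded, "base64").toString("utf-8");

const key = Buffer.from("42").toString("base64");

const id: string = decodeBase64WithUtf8(key);         // "42" — stays a string, matching the test
const parsed = JSON.parse(decodeBase64WithUtf8(key)); // 42   — silently becomes a number
// A non-JSON key such as "OFF-42" would make JSON.parse throw entirely.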
