Skip to content

Commit

Permalink
fix(main date columns): Assert type of date columns (#450)
Browse files Browse the repository at this point in the history
* f

* m
  • Loading branch information
mdial89f authored Mar 15, 2024
1 parent 5c00dea commit 303b11d
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 53 deletions.
99 changes: 76 additions & 23 deletions src/libs/opensearch-lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,84 @@ export async function updateData(host: string, indexObject: any) {
var response = await client.update(indexObject);
}

/** Resolve after roughly `ms` milliseconds; used for retry backoff delays. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}

// Minimal shape of a document handled by the bulk helpers:
// `id` is used as the OpenSearch `_id`; all other fields are passed
// through unchanged as the document body.
interface Document {
  id: string;
  [key: string]: any;
}

export async function bulkUpdateData(
host: string,
index: opensearch.Index,
arrayOfDocuments: any
) {
// Skip if no documents have been supplied
index: string,
arrayOfDocuments: Document[],
): Promise<void> {
if (arrayOfDocuments.length === 0) {
console.log("No documents to update. Skipping bulk update operation.");
return;
}

client = client || (await getClient(host));
var response = await client.helpers.bulk({
datasource: arrayOfDocuments,
onDocument(doc: any) {
// The update operation always requires a tuple to be returned, with the
// first element being the action and the second being the update options.
return [
{
update: { _index: index, _id: doc.id },
},
{ doc_as_upsert: true },
];

const lastEntries = arrayOfDocuments.reduce(
(acc: { [id: string]: Document }, doc: Document) => {
acc[doc.id] = doc; // This will overwrite any previous entry with the same ID
return acc;
},
});
console.log(response);
{},
);
const filteredDocuments = Object.values(lastEntries);

const body: any[] = filteredDocuments.flatMap((doc) => [
{ update: { _index: index, _id: doc.id } }, // Action and metadata
{ doc: doc, doc_as_upsert: true }, // Document to update or upsert
]);

async function attemptBulkUpdate(
retries: number = 5,
delay: number = 1000,
): Promise<void> {
try {
const response = await client.bulk({ refresh: true, body: body });
if (response.body.errors) {
// Check for 429 status within response errors
const hasRateLimitErrors = response.body.items.some(
(item: any) => item.update.status === 429,
);

if (hasRateLimitErrors && retries > 0) {
console.log(`Rate limit exceeded, retrying in ${delay}ms...`);
await sleep(delay);
return attemptBulkUpdate(retries - 1, delay * 2); // Exponential backoff
} else if (!hasRateLimitErrors) {
// Handle or throw other errors normally
console.error(
"Bulk update errors:",
JSON.stringify(response.body.items, null, 2),
);
throw "ERROR: Bulk update had an error that was not rate related.";
}
} else {
console.log("Bulk update successful.");
}
} catch (error: any) {
if (error.statusCode === 429 && retries > 0) {
console.log(
`Rate limit exceeded, retrying in ${delay}ms...`,
error.message,
);
await sleep(delay);
return attemptBulkUpdate(retries - 1, delay * 2); // Exponential backoff
} else {
console.error("An error occurred:", error);
throw error;
}
}
}

await attemptBulkUpdate();
}

export async function deleteIndex(host: string, index: opensearch.Index) {
Expand All @@ -84,7 +137,7 @@ export async function mapRole(
host: string,
masterRoleToAssume: string,
osRoleName: string,
iamRoleName: string
iamRoleName: string,
) {
try {
const sts = new STSClient({
Expand All @@ -95,7 +148,7 @@ export async function mapRole(
RoleArn: masterRoleToAssume,
RoleSessionName: "RoleMappingSession",
ExternalId: "foo",
})
}),
);
const interceptor = aws4Interceptor({
options: {
Expand All @@ -117,7 +170,7 @@ export async function mapRole(
path: "/and_backend_roles",
value: [iamRoleName],
},
]
],
);
return patchResponse.data;
} catch (error) {
Expand All @@ -129,7 +182,7 @@ export async function mapRole(
export async function search(
host: string,
index: opensearch.Index,
query: any
query: any,
) {
client = client || (await getClient(host));
try {
Expand All @@ -146,7 +199,7 @@ export async function search(
export async function getItem(
host: string,
index: opensearch.Index,
id: string
id: string,
) {
client = client || (await getClient(host));
try {
Expand Down Expand Up @@ -174,7 +227,7 @@ export async function createIndex(host: string, index: opensearch.Index) {
export async function updateFieldMapping(
host: string,
index: opensearch.Index,
properties: object
properties: object,
) {
client = client || (await getClient(host));
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,10 @@ export const transform = () => {
};

export type Schema = ReturnType<typeof transform>;

/**
 * Build a tombstone record for the given id, nulling out changedDate so the
 * downstream index entry reflects the deleted source row.
 */
export const tombstone = (id: string) => ({
  id,
  changedDate: null,
});
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
const getIdByAuthorityName = (authorityName: string) => {
try {
const authorityId = Object.keys(SEATOOL_AUTHORITIES).find(
(key) => SEATOOL_AUTHORITIES[key] === authorityName
(key) => SEATOOL_AUTHORITIES[key] === authorityName,
);
return authorityId ? parseInt(authorityId, 10) : null;
} catch (error) {
Expand All @@ -17,6 +17,11 @@ const getIdByAuthorityName = (authorityName: string) => {
}
};

// Convert an epoch timestamp (milliseconds) to an ISO-8601 string.
// null and undefined pass through as null; 0 is a valid timestamp.
const getDateStringOrNullFromEpoc = (epocDate: number | null | undefined) => {
  if (epocDate == null) {
    return null;
  }
  return new Date(epocDate).toISOString();
};

export const transform = (id: string) => {
return onemacSchema.transform((data) => {
if (data.seaActionType === "Extend") {
Expand All @@ -42,9 +47,9 @@ export const transform = (id: string) => {
stateStatus: "Submitted",
cmsStatus: "Requested",
seatoolStatus: SEATOOL_STATUS.PENDING,
statusDate: data.statusDate,
submissionDate: data.submissionDate,
changedDate: data.changedDate,
statusDate: getDateStringOrNullFromEpoc(data.statusDate),
submissionDate: getDateStringOrNullFromEpoc(data.submissionDate),
changedDate: getDateStringOrNullFromEpoc(data.changedDate),
// type, subtype, subject, description will soon be collected and available in data; this will need updating then.
subject: null,
description: null,
Expand Down
29 changes: 17 additions & 12 deletions src/services/data/handlers/setupIndex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@ export const handler: Handler = async (_, __, callback) => {
};
let errorResponse = null;
try {
const indices = [
"main",
"changelog",
"insights",
"types",
"subtypes",
] as const;
for (const index of indices) {
await manageIndexResource({ index: index });
}
await manageIndexResource({
index: "main",
update: {
approvedEffectiveDate: { type: "date" },
changedDate: { type: "date" },
finalDispositionDate: { type: "date" },
proposedDate: { type: "date" },
statusDate: { type: "date" },
submissionDate: { type: "date" },
},
});
await manageIndexResource({ index: "changelog" });
await manageIndexResource({ index: "insights" });
await manageIndexResource({ index: "types" });
await manageIndexResource({ index: "subtypes" });
} catch (error: any) {
response.statusCode = 500;
errorResponse = error;
Expand All @@ -36,7 +41,7 @@ const manageIndexResource = async (resource: {

const createIndex = await os.createIndex(
process.env.osDomain,
resource.index
resource.index,
);
console.log(createIndex);

Expand All @@ -45,7 +50,7 @@ const manageIndexResource = async (resource: {
const updateFieldMapping = await os.updateFieldMapping(
process.env.osDomain,
resource.index,
resource.update
resource.update,
);
console.log(updateFieldMapping);
};
33 changes: 22 additions & 11 deletions src/services/data/handlers/sinkMain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ export const handler: Handler<KafkaEvent> = async (event) => {
throw new Error();
case "aws.onemac.migration.cdc":
docs.push(
...(await onemac(event.records[topicPartition], topicPartition))
...(await onemac(event.records[topicPartition], topicPartition)),
);
break;
case "aws.seatool.ksql.onemac.agg.State_Plan":
docs.push(
...(await ksql(event.records[topicPartition], topicPartition))
...(await ksql(event.records[topicPartition], topicPartition)),
);
break;
case "aws.seatool.debezium.changed_date.SEA.dbo.State_Plan":
docs.push(
...(await changed_date(
event.records[topicPartition],
topicPartition
))
topicPartition,
)),
);
break;
}
Expand Down Expand Up @@ -140,7 +140,7 @@ const onemac = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
const result = (() => {
switch (record?.actionType) {
case "new-submission":
case undefined:
case undefined:
return opensearch.main.newSubmission
.transform(id)
.safeParse(record);
Expand All @@ -163,7 +163,7 @@ const onemac = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {
})();
if (result === undefined) {
console.log(
`no action to take for ${id} action ${record.actionType}. Continuing...`
`no action to take for ${id} action ${record.actionType}. Continuing...`,
);
continue;
}
Expand All @@ -189,17 +189,27 @@ const onemac = async (kafkaRecords: KafkaRecord[], topicPartition: string) => {

const changed_date = async (
kafkaRecords: KafkaRecord[],
topicPartition: string
topicPartition: string,
) => {
const docs: any[] = [];
for (const kafkaRecord of kafkaRecords) {
const { value } = kafkaRecord;
const { key, value } = kafkaRecord;
try {
const decodedValue = Buffer.from(value, "base64").toString("utf-8");
const record = JSON.parse(decodedValue).payload.after;
if (!record) {
// Set id
const id: string = JSON.parse(decode(key)).payload.ID_Number;

// Handle delete events and continue
if (value === undefined) {
continue;
}

// Parse record
const decodedValue = Buffer.from(value, "base64").toString("utf-8");
const record = JSON.parse(decodedValue).payload.after;

// Handle tombstone events and continue
if (!record) continue;

const result = opensearch.main.changedDate.transform().safeParse(record);
if (!result.success) {
logError({
Expand All @@ -218,5 +228,6 @@ const changed_date = async (
});
}
}
console.log(JSON.stringify(docs, null, 2));
return docs;
};
4 changes: 2 additions & 2 deletions src/services/data/handlers/sinkSubtypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export const handler: Handler<KafkaEvent> = async (event) => {
throw new Error();
case "aws.seatool.debezium.cdc.SEA.dbo.Type":
docs.push(
...(await subtypes(event.records[topicPartition], topicPartition))
...(await subtypes(event.records[topicPartition], topicPartition)),
);
break;
}
Expand All @@ -40,7 +40,7 @@ export const handler: Handler<KafkaEvent> = async (event) => {

const subtypes = async (
kafkaRecords: KafkaRecord[],
topicPartition: string
topicPartition: string,
) => {
const docs: any[] = [];
for (const kafkaRecord of kafkaRecords) {
Expand Down
2 changes: 1 addition & 1 deletion src/services/data/handlers/sinkTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export const handler: Handler<KafkaEvent> = async (event) => {
throw new Error();
case "aws.seatool.debezium.cdc.SEA.dbo.SPA_Type":
docs.push(
...(await types(event.records[topicPartition], topicPartition))
...(await types(event.records[topicPartition], topicPartition)),
);
break;
}
Expand Down

0 comments on commit 303b11d

Please sign in to comment.