Skip to content

Commit

Permalink
Merge pull request #100 from guardian/ingestion-lambda-deduplicate-stories
Browse files Browse the repository at this point in the history

Ingestion-lambda: deduplicate incoming stories
  • Loading branch information
sb-dev authored Jan 16, 2025
2 parents 8bfa4dc + 70a550b commit 849eecc
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
1 change: 1 addition & 0 deletions db/migrations/V9__external_id_unique_constraint.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Adds a UNIQUE constraint named external_id_unique on fingerpost_wire_entry.external_id.
-- An INSERT ... ON CONFLICT (external_id) DO NOTHING needs a unique constraint or index
-- on external_id to use as its conflict target; the ingestion lambda issues such an
-- insert (presumably against this table -- confirm the lambda's table name).
-- NOTE(review): this statement fails if the table already holds duplicate external_id
-- values -- confirm existing data is de-duplicated before applying the migration.
ALTER TABLE fingerpost_wire_entry ADD CONSTRAINT external_id_unique UNIQUE (external_id);
34 changes: 17 additions & 17 deletions ingestion-lambda/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,14 @@ export const main = async (event: SQSEvent): Promise<SQSBatchResponse> => {
const results = await Promise.all(
records.map(
async ({
messageId,
messageId: sqsMessageId,
messageAttributes,
body,
}): Promise<OperationResult> => {
const sqsMessageId = messageId;

try {
const fingerpostMessageId =
messageAttributes['Message-Id']?.stringValue;
const messageId = messageAttributes['Message-Id']?.stringValue;

if (!fingerpostMessageId) {
if (!messageId) {
await s3Client.send(
new PutObjectCommand({
Bucket: BUCKET_NAME,
Expand All @@ -102,23 +99,24 @@ export const main = async (event: SQSEvent): Promise<SQSBatchResponse> => {
await s3Client.send(
new PutObjectCommand({
Bucket: BUCKET_NAME,
Key: `${fingerpostMessageId}.json`,
Key: `${messageId}.json`,
Body: body,
}),
);

const snsMessageContent = safeBodyParse(body);

await sql`
INSERT INTO ${sql(tableName)}
(external_id, content)
VALUES
(${fingerpostMessageId}, ${snsMessageContent as never})
RETURNING id`.then((res) => {
if (res.length === 0) {
throw new Error('Failed to insert record into DB');
}
});
const result = await sql`
INSERT INTO ${sql(tableName)}
(external_id, content)
VALUES (${messageId}, ${snsMessageContent as never}) ON CONFLICT (external_id) DO NOTHING
RETURNING id`;

if (result.length === 0) {
console.warn(
`A record with the provided external_id (messageId: ${messageId}) already exists. No new data was inserted to prevent duplication.`,
);
}
} catch (e) {
const reason = e instanceof Error ? e.message : 'Unknown error';
return {
Expand All @@ -132,6 +130,7 @@ export const main = async (event: SQSEvent): Promise<SQSBatchResponse> => {
},
),
);

const batchItemFailures = results
.filter(
(result): result is OperationFailure => result.status === 'failure',
Expand All @@ -142,6 +141,7 @@ export const main = async (event: SQSEvent): Promise<SQSBatchResponse> => {
);
return { itemIdentifier: sqsMessageId };
});

console.log(
`Processed ${records.length} messages with ${batchItemFailures.length} failures`,
);
Expand Down

0 comments on commit 849eecc

Please sign in to comment.