From 0cf59d1117ed558e53d28be99c2c791a08f63102 Mon Sep 17 00:00:00 2001 From: Tom Forbes Date: Tue, 17 Dec 2024 14:22:50 +0000 Subject: [PATCH] Refactor to use SQSBatchResponse (#244) * batch size of 1 * Refactor to use SQSBatchResponse * success * format * DLQ alarm --- .../support-reminders.test.ts.snap | 128 ++++++++++++++++++ cdk/lib/support-reminders.ts | 15 ++ src/cancel-reminders/lambda/lambda.ts | 12 +- src/create-reminder-signup/lambda/lambda.ts | 84 +++++------- src/lib/handler.ts | 24 +++- src/lib/models.ts | 9 +- .../lambda/lambda.ts | 12 +- 7 files changed, 211 insertions(+), 73 deletions(-) diff --git a/cdk/lib/__snapshots__/support-reminders.test.ts.snap b/cdk/lib/__snapshots__/support-reminders.test.ts.snap index 33f9ff1..cb8d0f8 100644 --- a/cdk/lib/__snapshots__/support-reminders.test.ts.snap +++ b/cdk/lib/__snapshots__/support-reminders.test.ts.snap @@ -6,6 +6,7 @@ exports[`The SupportReminders stack matches the snapshot 1`] = ` "gu:cdk:constructs": [ "GuVpcParameter", "GuSubnetListParameter", + "GuAlarm", "GuDistributionBucketParameter", "GuLambdaFunction", "GuLambdaFunction", @@ -1787,6 +1788,7 @@ exports[`The SupportReminders stack matches the snapshot 1`] = ` }, "createreminderssignupSqsEventSourceSupportRemindersCODEsupportremindersQueueD5BCDE9BE219E9BA": { "Properties": { + "BatchSize": 1, "EventSourceArn": { "Fn::GetAtt": [ "supportremindersQueue01C5AB8D", @@ -2908,6 +2910,68 @@ exports[`The SupportReminders stack matches the snapshot 1`] = ` "Type": "AWS::SQS::Queue", "UpdateReplacePolicy": "Delete", }, + "supportremindersalarm1BE4F065": { + "Properties": { + "ActionsEnabled": false, + "AlarmActions": [ + { + "Fn::Join": [ + "", + [ + "arn:aws:sns:", + { + "Ref": "AWS::Region", + }, + ":", + { + "Ref": "AWS::AccountId", + }, + ":alarms-handler-topic-PROD", + ], + ], + }, + ], + "AlarmDescription": "A reminder signup event failed and is now on the dead letter queue.", + "AlarmName": "support-reminders-CODE: failed event on the dead letter queue", + "ComparisonOperator": "GreaterThanThreshold", + "Dimensions": [ + { + "Name": "QueueName", + "Value": { + "Fn::GetAtt": [ + "deadletterssupportremindersQueue83E67347", + "QueueName", + ], + }, + }, + ], + "EvaluationPeriods": 24, + "MetricName": "ApproximateNumberOfMessagesVisible", + "Namespace": "AWS/SQS", + "Period": 60, + "Statistic": "Sum", + "Tags": [ + { + "Key": "gu:cdk:version", + "Value": "TEST", + }, + { + "Key": "gu:repo", + "Value": "guardian/support-reminders", + }, + { + "Key": "Stack", + "Value": "support", + }, + { + "Key": "Stage", + "Value": "CODE", + }, + ], + "Threshold": 0, + }, + "Type": "AWS::CloudWatch::Alarm", + }, }, } `; @@ -2918,6 +2982,7 @@ exports[`The SupportReminders stack matches the snapshot 2`] = ` "gu:cdk:constructs": [ "GuVpcParameter", "GuSubnetListParameter", + "GuAlarm", "GuDistributionBucketParameter", "GuLambdaFunction", "GuLambdaFunction", @@ -4791,6 +4856,7 @@ exports[`The SupportReminders stack matches the snapshot 2`] = ` }, "createreminderssignupSqsEventSourceSupportRemindersPRODsupportremindersQueue30985D82167501E9": { "Properties": { + "BatchSize": 1, "EventSourceArn": { "Fn::GetAtt": [ "supportremindersQueue01C5AB8D", @@ -5912,6 +5978,68 @@ exports[`The SupportReminders stack matches the snapshot 2`] = ` "Type": "AWS::SQS::Queue", "UpdateReplacePolicy": "Delete", }, + "supportremindersalarm1BE4F065": { + "Properties": { + "ActionsEnabled": true, + "AlarmActions": [ + { + "Fn::Join": [ + "", + [ + "arn:aws:sns:", + { + "Ref": "AWS::Region", + }, + ":", + { + "Ref": "AWS::AccountId", + }, + ":alarms-handler-topic-PROD", + ], + ], + }, + ], + "AlarmDescription": "A reminder signup event failed and is now on the dead letter queue.", + "AlarmName": "support-reminders-PROD: failed event on the dead letter queue", + "ComparisonOperator": "GreaterThanThreshold", + "Dimensions": [ + { + "Name": "QueueName", + "Value": { + "Fn::GetAtt": [ + "deadletterssupportremindersQueue83E67347", + "QueueName", + ], + }, + }, + ], + "EvaluationPeriods": 24, + "MetricName": "ApproximateNumberOfMessagesVisible", + "Namespace": "AWS/SQS", + "Period": 60, + "Statistic": "Sum", + "Tags": [ + { + "Key": "gu:cdk:version", + "Value": "TEST", + }, + { + "Key": "gu:repo", + "Value": "guardian/support-reminders", + }, + { + "Key": "Stack", + "Value": "support", + }, + { + "Key": "Stage", + "Value": "PROD", + }, + ], + "Threshold": 0, + }, + "Type": "AWS::CloudWatch::Alarm", + }, }, } `; diff --git a/cdk/lib/support-reminders.ts b/cdk/lib/support-reminders.ts index 2e871d7..b5d26d7 100644 --- a/cdk/lib/support-reminders.ts +++ b/cdk/lib/support-reminders.ts @@ -57,6 +57,20 @@ export class SupportReminders extends GuStack { retentionPeriod: Duration.days(14), }); + new GuAlarm(this, `${app}-alarm`, { + app: app, + snsTopicName: alarmsTopic, + alarmName: `${app}-${this.stage}: failed event on the dead letter queue`, + alarmDescription: `A reminder signup event failed and is now on the dead letter queue.`, + metric: deadLetterQueue + .metric('ApproximateNumberOfMessagesVisible') + .with({ statistic: 'Sum', period: Duration.minutes(1) }), + comparisonOperator: ComparisonOperator.GREATER_THAN_THRESHOLD, + threshold: 0, + evaluationPeriods: 24, + actionsEnabled: this.stage === 'PROD', + }); + const queue = new Queue(this, `${app}Queue`, { queueName, visibilityTimeout: Duration.minutes(2), @@ -71,6 +85,7 @@ export class SupportReminders extends GuStack { // SQS to Lambda event source mapping const eventSource = new SqsEventSource(queue, { reportBatchItemFailures: true, + batchSize: 1, }); const events=[eventSource]; diff --git a/src/cancel-reminders/lambda/lambda.ts b/src/cancel-reminders/lambda/lambda.ts index 9db6a31..c757c15 100644 --- a/src/cancel-reminders/lambda/lambda.ts +++ b/src/cancel-reminders/lambda/lambda.ts @@ -1,10 +1,10 @@ -import { APIGatewayProxyResult } from 'aws-lambda'; +import { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda'; import * as AWS from 'aws-sdk'; import * as SSM from 'aws-sdk/clients/ssm'; import { Pool } from 'pg'; import { createDatabaseConnectionPool } from '../../lib/db'; -import { getHandler } from '../../lib/handler'; -import { APIGatewayEvent, ValidationErrors } from '../../lib/models'; +import { getApiGatewayHandler } from '../../lib/handler'; +import { ValidationErrors } from '../../lib/models'; import { getDatabaseParamsFromSSM } from '../../lib/ssm'; import { cancelPendingSignups } from '../lib/db'; import { cancellationValidator } from './models'; @@ -23,11 +23,11 @@ const dbConnectionPoolPromise: Promise = getDatabaseParamsFromSSM( ).then(createDatabaseConnectionPool); export const run = async ( - event: APIGatewayEvent, + event: APIGatewayProxyEvent, ): Promise => { console.log('received event: ', event); - const cancellationRequest: unknown = JSON.parse(event.body); + const cancellationRequest: unknown = JSON.parse(event.body ?? ''); const validationErrors: ValidationErrors = []; if (!cancellationValidator(cancellationRequest, validationErrors)) { @@ -46,4 +46,4 @@ export const run = async ( return { headers, statusCode: 200, body: 'OK' }; }; -export const handler = getHandler(run); +export const handler = getApiGatewayHandler(run); diff --git a/src/create-reminder-signup/lambda/lambda.ts b/src/create-reminder-signup/lambda/lambda.ts index 5223660..b55a311 100644 --- a/src/create-reminder-signup/lambda/lambda.ts +++ b/src/create-reminder-signup/lambda/lambda.ts @@ -1,10 +1,10 @@ -import { APIGatewayProxyResult, SQSEvent } from 'aws-lambda'; +import { SQSBatchResponse, SQSEvent, SQSRecord } from 'aws-lambda'; import * as AWS from 'aws-sdk'; import * as SSM from 'aws-sdk/clients/ssm'; import { Pool, QueryResult } from 'pg'; import { createDatabaseConnectionPool } from '../../lib/db'; -import { getHandler } from '../../lib/handler'; -import { APIGatewayEvent, ValidationErrors } from '../../lib/models'; +import { getSQSHandler } from '../../lib/handler'; +import { ValidationErrors } from '../../lib/models'; import { getDatabaseParamsFromSSM, getParamFromSSM, @@ -22,13 +22,6 @@ import { recurringSignupValidator, } from './models'; -const headers = { - 'Content-Type': 'application/json', - 'Access-Control-Allow-Origin': '*', - 'Access-Control-Allow-Headers': '*', - 'Access-Control-Allow-Methods': '*', -}; - const ssm: SSM = new AWS.SSM({ region: 'eu-west-1' }); const dbConnectionPoolPromise: Promise = getDatabaseParamsFromSSM( @@ -39,32 +32,46 @@ const identityAccessTokenPromise: Promise = getParamFromSSM( `/support-reminders/idapi/${ssmStage}/accessToken`, ); -export const run = async (event: SQSEvent): Promise => { +export const run = async (event: SQSEvent): Promise => { console.log('received event: ', event); + // SQS event source is configured with batchSize = 1 + const record = event.Records[0]; + + // If there is an error, return the messageId to SQS + return processRecord(record) + .then(() => ({ + batchItemFailures: [], + })) + .catch((error) => { + console.log(error); + return { + batchItemFailures: [{ itemIdentifier: record.messageId }], + }; + }); +}; + +const processRecord = (record: SQSRecord): Promise => { const country = - event.Records[0].messageAttributes['X-GU-GeoIP-Country-Code'] - .stringValue; + record.messageAttributes['X-GU-GeoIP-Country-Code'].stringValue; - const eventPath = - event.Records[0].messageAttributes['EventPath'].stringValue; + const eventPath = record.messageAttributes['EventPath'].stringValue; const signupRequest: unknown = { country, - ...JSON.parse(event.Records[0].body), + ...JSON.parse(record.body), }; - let result; if (eventPath === '/create/one-off') { - result = await runOneOff(signupRequest); + return runOneOff(signupRequest); } else if (eventPath === '/create/recurring') { - result = await runRecurring(signupRequest); + return runRecurring(signupRequest); + } else { + return Promise.reject(`Invalid path: ${String(eventPath)}`); } }; -export const runOneOff = async ( - signupRequest: unknown, -): Promise => { +const runOneOff = async (signupRequest: unknown): Promise => { const persist = ( signupRequest: OneOffSignupRequest, identityId: string, @@ -77,9 +84,7 @@ export const runOneOff = async ( return createSignup(signupRequest, oneOffSignupValidator, persist); }; -export const runRecurring = async ( - signupRequest: unknown, -): Promise => { +const runRecurring = async (signupRequest: unknown): Promise => { const persist = ( signupRequest: RecurringSignupRequest, identityId: string, @@ -107,15 +112,11 @@ const createSignup = async ( signupRequest: unknown, validator: Validator, persist: Persist, -): Promise => { +): Promise => { const validationErrors: ValidationErrors = []; if (!validator(signupRequest, validationErrors)) { console.log('Validation of signup failed', validationErrors); - return { - headers, - statusCode: 400, - body: 'Invalid body', - }; + return Promise.reject('Validation of signup failed'); } const token = await identityAccessTokenPromise; @@ -136,24 +137,13 @@ const createSignup = async ( console.log('dbResult: ', dbResult); if (dbResult.rowCount !== 1) { - return { - headers, - statusCode: 500, - body: 'Internal Server Error', - }; + return Promise.reject( + `Unexpected row count in database response: ${dbResult.rowCount}`, + ); } - return { - headers, - statusCode: 200, - body: 'OK', - }; } else { - return { - headers, - statusCode: identityResult.status, - body: identityResult.status.toString(), - }; + return Promise.reject(identityResult.status.toString()); } }; -export const handler = getHandler(run); +export const handler = getSQSHandler(run); diff --git a/src/lib/handler.ts b/src/lib/handler.ts index b3f00d4..63d112f 100644 --- a/src/lib/handler.ts +++ b/src/lib/handler.ts @@ -1,16 +1,18 @@ import { - APIGatewayProxyCallback, + APIGatewayProxyEvent, + APIGatewayProxyHandler, APIGatewayProxyResult, + Callback, Context, + SQSBatchResponse, + SQSEvent, + SQSHandler, } from 'aws-lambda'; -import { APIGatewayEvent } from './models'; -export const getHandler = ( - run: (event: APIGatewayEvent) => Promise, -) => ( - event: APIGatewayEvent, +const getHandler = (run: (event: INPUT) => Promise) => ( + event: INPUT, context: Context, - callback: APIGatewayProxyCallback, + callback: Callback, ): void => { // setTimeout is necessary because of a bug in the node lambda runtime which can break requests to ssm setTimeout(() => { @@ -31,3 +33,11 @@ export const getHandler = ( }); }); }; + +export const getApiGatewayHandler = ( + run: (event: APIGatewayProxyEvent) => Promise, +): APIGatewayProxyHandler => getHandler(run); + +export const getSQSHandler = ( + run: (event: SQSEvent) => Promise, +): SQSHandler => getHandler(run); diff --git a/src/lib/models.ts b/src/lib/models.ts index a70d403..b93c8fa 100644 --- a/src/lib/models.ts +++ b/src/lib/models.ts @@ -1,13 +1,8 @@ import * as IR from 'typecheck.macro/dist/IR'; -export interface APIGatewayEvent { - headers: Record; - path: string; - body: string; -} - export function isValidEmail(email: string): boolean { - const re = /[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?/; + const re = + /[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?/; return re.test(email.toLowerCase()); } diff --git a/src/reactivate-recurring-reminder/lambda/lambda.ts b/src/reactivate-recurring-reminder/lambda/lambda.ts index d239c7c..a72f202 100644 --- a/src/reactivate-recurring-reminder/lambda/lambda.ts +++ b/src/reactivate-recurring-reminder/lambda/lambda.ts @@ -1,10 +1,10 @@ -import { APIGatewayProxyResult } from 'aws-lambda'; +import { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda'; import * as AWS from 'aws-sdk'; import * as SSM from 'aws-sdk/clients/ssm'; import { Pool } from 'pg'; import { createDatabaseConnectionPool } from '../../lib/db'; -import { getHandler } from '../../lib/handler'; -import { APIGatewayEvent, ValidationErrors } from '../../lib/models'; +import { getApiGatewayHandler } from '../../lib/handler'; +import { ValidationErrors } from '../../lib/models'; import { getDatabaseParamsFromSSM } from '../../lib/ssm'; import { reactivateRecurringReminder } from '../lib/db'; import { reactivationValidator } from './models'; @@ -23,11 +23,11 @@ const dbConnectionPoolPromise: Promise = getDatabaseParamsFromSSM( ).then(createDatabaseConnectionPool); export const run = async ( - event: APIGatewayEvent, + event: APIGatewayProxyEvent, ): Promise => { console.log('received event: ', event); - const reactivationRequest: unknown = JSON.parse(event.body); + const reactivationRequest: unknown = JSON.parse(event.body ?? ''); const validationErrors: ValidationErrors = []; if (!reactivationValidator(reactivationRequest, validationErrors)) { @@ -46,4 +46,4 @@ export const run = async ( return Promise.resolve({ headers, statusCode: 200, body: 'OK' }); }; -export const handler = getHandler(run); +export const handler = getApiGatewayHandler(run);