Skip to content

Commit

Permalink
Refactor to use SQSBatchResponse (#244)
Browse files Browse the repository at this point in the history
* batch size of 1

* Refactor to use SQSBatchResponse

* success

* format

* DLQ alarm
  • Loading branch information
tomrf1 authored Dec 17, 2024
1 parent 4067e09 commit 0cf59d1
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 73 deletions.
128 changes: 128 additions & 0 deletions cdk/lib/__snapshots__/support-reminders.test.ts.snap

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions cdk/lib/support-reminders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ export class SupportReminders extends GuStack {
retentionPeriod: Duration.days(14),
});

new GuAlarm(this, `${app}-alarm`, {
app: app,
snsTopicName: alarmsTopic,
alarmName: `${app}-${this.stage}: failed event on the dead letter queue`,
alarmDescription: `A reminder signup event failed and is now on the dead letter queue.`,
metric: deadLetterQueue
.metric('ApproximateNumberOfMessagesVisible')
.with({ statistic: 'Sum', period: Duration.minutes(1) }),
comparisonOperator: ComparisonOperator.GREATER_THAN_THRESHOLD,
threshold: 0,
evaluationPeriods: 24,
actionsEnabled: this.stage === 'PROD',
});

const queue = new Queue(this, `${app}Queue`, {
queueName,
visibilityTimeout: Duration.minutes(2),
Expand All @@ -71,6 +85,7 @@ export class SupportReminders extends GuStack {
// SQS to Lambda event source mapping
const eventSource = new SqsEventSource(queue, {
reportBatchItemFailures: true,
batchSize: 1,
});
const events=[eventSource];

Expand Down
12 changes: 6 additions & 6 deletions src/cancel-reminders/lambda/lambda.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { APIGatewayProxyResult } from 'aws-lambda';
import { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda';
import * as AWS from 'aws-sdk';
import * as SSM from 'aws-sdk/clients/ssm';
import { Pool } from 'pg';
import { createDatabaseConnectionPool } from '../../lib/db';
import { getHandler } from '../../lib/handler';
import { APIGatewayEvent, ValidationErrors } from '../../lib/models';
import { getApiGatewayHandler } from '../../lib/handler';
import { ValidationErrors } from '../../lib/models';
import { getDatabaseParamsFromSSM } from '../../lib/ssm';
import { cancelPendingSignups } from '../lib/db';
import { cancellationValidator } from './models';
Expand All @@ -23,11 +23,11 @@ const dbConnectionPoolPromise: Promise<Pool> = getDatabaseParamsFromSSM(
).then(createDatabaseConnectionPool);

export const run = async (
event: APIGatewayEvent,
event: APIGatewayProxyEvent,
): Promise<APIGatewayProxyResult> => {
console.log('received event: ', event);

const cancellationRequest: unknown = JSON.parse(event.body);
const cancellationRequest: unknown = JSON.parse(event.body ?? '');

const validationErrors: ValidationErrors = [];
if (!cancellationValidator(cancellationRequest, validationErrors)) {
Expand All @@ -46,4 +46,4 @@ export const run = async (
return { headers, statusCode: 200, body: 'OK' };
};

export const handler = getHandler(run);
export const handler = getApiGatewayHandler(run);
84 changes: 37 additions & 47 deletions src/create-reminder-signup/lambda/lambda.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { APIGatewayProxyResult, SQSEvent } from 'aws-lambda';
import { SQSBatchResponse, SQSEvent, SQSRecord } from 'aws-lambda';
import * as AWS from 'aws-sdk';
import * as SSM from 'aws-sdk/clients/ssm';
import { Pool, QueryResult } from 'pg';
import { createDatabaseConnectionPool } from '../../lib/db';
import { getHandler } from '../../lib/handler';
import { APIGatewayEvent, ValidationErrors } from '../../lib/models';
import { getSQSHandler } from '../../lib/handler';
import { ValidationErrors } from '../../lib/models';
import {
getDatabaseParamsFromSSM,
getParamFromSSM,
Expand All @@ -22,13 +22,6 @@ import {
recurringSignupValidator,
} from './models';

const headers = {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': '*',
'Access-Control-Allow-Methods': '*',
};

const ssm: SSM = new AWS.SSM({ region: 'eu-west-1' });

const dbConnectionPoolPromise: Promise<Pool> = getDatabaseParamsFromSSM(
Expand All @@ -39,32 +32,46 @@ const identityAccessTokenPromise: Promise<string> = getParamFromSSM(
`/support-reminders/idapi/${ssmStage}/accessToken`,
);

export const run = async (event: SQSEvent): Promise<void> => {
export const run = async (event: SQSEvent): Promise<SQSBatchResponse> => {
console.log('received event: ', event);

// SQS event source is configured with batchSize = 1
const record = event.Records[0];

// If there is an error, return the messageId to SQS
return processRecord(record)
.then(() => ({
batchItemFailures: [],
}))
.catch((error) => {
console.log(error);
return {
batchItemFailures: [{ itemIdentifier: record.messageId }],
};
});
};

const processRecord = (record: SQSRecord): Promise<void> => {
const country =
event.Records[0].messageAttributes['X-GU-GeoIP-Country-Code']
.stringValue;
record.messageAttributes['X-GU-GeoIP-Country-Code'].stringValue;

const eventPath =
event.Records[0].messageAttributes['EventPath'].stringValue;
const eventPath = record.messageAttributes['EventPath'].stringValue;

const signupRequest: unknown = {
country,
...JSON.parse(event.Records[0].body),
...JSON.parse(record.body),
};

let result;
if (eventPath === '/create/one-off') {
result = await runOneOff(signupRequest);
return runOneOff(signupRequest);
} else if (eventPath === '/create/recurring') {
result = await runRecurring(signupRequest);
return runRecurring(signupRequest);
} else {
return Promise.reject(`Invalid path: ${String(eventPath)}`);
}
};

export const runOneOff = async (
signupRequest: unknown,
): Promise<APIGatewayProxyResult> => {
const runOneOff = async (signupRequest: unknown): Promise<void> => {
const persist = (
signupRequest: OneOffSignupRequest,
identityId: string,
Expand All @@ -77,9 +84,7 @@ export const runOneOff = async (
return createSignup(signupRequest, oneOffSignupValidator, persist);
};

export const runRecurring = async (
signupRequest: unknown,
): Promise<APIGatewayProxyResult> => {
const runRecurring = async (signupRequest: unknown): Promise<void> => {
const persist = (
signupRequest: RecurringSignupRequest,
identityId: string,
Expand Down Expand Up @@ -107,15 +112,11 @@ const createSignup = async <T extends BaseSignupRequest>(
signupRequest: unknown,
validator: Validator<T>,
persist: Persist<T>,
): Promise<APIGatewayProxyResult> => {
): Promise<void> => {
const validationErrors: ValidationErrors = [];
if (!validator(signupRequest, validationErrors)) {
console.log('Validation of signup failed', validationErrors);
return {
headers,
statusCode: 400,
body: 'Invalid body',
};
return Promise.reject('Validation of signup failed');
}

const token = await identityAccessTokenPromise;
Expand All @@ -136,24 +137,13 @@ const createSignup = async <T extends BaseSignupRequest>(
console.log('dbResult: ', dbResult);

if (dbResult.rowCount !== 1) {
return {
headers,
statusCode: 500,
body: 'Internal Server Error',
};
return Promise.reject(
`Unexpected row count in database response: ${dbResult.rowCount}`,
);
}
return {
headers,
statusCode: 200,
body: 'OK',
};
} else {
return {
headers,
statusCode: identityResult.status,
body: identityResult.status.toString(),
};
return Promise.reject(identityResult.status.toString());
}
};

export const handler = getHandler(run);
export const handler = getSQSHandler(run);
Loading

0 comments on commit 0cf59d1

Please sign in to comment.