From ddc1db40415cea610aa22e880a2103fec5fb87a3 Mon Sep 17 00:00:00 2001 From: Matthew Bystedt Date: Thu, 12 Sep 2024 14:47:32 -0700 Subject: [PATCH] feat: skip dlq for timestamp guards --- .../src/dead-letter-queue.service.ts | 7 ++++--- event-stream-processing/src/ecs-transform.service.ts | 10 ++++++---- .../src/parsers/document-index.parser.ts | 10 +++++----- .../src/parsers/timestamp-field.parser.ts | 4 ++-- .../src/parsers/timestamp-guard.parser.ts | 6 ++++-- .../src/parsers/url-explode.parser.ts | 2 +- event-stream-processing/src/types/os-document.ts | 3 +-- event-stream-processing/src/util/parser.error.ts | 1 + 8 files changed, 24 insertions(+), 19 deletions(-) diff --git a/event-stream-processing/src/dead-letter-queue.service.ts b/event-stream-processing/src/dead-letter-queue.service.ts index 13f0ec9..67d301e 100644 --- a/event-stream-processing/src/dead-letter-queue.service.ts +++ b/event-stream-processing/src/dead-letter-queue.service.ts @@ -20,12 +20,13 @@ export class DeadLetterQueueService { }); public async send({ failures }: OsDocumentPipeline) { - if (failures.length === 0) { + const dlqFailures = failures.filter((failure) => !failure.options?.skipDlq); + if (dlqFailures.length === 0) { return; } const chunkSize = 50; - for (let i = 0; i < failures.length; i += chunkSize) { - const chunk = failures.slice(i, i + chunkSize); + for (let i = 0; i < dlqFailures.length; i += chunkSize) { + const chunk = dlqFailures.slice(i, i + chunkSize); const input: PutRecordBatchCommandInput = { DeliveryStreamName: process.env.DLQ_STREAM_NAME, Records: chunk.map((pipelineObject) => { diff --git a/event-stream-processing/src/ecs-transform.service.ts b/event-stream-processing/src/ecs-transform.service.ts index 0f63e4f..85af096 100644 --- a/event-stream-processing/src/ecs-transform.service.ts +++ b/event-stream-processing/src/ecs-transform.service.ts @@ -1,11 +1,10 @@ -/* eslint-disable @typescript-eslint/no-unsafe-call */ import { KinesisStreamEvent } from 'aws-lambda'; import { injectable, inject, multiInject } from 'inversify'; import { Parser } from './types/parser'; import { TYPES } from './inversify.types'; import { LoggerService } from './util/logger.service'; import { GenericError } from './util/generic.error'; -// eslint-disable-next-line max-len + import { OsDocument, KinesisStreamRecordDecodeFailure, @@ -81,6 +80,10 @@ export class EcsTransformService { } catch (error: unknown) { const parser: string = error instanceof ParserError ? error.parser : 'unknown'; + const skipDlq: boolean = + error instanceof ParserError && error.options + ? error.options.skipDlq + : false; const message: string = error instanceof ParserError || error instanceof GenericError ? error.message @@ -100,14 +103,13 @@ export class EcsTransformService { const path: string = document.data.log?.file?.path ? document.data.log?.file?.path : ''; - // eslint-disable-next-line max-len this.logger.debug( `PARSE_ERROR:${parser} ${team} ${hostName} ${serviceName} ${path}:${sequence} ${document.fingerprint.name} : ${message}`, ); return new OsDocumentProcessingFailure( document, - // eslint-disable-next-line max-len `PARSE_ERROR:${parser} ${team} ${hostName} ${serviceName} ${path}:${sequence} ${document.fingerprint.name} : ${message}`, + { skipDlq }, ); } }) diff --git a/event-stream-processing/src/parsers/document-index.parser.ts b/event-stream-processing/src/parsers/document-index.parser.ts index 07ea561..77cd82c 100644 --- a/event-stream-processing/src/parsers/document-index.parser.ts +++ b/event-stream-processing/src/parsers/document-index.parser.ts @@ -32,7 +32,7 @@ export class DocumentIndexParser implements Parser { if (!indexName) { throw new ParserError( 'Could not map event to an index', - this.constructor.name, + 'DocumentIndexParser', ); } indexName = this.applyTimestampSubstitution(document, indexName); @@ -50,7 +50,7 @@ export class DocumentIndexParser implements Parser { if (lodash.isNil(timestamp)) { throw new ParserError( '@timestamp field value has not been defined', - this.constructor.name, + 'DocumentIndexParser', ); } const tsMomement = moment(timestamp); @@ -60,7 +60,7 @@ export class DocumentIndexParser implements Parser { } throw new ParserError( `Unexpected formatting: ${match}`, - this.constructor.name, + 'DocumentIndexParser', ); }); } @@ -75,14 +75,14 @@ export class DocumentIndexParser implements Parser { if (lodash.isNil(substitution)) { throw new ParserError( `${fieldName} field value not in document`, - this.constructor.name, + 'DocumentIndexParser', ); } return substitution; } throw new ParserError( `Unexpected formatting: ${match}`, - this.constructor.name, + 'DocumentIndexParser', ); }); } diff --git a/event-stream-processing/src/parsers/timestamp-field.parser.ts b/event-stream-processing/src/parsers/timestamp-field.parser.ts index 8dfaad6..9191f43 100644 --- a/event-stream-processing/src/parsers/timestamp-field.parser.ts +++ b/event-stream-processing/src/parsers/timestamp-field.parser.ts @@ -48,13 +48,13 @@ export class TimestampFieldParser implements Parser { } else { throw new ParserError( `Invalid date: '${value}' invalid for format '${tsFormat}'`, - this.constructor.name, + 'TimestampFieldParser', ); } } else { throw new ParserError( `No value set for timestamp: ${fieldName}`, - this.constructor.name, + 'TimestampFieldParser', ); } } diff --git a/event-stream-processing/src/parsers/timestamp-guard.parser.ts b/event-stream-processing/src/parsers/timestamp-guard.parser.ts index 86a0edd..90fe2cb 100644 --- a/event-stream-processing/src/parsers/timestamp-guard.parser.ts +++ b/event-stream-processing/src/parsers/timestamp-guard.parser.ts @@ -38,8 +38,10 @@ export class TimestampGuardParser implements Parser { ) ) { throw new ParserError( - `Timestamp is outside guard`, - this.constructor.name, + `Timestamp [${timestamp.toISOString()}] is outside guard [${startStr}, ${endStr}]`, + 'TimestampGuardParser', + undefined, + { skipDlq: true }, ); } } diff --git a/event-stream-processing/src/parsers/url-explode.parser.ts b/event-stream-processing/src/parsers/url-explode.parser.ts index 31f4827..bcd11d5 100644 --- a/event-stream-processing/src/parsers/url-explode.parser.ts +++ b/event-stream-processing/src/parsers/url-explode.parser.ts @@ -69,7 +69,7 @@ export class UrlExplodeParser implements Parser { } catch (e: unknown) { throw new ParserError( `Could not parse [${urlOriginal}]`, - this.constructor.name, + 'UrlExplodeParser', ); } } diff --git a/event-stream-processing/src/types/os-document.ts b/event-stream-processing/src/types/os-document.ts index cdc8bff..e91c28f 100644 --- a/event-stream-processing/src/types/os-document.ts +++ b/event-stream-processing/src/types/os-document.ts @@ -18,7 +18,6 @@ export enum FingerprintCategory { } export interface OsDocumentData { - // eslint-disable-next-line @typescript-eslint/no-explicit-any [key: string]: any; } @@ -44,13 +43,13 @@ export class PipelineProcessingFailure { constructor( public source: T, public message: string, + public options?: { skipDlq: boolean }, ) {} } export class KinesisStreamRecordDecodeFailure extends PipelineProcessingFailure {} export class OsDocumentProcessingFailure extends PipelineProcessingFailure {} export class OsDocumentCommitFailure extends PipelineProcessingFailure {} -// eslint-disable-next-line max-len export type PipelineObject = | OsDocument | KinesisStreamRecordDecodeFailure diff --git a/event-stream-processing/src/util/parser.error.ts b/event-stream-processing/src/util/parser.error.ts index da89fa6..2e97208 100644 --- a/event-stream-processing/src/util/parser.error.ts +++ b/event-stream-processing/src/util/parser.error.ts @@ -5,6 +5,7 @@ export class ParserError extends GenericError { message: string, public parser: string, source?: Error | undefined, + public options?: { skipDlq: boolean }, ) { super(message, source); Object.setPrototypeOf(this, new.target.prototype);