Skip to content

Commit

Permalink
feat: skip dlq for timestamp guards
Browse files Browse the repository at this point in the history
  • Loading branch information
mbystedt committed Sep 12, 2024
1 parent e3aa59f commit ddc1db4
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 19 deletions.
7 changes: 4 additions & 3 deletions event-stream-processing/src/dead-letter-queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ export class DeadLetterQueueService {
});

public async send({ failures }: OsDocumentPipeline) {
if (failures.length === 0) {
const dlqFailures = failures.filter((failure) => !failure.options?.skipDlq);
if (dlqFailures.length === 0) {
return;
}
const chunkSize = 50;
for (let i = 0; i < failures.length; i += chunkSize) {
const chunk = failures.slice(i, i + chunkSize);
for (let i = 0; i < dlqFailures.length; i += chunkSize) {
const chunk = dlqFailures.slice(i, i + chunkSize);
const input: PutRecordBatchCommandInput = {
DeliveryStreamName: process.env.DLQ_STREAM_NAME,
Records: chunk.map((pipelineObject) => {
Expand Down
10 changes: 6 additions & 4 deletions event-stream-processing/src/ecs-transform.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
import { KinesisStreamEvent } from 'aws-lambda';
import { injectable, inject, multiInject } from 'inversify';
import { Parser } from './types/parser';
import { TYPES } from './inversify.types';
import { LoggerService } from './util/logger.service';
import { GenericError } from './util/generic.error';
// eslint-disable-next-line max-len

import {
OsDocument,
KinesisStreamRecordDecodeFailure,
Expand Down Expand Up @@ -81,6 +80,10 @@ export class EcsTransformService {
} catch (error: unknown) {
const parser: string =
error instanceof ParserError ? error.parser : 'unknown';
const skipDlq: boolean =
error instanceof ParserError && error.options
? error.options.skipDlq
: false;
const message: string =
error instanceof ParserError || error instanceof GenericError
? error.message
Expand All @@ -100,14 +103,13 @@ export class EcsTransformService {
const path: string = document.data.log?.file?.path
? document.data.log?.file?.path
: '';
// eslint-disable-next-line max-len
this.logger.debug(
`PARSE_ERROR:${parser} ${team} ${hostName} ${serviceName} ${path}:${sequence} ${document.fingerprint.name} : ${message}`,
);
return new OsDocumentProcessingFailure(
document,
// eslint-disable-next-line max-len
`PARSE_ERROR:${parser} ${team} ${hostName} ${serviceName} ${path}:${sequence} ${document.fingerprint.name} : ${message}`,
{ skipDlq },
);
}
})
Expand Down
10 changes: 5 additions & 5 deletions event-stream-processing/src/parsers/document-index.parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class DocumentIndexParser implements Parser {
if (!indexName) {
throw new ParserError(
'Could not map event to an index',
this.constructor.name,
'DocumentIndexParser',
);
}
indexName = this.applyTimestampSubstitution(document, indexName);
Expand All @@ -50,7 +50,7 @@ export class DocumentIndexParser implements Parser {
if (lodash.isNil(timestamp)) {
throw new ParserError(
'@timestamp field value has not been defined',
this.constructor.name,
'DocumentIndexParser',
);
}
const tsMomement = moment(timestamp);
Expand All @@ -60,7 +60,7 @@ export class DocumentIndexParser implements Parser {
}
throw new ParserError(
`Unexpected formatting: ${match}`,
this.constructor.name,
'DocumentIndexParser',
);
});
}
Expand All @@ -75,14 +75,14 @@ export class DocumentIndexParser implements Parser {
if (lodash.isNil(substitution)) {
throw new ParserError(
`${fieldName} field value not in document`,
this.constructor.name,
'DocumentIndexParser',
);
}
return substitution;
}
throw new ParserError(
`Unexpected formatting: ${match}`,
this.constructor.name,
'DocumentIndexParser',
);
});
}
Expand Down
4 changes: 2 additions & 2 deletions event-stream-processing/src/parsers/timestamp-field.parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ export class TimestampFieldParser implements Parser {
} else {
throw new ParserError(
`Invalid date: '${value}' invalid for format '${tsFormat}'`,
this.constructor.name,
'TimestampFieldParser',
);
}
} else {
throw new ParserError(
`No value set for timestamp: ${fieldName}`,
this.constructor.name,
'TimestampFieldParser',
);
}
}
Expand Down
6 changes: 4 additions & 2 deletions event-stream-processing/src/parsers/timestamp-guard.parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ export class TimestampGuardParser implements Parser {
)
) {
throw new ParserError(
`Timestamp is outside guard`,
this.constructor.name,
`Timestamp [${timestamp.toISOString()}] is outside guard [${startStr}, ${endStr}]`,
'TimestampGuardParser',
undefined,
{ skipDlq: true },
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion event-stream-processing/src/parsers/url-explode.parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export class UrlExplodeParser implements Parser {
} catch (e: unknown) {
throw new ParserError(
`Could not parse [${urlOriginal}]`,
this.constructor.name,
'UrlExplodeParser',
);
}
}
Expand Down
3 changes: 1 addition & 2 deletions event-stream-processing/src/types/os-document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ export enum FingerprintCategory {
}

export interface OsDocumentData {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
}

Expand All @@ -44,13 +43,13 @@ export class PipelineProcessingFailure<T> {
constructor(
public source: T,
public message: string,
public options?: { skipDlq: boolean },
) {}
}

export class KinesisStreamRecordDecodeFailure extends PipelineProcessingFailure<KinesisStreamRecord> {}
export class OsDocumentProcessingFailure extends PipelineProcessingFailure<OsDocument> {}
export class OsDocumentCommitFailure extends PipelineProcessingFailure<OsDocument> {}
// eslint-disable-next-line max-len
export type PipelineObject =
| OsDocument
| KinesisStreamRecordDecodeFailure
Expand Down
1 change: 1 addition & 0 deletions event-stream-processing/src/util/parser.error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export class ParserError extends GenericError {
message: string,
public parser: string,
source?: Error | undefined,
public options?: { skipDlq: boolean },
) {
super(message, source);
Object.setPrototypeOf(this, new.target.prototype);
Expand Down

0 comments on commit ddc1db4

Please sign in to comment.