Skip to content

Commit

Permalink
Add correlation to all requests and the logger (#222)
Browse files Browse the repository at this point in the history
* feat: add correlation to all requests and the pattern layout

* test: add tests for corelation id

* chore: update backbone version

* chore: revert eslint update

* chore: fix eslint

* chore: add more correlation id injections

* chore: rename correlation id library

* chore: update dependencies

* chore: rename launcher output variable

* chore: update dependencies in sdk

* chore: bump runtime

* chore: naming

* chore: do not use object.assign

* fix: merge conflict

* chore: update package-lock

* chore: remove tests

* test: add test for correlation id via webhooks

* chore: pr comments

* chore: create new event with http headers

* chore: update package-lock

* chore: eslint

* chore: test naming

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Julian König <[email protected]>
  • Loading branch information
3 people authored Sep 24, 2024
1 parent ecec859 commit 320d07a
Show file tree
Hide file tree
Showing 15 changed files with 139 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .dev/compose.backbone.env
Original file line number Diff line number Diff line change
@@ -1 +1 @@
BACKBONE_VERSION=6.5.1
BACKBONE_VERSION=6.7.1
5 changes: 3 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
"appenders": {
"fileAppender": {
"type": "dateFile",
"filename": "/var/log/enmeshed-connector/latest.log"
"filename": "./latest.log",
"layout": { "type": "pattern", "pattern": "[%d] [%p] %c - %m %x{correlationId}" }
},
"consoleAppender": {
"type": "stdout",
"layout": { "type": "pattern", "pattern": "%[[%d] [%p] %c - %m%]" }
"layout": { "type": "pattern", "pattern": "%[[%d] [%p] %c - %m %x{correlationId}%]" }
},
"console": {
"type": "logLevelFilter",
Expand Down
2 changes: 1 addition & 1 deletion config/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"appenders": {
"consoleAppender": {
"type": "stdout",
"layout": { "type": "pattern", "pattern": "%[[%d] [%p] %c - %m%]" }
"layout": { "type": "pattern", "pattern": "%[[%d] [%p] %c - %m %x{correlationId}%]" }
},
"console": {
"type": "logLevelFilter",
Expand Down
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
"amqplib": "^0.10.4",
"axios": "^1.7.7",
"compression": "1.7.4",
"correlation-id": "^5.2.0",
"cors": "2.8.5",
"eventsource": "^2.0.2",
"express": "4.21.0",
Expand Down
3 changes: 2 additions & 1 deletion src/ConnectorRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { ConsumptionController } from "@nmshd/consumption";
import { ConsumptionServices, DataViewExpander, GetIdentityInfoResponse, ModuleConfiguration, Runtime, RuntimeHealth, RuntimeServices, TransportServices } from "@nmshd/runtime";
import { AccountController, TransportCoreErrors } from "@nmshd/transport";
import axios from "axios";
import correlator from "correlation-id";
import fs from "fs";
import { HttpsProxyAgent } from "https-proxy-agent";
import { validate as validateSchema } from "jsonschema";
Expand Down Expand Up @@ -57,7 +58,7 @@ export class ConnectorRuntime extends Runtime<ConnectorRuntimeConfig> {
private healthChecker: HealthChecker;

private constructor(connectorConfig: ConnectorRuntimeConfig, loggerFactory: NodeLoggerFactory) {
super(connectorConfig, loggerFactory);
super(connectorConfig, loggerFactory, undefined, correlator);
}

public static async create(connectorConfig: ConnectorRuntimeConfig): Promise<ConnectorRuntime> {
Expand Down
17 changes: 17 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { RuntimeConfig } from "@nmshd/runtime";
import correlator from "correlation-id";
import nconf from "nconf";
import { ConnectorRuntime } from "./ConnectorRuntime";
import { ConnectorRuntimeConfig } from "./ConnectorRuntimeConfig";
Expand Down Expand Up @@ -28,6 +29,7 @@ export function createConnectorConfig(overrides?: RuntimeConfig): ConnectorRunti
.file("default-file", { file: "config/default.json" });

const connectorConfig = nconf.get() as ConnectorRuntimeConfig;
addCorrelationIdSupportToLogger(connectorConfig);

if (connectorConfig.modules.sync.enabled && connectorConfig.modules.sse.enabled) {
// eslint-disable-next-line no-console
Expand All @@ -38,6 +40,21 @@ export function createConnectorConfig(overrides?: RuntimeConfig): ConnectorRunti
return connectorConfig;
}

function addCorrelationIdSupportToLogger(connectorConfig: ConnectorRuntimeConfig) {
Object.entries(connectorConfig.logging.appenders).forEach(([_key, appender]) => {
if ("layout" in appender && appender.layout.type === "pattern") {
const tokens = appender.layout.tokens;

appender.layout.tokens = {
...tokens,
correlationId: () => {
return correlator.getId() ?? "";
}
};
}
});
}

const envKeyMapping: Record<string, string> = {
DATABASE_NAME: "database:dbName", // eslint-disable-line @typescript-eslint/naming-convention
API_KEY: "infrastructure:httpServer:apiKey", // eslint-disable-line @typescript-eslint/naming-convention
Expand Down
13 changes: 13 additions & 0 deletions src/infrastructure/httpServer/HttpServer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { sleep } from "@js-soft/ts-utils";
import compression from "compression";
import correlator from "correlation-id";
import cors, { CorsOptions } from "cors";
import express, { Application, RequestHandler } from "express";
import helmet, { HelmetOptions } from "helmet";
Expand Down Expand Up @@ -58,6 +59,18 @@ export class HttpServer extends ConnectorInfrastructure<HttpServerConfiguration>
private configure(): void {
this.logger.debug("Configuring middleware...");

this.app.use((req, res, next) => {
let correlationId = req.headers["x-correlation-id"];
if (Array.isArray(correlationId)) {
correlationId = correlationId[0];
}
if (correlationId) {
correlator.withId(correlationId, next);
} else {
correlator.withId(next);
}
});

this.app.use(helmet(this.getHelmetOptions()));

this.app.use(requestLogger(this.logger));
Expand Down
12 changes: 7 additions & 5 deletions src/modules/sse/SseModule.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ILogger } from "@js-soft/logging-abstractions";
import correlator from "correlation-id";
import eventSourceModule from "eventsource";
import { ConnectorMode } from "../../ConnectorMode";
import { ConnectorRuntime } from "../../ConnectorRuntime";
Expand Down Expand Up @@ -88,11 +89,12 @@ export default class SseModule extends ConnectorRuntimeModule<SseModuleConfigura

const services = this.runtime.getServices();

const syncResult = await services.transportServices.account.syncEverything();
if (syncResult.isError) {
this.logger.error(syncResult);
return;
}
await correlator.withId(async () => {
const syncResult = await services.transportServices.account.syncEverything();
if (syncResult.isError) {
this.logger.error(syncResult);
}
});
}

public stop(): void {
Expand Down
7 changes: 5 additions & 2 deletions src/modules/sync/SyncModule.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import correlator from "correlation-id";
import { ConnectorRuntimeModule, ConnectorRuntimeModuleConfiguration } from "../../ConnectorRuntimeModule";

export interface SyncModuleConfiguration extends ConnectorRuntimeModuleConfiguration {
Expand All @@ -16,8 +17,10 @@ export default class SyncModule extends ConnectorRuntimeModule<SyncModuleConfigu
}

private async sync() {
const result = await this.runtime.getServices().transportServices.account.syncEverything();
if (result.isError) this.logger.error("Sync failed", result.error);
await correlator.withId(async () => {
const result = await this.runtime.getServices().transportServices.account.syncEverything();
if (result.isError) this.logger.error("Sync failed", result.error);
});

this.syncTimeout = setTimeout(async () => await this.sync(), this.configuration.interval * 1000);
}
Expand Down
9 changes: 8 additions & 1 deletion src/modules/webhooks/WebhooksModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Event, DataEvent as tsUtilsDataEvent } from "@js-soft/ts-utils";
import { DataEvent } from "@nmshd/runtime";
import agentKeepAlive, { HttpsAgent as AgentKeepAliveHttps } from "agentkeepalive";
import axios, { AxiosInstance } from "axios";
import correlator from "correlation-id";
import { ConnectorRuntimeModule } from "../../ConnectorRuntimeModule";
import { ConfigModel, Webhook } from "./ConfigModel";
import { ConfigParser } from "./ConfigParser";
Expand Down Expand Up @@ -45,7 +46,13 @@ export default class WebhooksModule extends ConnectorRuntimeModule<WebhooksModul
try {
this.logger.debug(`Sending request to webhook '${url}' for trigger '${trigger}'.`);

const response = await this.axios.post(url, payload, { headers: webhook.target.headers });
let headers = webhook.target.headers;

const correlationId = correlator.getId();
// eslint-disable-next-line @typescript-eslint/naming-convention
if (correlationId) headers = { ...headers, "x-correlation-id": correlationId };

const response = await this.axios.post(url, payload, { headers });

if (response.status < 200 || response.status > 299) {
this.logger.warn(`Request to webhook '${url}' returned status ${response.status}. Expected value between 200 and 299.`);
Expand Down
65 changes: 65 additions & 0 deletions test/correlationId.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { AxiosInstance } from "axios";
import { randomUUID } from "crypto";
import { DateTime } from "luxon";
import { ConnectorClientWithMetadata, Launcher } from "./lib/Launcher";
import { getTimeout } from "./lib/setTimeout";
import { establishRelationship } from "./lib/testUtils";

const launcher = new Launcher();
let axiosClient: AxiosInstance;
let connectorClient1: ConnectorClientWithMetadata;
let connectorClient2: ConnectorClientWithMetadata;
let account2Address: string;
const uuidRegex = new RegExp("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}");

beforeAll(async () => {
[connectorClient1, connectorClient2] = await launcher.launch(2);
axiosClient = connectorClient1["account"]["httpClient"];
await establishRelationship(connectorClient1, connectorClient2);
account2Address = (await connectorClient2.account.getIdentityInfo()).result.address;
}, getTimeout(30000));

afterAll(() => launcher.stop());

describe("test the correlation ids", () => {
// eslint-disable-next-line jest/expect-expect
test("should send a random correlation id via webhook", async () => {
connectorClient1._eventBus?.reset();

await axiosClient.post<any>("/api/v2/Requests/Outgoing", {
content: {
items: [{ "@type": "ReadAttributeRequestItem", mustBeAccepted: false, query: { "@type": "IdentityAttributeQuery", valueType: "Surname" } }],
expiresAt: DateTime.now().plus({ hour: 1 }).toISO()
},
peer: account2Address
});

await connectorClient1._eventBus?.waitForEvent("consumption.outgoingRequestCreated", (event: any) => {
return uuidRegex.test(event.headers["x-correlation-id"]);
});
});

// eslint-disable-next-line jest/expect-expect
test("should send a custom correlation id via webhook", async () => {
connectorClient1._eventBus?.reset();

const customCorrelationId = randomUUID();

await axiosClient.post<any>(
"/api/v2/Requests/Outgoing",
{
content: {
items: [{ "@type": "ReadAttributeRequestItem", mustBeAccepted: false, query: { "@type": "IdentityAttributeQuery", valueType: "Surname" } }],
expiresAt: DateTime.now().plus({ hour: 1 }).toISO()
},
peer: account2Address
},
// eslint-disable-next-line @typescript-eslint/naming-convention
{ headers: { "x-correlation-id": customCorrelationId } }
);

await connectorClient1._eventBus?.waitForEvent("consumption.outgoingRequestCreated", (event: any) => {
return event.headers["x-correlation-id"] === customCorrelationId;
});
});
});
11 changes: 11 additions & 0 deletions test/lib/DataEventWithHeader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { DataEvent } from "@js-soft/ts-utils";
import { IncomingHttpHeaders } from "node:http";

export class DataEventWithHeaders<T> extends DataEvent<T> {
public readonly headers: IncomingHttpHeaders;

public constructor(namespace: string, data: T, headers: IncomingHttpHeaders) {
super(namespace, data);
this.headers = headers;
}
}
4 changes: 2 additions & 2 deletions test/lib/Launcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { DataEvent } from "@js-soft/ts-utils";
import { ConnectorClient } from "@nmshd/connector-sdk";
import { Random, RandomCharacterRange } from "@nmshd/transport";
import { ChildProcess, spawn } from "child_process";
Expand All @@ -7,6 +6,7 @@ import http, { Server } from "node:http";
import https from "node:https";
import inspector from "node:inspector";
import path from "path";
import { DataEventWithHeaders } from "./DataEventWithHeader";
import { MockEventBus } from "./MockEventBus";
import getPort from "./getPort";
import waitForConnector from "./waitForConnector";
Expand Down Expand Up @@ -129,7 +129,7 @@ export class Launcher {
.use((req, res) => {
res.status(200).send("OK");

eventBus.publish(new DataEvent(req.body.trigger, req.body.data));
eventBus.publish(new DataEventWithHeaders(req.body.trigger, req.body.data, req.headers));
})
.listen(port);
}
Expand Down
4 changes: 2 additions & 2 deletions test/lib/MockEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ export class MockEventBus extends EventEmitter2EventBus {
this.publishPromises.push(this.emitter.emitAsync(namespace, event));
}

public async waitForEvent<TEvent extends Event>(subscriptionTarget: string, predicate?: (event: TEvent) => boolean): Promise<TEvent> {
public async waitForEvent<TEvent extends Event>(subscriptionTarget: string, predicate?: (event: TEvent) => boolean, timeout?: number): Promise<TEvent> {
const alreadyTriggeredEvents = this.publishedEvents.find((e) => e.namespace === subscriptionTarget && (!predicate || predicate(e as TEvent))) as TEvent | undefined;
if (alreadyTriggeredEvents) {
return alreadyTriggeredEvents;
}

const event = await waitForEvent(this, subscriptionTarget, predicate);
const event = await waitForEvent(this, subscriptionTarget, predicate, timeout);
return event;
}

Expand Down

0 comments on commit 320d07a

Please sign in to comment.