Skip to content

Commit

Permalink
feat: Async context tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
zermelo-wisen committed Jul 24, 2024
1 parent 01945e6 commit e1e6385
Show file tree
Hide file tree
Showing 18 changed files with 917 additions and 1,135 deletions.
2 changes: 1 addition & 1 deletion src/AppMapStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export default class AppMapStream {
return true;
}

public emit(event: unknown) {
public push(event: unknown) {
if (this.fd === undefined) this.fd = this.open();
else writeSync(this.fd, ",");
writeSync(this.fd, JSON.stringify(event));
Expand Down
97 changes: 77 additions & 20 deletions src/Recording.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import assert from "node:assert";
import { AsyncLocalStorage } from "node:async_hooks";
import { readFileSync, renameSync, rmSync } from "node:fs";
import { join } from "node:path";
import { inspect } from "node:util";
Expand All @@ -13,8 +14,8 @@ import { defaultMetadata } from "./metadata";
import { parameter } from "./parameter";
import type { FunctionInfo } from "./registry";
import compactObject from "./util/compactObject";
import { shouldRecord } from "./recorderControl";
import { getTime } from "./util/getTime";
import { warn } from "./message";

export default class Recording {
constructor(type: AppMap.RecorderType, recorder: string, ...names: string[]) {
Expand All @@ -32,7 +33,7 @@ export default class Recording {

private nextId = 1;
private functionsSeen = new Set<FunctionInfo>();
private stream: AppMapStream | undefined;
private stream: AppMapStream;
private partPath: string;
public readonly path;
public metadata: AppMap.Metadata;
Expand Down Expand Up @@ -198,55 +199,111 @@ export default class Recording {
return event;
}

private emit(event: unknown) {
// Check here if we should record instead of requiring each
// possible hook to check it.
// This is also checked in recorder.record() to prevent
// unnecessary event object creation. Checking this inside hooks,
// (http, sqlite, pg, mysql, ...) will save some CPU cycles but
// will complicate their code.
if (!shouldRecord()) return;
assert(this.stream);
this.stream.emit(event);
}

private eventUpdates: Record<number, AppMap.Event> = {};

fixup(event: AppMap.Event) {
this.eventUpdates[event.id] = event;
if (this.bufferedEvents.has(event.id)) {
const buffered = this.bufferedEvents.get(event.id)!;
if (event === buffered) return;
else Object.assign(buffered, event);
} else this.eventUpdates[event.id] = event;
}

abandon(): void {
if (this.stream?.close()) rmSync(this.partPath);
this.stream = undefined;
if (this.running && this.stream?.close()) rmSync(this.partPath);
this.running = false;
this.disposeBufferedEvents(Recording.rootBuffer);
}

finish(): boolean {
if (!this.running) return false;
this.passEvents(this.stream, Recording.rootBuffer);
const written = this.stream?.close(
compactObject({
classMap: makeClassMap(this.functionsSeen.keys()),
metadata: compactObject(this.metadata),
eventUpdates: Object.keys(this.eventUpdates).length > 0 ? this.eventUpdates : undefined,
}),
);
this.stream = undefined;
if (written) {
renameSync(this.partPath, this.path);
writtenAppMaps.push(this.path);
}
this.running = false;
this.disposeBufferedEvents(Recording.rootBuffer);
return !!written;
}

get running(): boolean {
return !!this.stream;
public running = true;

private bufferedEvents = new Map<number, AppMap.Event>();

public emit(event: AppMap.Event) {
if (!this.running) {
warn("event emitted while recording not running");
return;
}
if (Recording.buffering) {
this.bufferedEvents.set(event.id, event);
Recording.buffer.push({ event, owner: new WeakRef(this) });
} else this.stream.push(event);
}

private static rootBuffer: EventBuffer = [];
private static asyncStorage = new AsyncLocalStorage<EventBuffer>();

private static get buffering(): boolean {
return Recording.rootBuffer.length > 0;
}

private static get buffer(): EventBuffer {
return Recording.asyncStorage.getStore() ?? Recording.rootBuffer;
}

public static fork<T>(fun: () => T): T {
const forked: EventBuffer = [];
Recording.buffer.push(forked);
return Recording.asyncStorage.run(forked, fun);
}

public static run(context: EventBuffer | undefined, fun: () => void) {
if (context) Recording.asyncStorage.run(context, fun);
else fun();
}

public static getContext() {
return Recording.asyncStorage.getStore();
}

readAppMap(): AppMap.AppMap {
assert(!this.running);
return JSON.parse(readFileSync(this.path, "utf8")) as AppMap.AppMap;
}

private passEvents(stream: AppMapStream, buffer: EventBuffer) {
for (const event of buffer) {
if (Array.isArray(event)) this.passEvents(stream, event);
else if (event?.owner.deref() == this) stream.push(event.event);
}
}

private disposeBufferedEvents(buffer: EventBuffer) {
for (let i = 0; i < buffer.length; i++) {
const event = buffer[i];
if (Array.isArray(event)) this.disposeBufferedEvents(event);
else if (event?.owner.deref() == this) buffer[i] = null;
}
}
}

interface EventWithOwner {
owner: WeakRef<Recording>;
event: AppMap.Event;
}

type EventOrBuffer = EventWithOwner | null | EventOrBuffer[];
export type EventBuffer = EventOrBuffer[];

export const writtenAppMaps: string[] = [];

function makeAppMapFilename(name: string): string {
Expand Down
4 changes: 2 additions & 2 deletions src/__tests__/AppMapStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ describe(AppMapStream, () => {
const stream = new AppMapStream("./test.appmap.json");
expect(stream.seenAny).toBe(false);

stream.emit({ event: "call" });
stream.emit({ event: "return" });
stream.push({ event: "call" });
stream.push({ event: "return" });
expect(stream.close()).toBe(true);

expect(stream.seenAny).toBe(true);
Expand Down
1 change: 0 additions & 1 deletion src/__tests__/Recording.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { AsyncLocalStorage } from "node:async_hooks";
import { setTimeout } from "node:timers/promises";

import AppMapStream from "../AppMapStream";
import { makeReturnEvent } from "../event";
Expand Down
3 changes: 2 additions & 1 deletion src/hooks/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ function patchMethod<K extends MethodLikeKeys<mongodb.Collection>>(
if (err)
recordings.forEach(
(recording, idx) =>
isActive(recording) && recording.functionException(callEvents[idx].id, err, startTime),
isActive(recording) &&
recording.functionException(callEvents[idx].id, err, startTime),
);
else
recordings.forEach(
Expand Down
3 changes: 2 additions & 1 deletion src/hooks/mysql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ function createQueryProxy(query: mysql.QueryFunction) {
if (err)
recordings.forEach(
(recording, idx) =>
isActive(recording) && recording.functionException(callEvents[idx].id, err, startTime),
isActive(recording) &&
recording.functionException(callEvents[idx].id, err, startTime),
);
else
recordings.forEach(
Expand Down
55 changes: 42 additions & 13 deletions src/hooks/prisma.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { getActiveRecordings } from "../recorder";
import { FunctionInfo } from "../registry";
import config from "../config";
import { setCustomInspect } from "../parameter";
import { isPromise } from "node:util/types";
import Recording, { EventBuffer } from "../Recording";

const patchedModules = new WeakSet<object>();
const sqlHookAttachedPrismaClientInstances = new WeakSet<object>();
Expand Down Expand Up @@ -124,6 +126,15 @@ function getFunctionInfo(model: string, action: string, moduleId: string) {
// than 2 levels deep structure.
const argsCustomInspect = (v: unknown) => inspect(v, { customInspect: true, depth: 10 });

// We need to create sql events in the same async context with the function call
// events created to represent Prisma query methods. Context is remembered between
// the function call and return events (created after promise is settled) of a Prisma
// query method. On the other hand, events created by listening to direct queries
// sent with prisma.$queryRaw() for example may not appear inside the context of the
// async function running prisma.$queryRaw() because we won't have the same context
// in the callback.
let queryMethodContext: EventBuffer | undefined;

function createPrismaClientMethodProxy<T extends (...args: unknown[]) => unknown>(
prismaClientMethod: T,
moduleId: string,
Expand All @@ -139,9 +150,17 @@ function createPrismaClientMethodProxy<T extends (...args: unknown[]) => unknown
const action = requestParams.action;
const model = requestParams.model;
const argsArg = [setCustomInspect(requestParams.args, argsCustomInspect)];
prismaCalls = recordings.map((recording) =>
recording.functionCall(getFunctionInfo(model, action, moduleId), model, argsArg),
);

queryMethodContext = Recording.getContext();

prismaCalls = recordings.map((recording) => {
const result = recording.functionCall(
getFunctionInfo(model, action, moduleId),
model,
argsArg,
);
return result;
});
}
}

Expand All @@ -151,9 +170,14 @@ function createPrismaClientMethodProxy<T extends (...args: unknown[]) => unknown
try {
const result = target.apply(thisArg, argArray);

recordings.forEach((recording, idx) =>
recording.functionReturn(calls[idx].id, result, startTime),
);
assert(isPromise(result));
void result.finally(() => {
queryMethodContext = undefined;
recordings.forEach((recording, idx) =>
recording.functionReturn(calls[idx].id, result, startTime),
);
});

return result;
} catch (exn: unknown) {
recordings.forEach((recording, idx) =>
Expand Down Expand Up @@ -190,12 +214,17 @@ function attachSqlHook(thisArg: unknown) {
assert("$on" in thisArg && typeof thisArg.$on === "function");
thisArg.$on("query", (queryEvent: QueryEvent) => {
const recordings = getActiveRecordings();
const callEvents = recordings.map((recording) => recording.sqlQuery(dbType, queryEvent.query));
const elapsedSec = queryEvent.duration / 1000.0;
// Give a startTime so that functionReturn calculates same elapsedSec
const startTime = getTime() - elapsedSec;
recordings.forEach((recording, idx) =>
recording.functionReturn(callEvents[idx].id, undefined, startTime),
);

Recording.run(queryMethodContext, () => {
const callEvents = recordings.map((recording) =>
recording.sqlQuery(dbType, queryEvent.query),
);
const elapsedSec = queryEvent.duration / 1000.0;
// Give a startTime so that functionReturn calculates same elapsedSec
const startTime = getTime() - elapsedSec;
recordings.forEach((recording, idx) =>
recording.functionReturn(callEvents[idx].id, undefined, startTime),
);
});
});
}
6 changes: 4 additions & 2 deletions src/recorder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ export function record<This, Return>(

const startTime = getTime();
try {
const result = fun.apply(this, args);
const result = funInfo.async
? Recording.fork(() => fun.apply(this, args))
: fun.apply(this, args);
recordings.forEach((recording, idx) =>
recording.functionReturn(callEvents[idx].id, result, startTime),
);
return result;
} catch (exn: unknown) {
recordings.map((recording, idx) =>
recordings.forEach((recording, idx) =>
recording.functionException(callEvents[idx].id, exn, startTime),
);
throw exn;
Expand Down
Loading

0 comments on commit e1e6385

Please sign in to comment.