From c0f88bf988eb276dce4ebda3508c9359c5607122 Mon Sep 17 00:00:00 2001 From: Justin Brooks Date: Fri, 15 Nov 2024 13:29:42 -0500 Subject: [PATCH] add remove hooks --- apps/pty-proxy/src/controller/agent-socket.ts | 19 +++++++++--- packages/job-dispatch/src/resource.ts | 29 +++++++++++-------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/apps/pty-proxy/src/controller/agent-socket.ts b/apps/pty-proxy/src/controller/agent-socket.ts index f02d1f50..3926028d 100644 --- a/apps/pty-proxy/src/controller/agent-socket.ts +++ b/apps/pty-proxy/src/controller/agent-socket.ts @@ -12,11 +12,12 @@ import { can, getUser } from "@ctrlplane/auth/utils"; import { eq } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; -import { upsertResources } from "@ctrlplane/job-dispatch"; +import { deleteResources, upsertResources } from "@ctrlplane/job-dispatch"; import { logger } from "@ctrlplane/logger"; import { Permission } from "@ctrlplane/validators/auth"; import { agentConnect, agentHeartbeat } from "@ctrlplane/validators/session"; +import { agents } from "./sockets.js"; import { ifMessage } from "./utils.js"; export class AgentSocket { @@ -84,14 +85,16 @@ export class AgentSocket { this.socket.on( "message", ifMessage() - .is(agentConnect, (data) => + .is(agentConnect, (data) => { this.updateResource({ + updatedAt: new Date(), config: data.config, metadata: data.metadata, - }), - ) + }); + }) .is(agentHeartbeat, () => this.updateResource({ + updatedAt: new Date(), metadata: { ...(this.resource?.metadata ?? {}), ["last-heartbeat"]: new Date().toISOString(), @@ -100,6 +103,14 @@ export class AgentSocket { ) .handle(), ); + + this.socket.on("close", () => { + logger.info("Agent disconnected", { agentName: this.name }); + if (this.resource?.id == null) return; + + agents.delete(this.resource.id); + deleteResources(db, [this.resource.id]); + }); } async updateResource( diff --git a/packages/job-dispatch/src/resource.ts b/packages/job-dispatch/src/resource.ts index 37632c3f..3126f40b 100644 --- a/packages/job-dispatch/src/resource.ts +++ b/packages/job-dispatch/src/resource.ts @@ -320,18 +320,13 @@ export const upsertResources = async ( ); if (resourcesToDelete.length > 0) { - await tx - .delete(resource) - .where( - inArray( - resource.id, - resourcesToDelete.map((r) => r.id), - ), - ) - .catch((err) => { - log.error("Error deleting resources", { error: err }); - throw err; - }); + await deleteResources( + tx, + resourcesToDelete.map((r) => r.id), + ).catch((err) => { + log.error("Error deleting resources", { error: err }); + throw err; + }); log.info(`Deleted ${resourcesToDelete.length} resources`, { resourcesToDelete, @@ -344,3 +339,13 @@ export const upsertResources = async ( throw err; } }; + +/** + * Delete resources from the database. + * + * @param tx - The transaction to use. + * @param resourceIds - The ids of the resources to delete. + */ +export const deleteResources = async (tx: Tx, resourceIds: string[]) => { + await tx.delete(resource).where(inArray(resource.id, resourceIds)); +};