diff --git a/package.json b/package.json index b5b27cb..8616139 100644 --- a/package.json +++ b/package.json @@ -21,12 +21,16 @@ "typescript": "^5.6.2" }, "dependencies": { + "@types/eventsource": "^1.1.15", + "@types/node-fetch": "2", "amqplib": "^0.10.4", "axios": "^1.7.7", "copyfiles": "^2.4.1", "dotenv": "^16.4.5", + "eventsource": "^2.0.2", "express": "^4.20.0", "fs-extra": "^11.2.0", + "node-fetch": "2", "winston": "^3.14.2" } } diff --git a/src/controllers/sseController.ts b/src/controllers/sseController.ts new file mode 100644 index 0000000..fabe5da --- /dev/null +++ b/src/controllers/sseController.ts @@ -0,0 +1,18 @@ +import { Request, Response } from "express"; +import { SSEManager } from "../utils/sseManager"; +import logger from "../utils/logger"; + +export const establishSSEConnection = (req: Request, res: Response) => { + try { + const commitSha = req.params.commitSha; + if (!commitSha) { + res.status(400).json({ error: "Commit SHA is required" }); + return; + } + + SSEManager.getInstance().addConnection(commitSha, res); + } catch (error) { + logger.error("Error establishing SSE connection:", error); + res.status(500).json({ error: "Failed to establish SSE connection" }); + } +}; diff --git a/src/server.ts b/src/server.ts index ba081ce..ab45ba3 100644 --- a/src/server.ts +++ b/src/server.ts @@ -9,12 +9,16 @@ import { } from "./config/rabbitmq"; import logger from "./utils/logger"; import { config } from "./config/config"; +import { establishSSEConnection } from "./controllers/sseController"; const app = express(); app.use(express.json()); const PORT = config.port; +// SSE endpoint +app.get("/logs/:commitSha", establishSSEConnection); + async function startConsumer() { const channel = getChannel(); await channel.consume(rabbitMQConfig.queueTestRunner, async (msg) => { diff --git a/src/utils/runner.ts b/src/utils/runner.ts index b9376ab..f77e777 100644 --- a/src/utils/runner.ts +++ b/src/utils/runner.ts @@ -11,6 +11,7 @@ import fs from "fs/promises"; import path from "path"; import { ProgressResponse } from "../models/types"; import { TestRepoManager } from "./testRepoManager"; +import SSELogger from "./sseLogger"; export async function runTestProcess(request: TestRunRequest): Promise { const { repoUrl, branch, commitSha } = request; @@ -104,6 +105,8 @@ export async function runTestProcess(request: TestRunRequest): Promise { stage: progress.progress_details.current_step, }); + SSELogger.log(commitSha, "Starting test process..."); + const testResult = await runInDocker(imageName, languageConfig.runCommand); logger.info("Test execution completed", { commitSha }); logger.info("Test output:", { @@ -111,6 +114,17 @@ export async function runTestProcess(request: TestRunRequest): Promise { stderr: testResult.stderr, exitCode: testResult.exitCode, }); + if (testResult.stdout) { + SSELogger.log(commitSha, `Test output:\n${testResult.stdout}`); + } + if (testResult.stderr) { + SSELogger.log(commitSha, `Test errors:\n${testResult.stderr}`); + } + + SSELogger.log( + commitSha, + `Test result: ${testResult.exitCode === 0 ? "Success" : "Failed"}`, + ); // Success is now determined by exit code(using this to avoid having to parse stdout/stderr for success/failure) const success = testResult.exitCode === 0; @@ -128,6 +142,7 @@ export async function runTestProcess(request: TestRunRequest): Promise { await reportResults(commitSha, result); } catch (error: any) { logger.error("Error during test process", { error, commitSha }); + SSELogger.log(commitSha, "Test process failed: " + error.message); const errorMessage = error.stderr || error.message || "Unknown error occurred"; await reportResults(commitSha, { diff --git a/src/utils/sseLogger.ts b/src/utils/sseLogger.ts new file mode 100644 index 0000000..f461f53 --- /dev/null +++ b/src/utils/sseLogger.ts @@ -0,0 +1,9 @@ +import { SSEManager } from "./sseManager"; + +class SSELogger { + static log(commitSha: string, message: string): void { + SSEManager.getInstance().sendMessage(commitSha, message); + } +} + +export default SSELogger; diff --git a/src/utils/sseManager.ts b/src/utils/sseManager.ts new file mode 100644 index 0000000..1051caa --- /dev/null +++ b/src/utils/sseManager.ts @@ -0,0 +1,77 @@ +import { Response } from "express"; +import logger from "./logger"; + +export class SSEManager { + private static instance: SSEManager; + private connections: Map; + + private constructor() { + this.connections = new Map(); + } + + public static getInstance(): SSEManager { + if (!SSEManager.instance) { + SSEManager.instance = new SSEManager(); + } + return SSEManager.instance; + } + + public addConnection(commitSha: string, res: Response): void { + // Check if connection already exists + if (this.connections.has(commitSha)) { + logger.warn(`Connection already exists for commit: ${commitSha}`); + return; + } + + // Set headers only once + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + + // Store the connection + this.connections.set(commitSha, res); + + // Send initial message + this.sendMessage(commitSha, "Connected to test runner..."); + + res.on("close", () => { + this.removeConnection(commitSha); + logger.info(`SSE connection closed for commit: ${commitSha}`); + }); + } + + public sendMessage(commitSha: string, message: string): void { + const connection = this.connections.get(commitSha); + if (connection) { + try { + connection.write(`data: ${JSON.stringify({ message })}\n\n`); + logger.debug(`Sent message for ${commitSha}: ${message}`); + } catch (error) { + logger.error(`Error sending message for ${commitSha}:`, error); + this.removeConnection(commitSha); + } + } else { + logger.debug(`No connection found for ${commitSha}`); + } + } + + public removeConnection(commitSha: string): void { + this.connections.delete(commitSha); + logger.debug(`Removed connection for ${commitSha}`); + } + + public closeConnection(commitSha: string): void { + const connection = this.connections.get(commitSha); + if (connection) { + try { + this.sendMessage(commitSha, "Test run completed"); + connection.end(); + this.removeConnection(commitSha); + logger.info(`Closed connection for ${commitSha}`); + } catch (error) { + logger.error(`Error closing connection for ${commitSha}:`, error); + this.removeConnection(commitSha); + } + } + } +} diff --git a/yarn.lock b/yarn.lock index 48cb758..950c8bd 100644 --- a/yarn.lock +++ b/yarn.lock @@ -92,6 +92,11 @@ dependencies: "@types/node" "*" +"@types/eventsource@^1.1.15": + version "1.1.15" + resolved "https://registry.yarnpkg.com/@types/eventsource/-/eventsource-1.1.15.tgz#949383d3482e20557cbecbf3b038368d94b6be27" + integrity sha512-XQmGcbnxUNa06HR3VBVkc9+A2Vpi9ZyLJcdS5dwaQQ/4ZMWFO+5c90FnMUpbtMZwB/FChoYHwuVg8TvkECacTA== + "@types/express-serve-static-core@^4.17.33": version "4.19.5" resolved "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-4.19.5.tgz" @@ -137,6 +142,14 @@ resolved "https://registry.npmjs.org/@types/mime/-/mime-1.3.5.tgz" integrity sha512-/pyBZWSLD2n0dcHE3hq8s8ZvcETHtEuF+3E7XVt0Ig2nvsVQXdghHVcEkIWjy9A0wKfTn97a/PSDYohKIlnP/w== +"@types/node-fetch@2": + version "2.6.12" + resolved "https://registry.yarnpkg.com/@types/node-fetch/-/node-fetch-2.6.12.tgz#8ab5c3ef8330f13100a7479e2cd56d3386830a03" + integrity sha512-8nneRWKCg3rMtF69nLQJnOYUcbafYeFSjqkw3jCRLsqkWFlHaoQrr5mXmofFGOx3DKn7UfmBMyov8ySvLRVldA== + dependencies: + "@types/node" "*" + form-data "^4.0.0" + "@types/node@*", "@types/node@^22.5.4": version "22.5.4" resolved "https://registry.npmjs.org/@types/node/-/node-22.5.4.tgz" @@ -540,6 +553,11 @@ event-stream@=3.3.4: stream-combiner "~0.0.4" through "~2.3.1" +eventsource@^2.0.2: + version "2.0.2" + resolved "https://registry.yarnpkg.com/eventsource/-/eventsource-2.0.2.tgz#76dfcc02930fb2ff339520b6d290da573a9e8508" + integrity sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA== + express@^4.20.0: version "4.20.0" resolved "https://registry.npmjs.org/express/-/express-4.20.0.tgz" @@ -878,6 +896,13 @@ node-cleanup@^2.1.2: resolved "https://registry.npmjs.org/node-cleanup/-/node-cleanup-2.1.2.tgz" integrity sha512-qN8v/s2PAJwGUtr1/hYTpNKlD6Y9rc4p8KSmJXyGdYGZsDGKXrGThikLFP9OCHFeLeEpQzPwiAtdIvBLqm//Hw== +node-fetch@2: + version "2.7.0" + resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.7.0.tgz#d0f0fa6e3e2dc1d27efcd8ad99d550bda94d187d" + integrity sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A== + dependencies: + whatwg-url "^5.0.0" + noms@0.0.0: version "0.0.0" resolved "https://registry.yarnpkg.com/noms/-/noms-0.0.0.tgz#da8ebd9f3af9d6760919b27d9cdc8092a7332859" @@ -1251,6 +1276,11 @@ toidentifier@1.0.1: resolved "https://registry.npmjs.org/toidentifier/-/toidentifier-1.0.1.tgz" integrity sha512-o5sSPKEkg/DIQNmH43V0/uerLrpzVedkUh8tGNvaeXpfpuwjKenlSox/2O/BTlZUtEe+JG7s5YhEz608PlAHRA== +tr46@~0.0.3: + version "0.0.3" + resolved "https://registry.yarnpkg.com/tr46/-/tr46-0.0.3.tgz#8184fd347dac9cdc185992f3a6622e14b9d9ab6a" + integrity sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw== + triple-beam@^1.3.0: version "1.4.1" resolved "https://registry.yarnpkg.com/triple-beam/-/triple-beam-1.4.1.tgz#6fde70271dc6e5d73ca0c3b24e2d92afb7441984" @@ -1346,6 +1376,19 @@ vary@~1.1.2: resolved "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz" integrity sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg== +webidl-conversions@^3.0.0: + version "3.0.1" + resolved "https://registry.yarnpkg.com/webidl-conversions/-/webidl-conversions-3.0.1.tgz#24534275e2a7bc6be7bc86611cc16ae0a5654871" + integrity sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ== + +whatwg-url@^5.0.0: + version "5.0.0" + resolved "https://registry.yarnpkg.com/whatwg-url/-/whatwg-url-5.0.0.tgz#966454e8765462e37644d3626f6742ce8b70965d" + integrity sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw== + dependencies: + tr46 "~0.0.3" + webidl-conversions "^3.0.0" + which@^2.0.1: version "2.0.2" resolved "https://registry.npmjs.org/which/-/which-2.0.2.tgz"