Skip to content

Commit

Permalink
chore:(feat): impl log streaming to terminal (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
nully0x authored Dec 4, 2024
1 parent 7b629e0 commit a5cd750
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 0 deletions.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
"typescript": "^5.6.2"
},
"dependencies": {
"@types/eventsource": "^1.1.15",
"@types/node-fetch": "2",
"amqplib": "^0.10.4",
"axios": "^1.7.7",
"copyfiles": "^2.4.1",
"dotenv": "^16.4.5",
"eventsource": "^2.0.2",
"express": "^4.20.0",
"fs-extra": "^11.2.0",
"node-fetch": "2",
"winston": "^3.14.2"
}
}
18 changes: 18 additions & 0 deletions src/controllers/sseController.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Request, Response } from "express";
import { SSEManager } from "../utils/sseManager";
import logger from "../utils/logger";

export const establishSSEConnection = (req: Request, res: Response) => {
try {
const commitSha = req.params.commitSha;
if (!commitSha) {
res.status(400).json({ error: "Commit SHA is required" });
return;
}

SSEManager.getInstance().addConnection(commitSha, res);
} catch (error) {
logger.error("Error establishing SSE connection:", error);
res.status(500).json({ error: "Failed to establish SSE connection" });
}
};
4 changes: 4 additions & 0 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import {
} from "./config/rabbitmq";
import logger from "./utils/logger";
import { config } from "./config/config";
import { establishSSEConnection } from "./controllers/sseController";

const app = express();
app.use(express.json());

const PORT = config.port;

// SSE endpoint
app.get("/logs/:commitSha", establishSSEConnection);

async function startConsumer() {
const channel = getChannel();
await channel.consume(rabbitMQConfig.queueTestRunner, async (msg) => {
Expand Down
15 changes: 15 additions & 0 deletions src/utils/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import fs from "fs/promises";
import path from "path";
import { ProgressResponse } from "../models/types";
import { TestRepoManager } from "./testRepoManager";
import SSELogger from "./sseLogger";

export async function runTestProcess(request: TestRunRequest): Promise<void> {
const { repoUrl, branch, commitSha } = request;
Expand Down Expand Up @@ -104,13 +105,26 @@ export async function runTestProcess(request: TestRunRequest): Promise<void> {
stage: progress.progress_details.current_step,
});

SSELogger.log(commitSha, "Starting test process...");

const testResult = await runInDocker(imageName, languageConfig.runCommand);
logger.info("Test execution completed", { commitSha });
logger.info("Test output:", {
stdout: testResult.stdout,
stderr: testResult.stderr,
exitCode: testResult.exitCode,
});
if (testResult.stdout) {
SSELogger.log(commitSha, `Test output:\n${testResult.stdout}`);
}
if (testResult.stderr) {
SSELogger.log(commitSha, `Test errors:\n${testResult.stderr}`);
}

SSELogger.log(
commitSha,
`Test result: ${testResult.exitCode === 0 ? "Success" : "Failed"}`,
);

// Success is now determined by exit code(using this to avoid having to parse stdout/stderr for success/failure)
const success = testResult.exitCode === 0;
Expand All @@ -128,6 +142,7 @@ export async function runTestProcess(request: TestRunRequest): Promise<void> {
await reportResults(commitSha, result);
} catch (error: any) {
logger.error("Error during test process", { error, commitSha });
SSELogger.log(commitSha, "Test process failed: " + error.message);
const errorMessage =
error.stderr || error.message || "Unknown error occurred";
await reportResults(commitSha, {
Expand Down
9 changes: 9 additions & 0 deletions src/utils/sseLogger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { SSEManager } from "./sseManager";

class SSELogger {
static log(commitSha: string, message: string): void {
SSEManager.getInstance().sendMessage(commitSha, message);
}
}

export default SSELogger;
77 changes: 77 additions & 0 deletions src/utils/sseManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Response } from "express";
import logger from "./logger";

export class SSEManager {
private static instance: SSEManager;
private connections: Map<string, Response>;

private constructor() {
this.connections = new Map();
}

public static getInstance(): SSEManager {
if (!SSEManager.instance) {
SSEManager.instance = new SSEManager();
}
return SSEManager.instance;
}

public addConnection(commitSha: string, res: Response): void {
// Check if connection already exists
if (this.connections.has(commitSha)) {
logger.warn(`Connection already exists for commit: ${commitSha}`);
return;
}

// Set headers only once
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");

// Store the connection
this.connections.set(commitSha, res);

// Send initial message
this.sendMessage(commitSha, "Connected to test runner...");

res.on("close", () => {
this.removeConnection(commitSha);
logger.info(`SSE connection closed for commit: ${commitSha}`);
});
}

public sendMessage(commitSha: string, message: string): void {
const connection = this.connections.get(commitSha);
if (connection) {
try {
connection.write(`data: ${JSON.stringify({ message })}\n\n`);
logger.debug(`Sent message for ${commitSha}: ${message}`);
} catch (error) {
logger.error(`Error sending message for ${commitSha}:`, error);
this.removeConnection(commitSha);
}
} else {
logger.debug(`No connection found for ${commitSha}`);
}
}

public removeConnection(commitSha: string): void {
this.connections.delete(commitSha);
logger.debug(`Removed connection for ${commitSha}`);
}

public closeConnection(commitSha: string): void {
const connection = this.connections.get(commitSha);
if (connection) {
try {
this.sendMessage(commitSha, "Test run completed");
connection.end();
this.removeConnection(commitSha);
logger.info(`Closed connection for ${commitSha}`);
} catch (error) {
logger.error(`Error closing connection for ${commitSha}:`, error);
this.removeConnection(commitSha);
}
}
}
}
43 changes: 43 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@
dependencies:
"@types/node" "*"

"@types/eventsource@^1.1.15":
version "1.1.15"
resolved "https://registry.yarnpkg.com/@types/eventsource/-/eventsource-1.1.15.tgz#949383d3482e20557cbecbf3b038368d94b6be27"
integrity sha512-XQmGcbnxUNa06HR3VBVkc9+A2Vpi9ZyLJcdS5dwaQQ/4ZMWFO+5c90FnMUpbtMZwB/FChoYHwuVg8TvkECacTA==

"@types/express-serve-static-core@^4.17.33":
version "4.19.5"
resolved "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-4.19.5.tgz"
Expand Down Expand Up @@ -137,6 +142,14 @@
resolved "https://registry.npmjs.org/@types/mime/-/mime-1.3.5.tgz"
integrity sha512-/pyBZWSLD2n0dcHE3hq8s8ZvcETHtEuF+3E7XVt0Ig2nvsVQXdghHVcEkIWjy9A0wKfTn97a/PSDYohKIlnP/w==

"@types/node-fetch@2":
version "2.6.12"
resolved "https://registry.yarnpkg.com/@types/node-fetch/-/node-fetch-2.6.12.tgz#8ab5c3ef8330f13100a7479e2cd56d3386830a03"
integrity sha512-8nneRWKCg3rMtF69nLQJnOYUcbafYeFSjqkw3jCRLsqkWFlHaoQrr5mXmofFGOx3DKn7UfmBMyov8ySvLRVldA==
dependencies:
"@types/node" "*"
form-data "^4.0.0"

"@types/node@*", "@types/node@^22.5.4":
version "22.5.4"
resolved "https://registry.npmjs.org/@types/node/-/node-22.5.4.tgz"
Expand Down Expand Up @@ -540,6 +553,11 @@ event-stream@=3.3.4:
stream-combiner "~0.0.4"
through "~2.3.1"

eventsource@^2.0.2:
version "2.0.2"
resolved "https://registry.yarnpkg.com/eventsource/-/eventsource-2.0.2.tgz#76dfcc02930fb2ff339520b6d290da573a9e8508"
integrity sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==

express@^4.20.0:
version "4.20.0"
resolved "https://registry.npmjs.org/express/-/express-4.20.0.tgz"
Expand Down Expand Up @@ -878,6 +896,13 @@ node-cleanup@^2.1.2:
resolved "https://registry.npmjs.org/node-cleanup/-/node-cleanup-2.1.2.tgz"
integrity sha512-qN8v/s2PAJwGUtr1/hYTpNKlD6Y9rc4p8KSmJXyGdYGZsDGKXrGThikLFP9OCHFeLeEpQzPwiAtdIvBLqm//Hw==

node-fetch@2:
version "2.7.0"
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.7.0.tgz#d0f0fa6e3e2dc1d27efcd8ad99d550bda94d187d"
integrity sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==
dependencies:
whatwg-url "^5.0.0"

[email protected]:
version "0.0.0"
resolved "https://registry.yarnpkg.com/noms/-/noms-0.0.0.tgz#da8ebd9f3af9d6760919b27d9cdc8092a7332859"
Expand Down Expand Up @@ -1251,6 +1276,11 @@ [email protected]:
resolved "https://registry.npmjs.org/toidentifier/-/toidentifier-1.0.1.tgz"
integrity sha512-o5sSPKEkg/DIQNmH43V0/uerLrpzVedkUh8tGNvaeXpfpuwjKenlSox/2O/BTlZUtEe+JG7s5YhEz608PlAHRA==

tr46@~0.0.3:
version "0.0.3"
resolved "https://registry.yarnpkg.com/tr46/-/tr46-0.0.3.tgz#8184fd347dac9cdc185992f3a6622e14b9d9ab6a"
integrity sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==

triple-beam@^1.3.0:
version "1.4.1"
resolved "https://registry.yarnpkg.com/triple-beam/-/triple-beam-1.4.1.tgz#6fde70271dc6e5d73ca0c3b24e2d92afb7441984"
Expand Down Expand Up @@ -1346,6 +1376,19 @@ vary@~1.1.2:
resolved "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz"
integrity sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==

webidl-conversions@^3.0.0:
version "3.0.1"
resolved "https://registry.yarnpkg.com/webidl-conversions/-/webidl-conversions-3.0.1.tgz#24534275e2a7bc6be7bc86611cc16ae0a5654871"
integrity sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==

whatwg-url@^5.0.0:
version "5.0.0"
resolved "https://registry.yarnpkg.com/whatwg-url/-/whatwg-url-5.0.0.tgz#966454e8765462e37644d3626f6742ce8b70965d"
integrity sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==
dependencies:
tr46 "~0.0.3"
webidl-conversions "^3.0.0"

which@^2.0.1:
version "2.0.2"
resolved "https://registry.npmjs.org/which/-/which-2.0.2.tgz"
Expand Down

0 comments on commit a5cd750

Please sign in to comment.