Skip to content

Commit

Permalink
Update TunnelService.ts
Browse files Browse the repository at this point in the history
  • Loading branch information
DFanso committed Dec 8, 2024
1 parent 14ee237 commit 6c2ec98
Showing 1 changed file with 95 additions and 16 deletions.
111 changes: 95 additions & 16 deletions src/services/TunnelService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { EventEmitter } from 'events';
import * as net from 'net';
import * as tls from 'tls';
import WebSocket from 'ws';
import { v4 as uuidv4 } from 'uuid';
import { logger } from '../utils/logger';
Expand Down Expand Up @@ -174,32 +175,46 @@ export class TunnelService extends EventEmitter {
}

try {
// In a VM setup, we need to use WebSocket to forward the request to the client
// Handle WebSocket upgrade requests
if (req.headers.upgrade && req.headers.upgrade.toLowerCase() === 'websocket') {
logger.info(`Handling WebSocket upgrade request for ${subdomain}`);
return this.handleWebSocketUpgrade(req, res, tunnelConfig);
}

// Get request body as Buffer to handle binary data
const body = await this.getRequestBody(req);

// Prepare the message for the client
const message = {
type: 'request',
method: req.method,
path: req.url,
headers: req.headers,
body: await this.getRequestBody(req)
headers: {
...req.headers,
'x-forwarded-proto': req.socket instanceof tls.TLSSocket ? 'https' : 'http',
'x-forwarded-for': req.socket.remoteAddress || '',
'x-real-ip': req.socket.remoteAddress || ''
},
body: body.toString('base64'), // Send binary data as base64
isBase64Encoded: true
};

logger.info(`Forwarding request via WebSocket`, {
subdomain,
method: req.method,
url: req.url,
targetPort: tunnelConfig.targetPort
targetPort: tunnelConfig.targetPort,
contentLength: body.length
});

// Send the request to the client via WebSocket
tunnelConfig.ws.send(JSON.stringify(message));

// Wait for the response from the client
const response = await this.waitForResponse(tunnelConfig.ws);

// Forward the response back to the original requester
res.writeHead(response.statusCode, response.headers);
res.end(response.body);

// Handle streaming responses
const responseStream = await this.waitForResponseStream(tunnelConfig.ws, res);
if (!responseStream) {
throw new Error('Failed to establish response stream');
}
} catch (error) {
logger.error('Error in proxyRequest', { error, subdomain });
res.statusCode = 502;
Expand All @@ -210,19 +225,83 @@ export class TunnelService extends EventEmitter {
}
}

private getRequestBody(req: IncomingMessage): Promise<string> {
private getRequestBody(req: IncomingMessage): Promise<Buffer> {
return new Promise((resolve) => {
let body = '';
const chunks: Buffer[] = [];
req.on('data', chunk => {
body += chunk.toString();
chunks.push(chunk);
});
req.on('end', () => {
resolve(body);
resolve(Buffer.concat(chunks));
});
});
}

private waitForResponse(ws: WebSocket): Promise<any> {
private async handleWebSocketUpgrade(req: IncomingMessage, res: ServerResponse, tunnelConfig: TunnelConfig): Promise<void> {
const message = {
type: 'upgrade',
path: req.url,
headers: req.headers
};

// Send upgrade request to client
tunnelConfig.ws.send(JSON.stringify(message));

// Wait for client to confirm upgrade
const response = await this.waitForResponse(tunnelConfig.ws);
if (response.type === 'upgrade-success') {
// Perform WebSocket upgrade
const upgradeHeader = {
'Upgrade': 'websocket',
'Connection': 'Upgrade',
'Sec-WebSocket-Accept': response.acceptKey
};
res.writeHead(101, upgradeHeader);
res.end();
} else {
res.writeHead(400);
res.end('WebSocket upgrade failed');
}
}

private waitForResponseStream(ws: WebSocket, res: ServerResponse): Promise<boolean> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Response timeout'));
}, 30000);

const messageHandler = (data: WebSocket.Data) => {
try {
const response = JSON.parse(data.toString());

if (response.type === 'response-start') {
// Initial response with status and headers
res.writeHead(response.statusCode, response.headers);
if (!response.isStreaming) {
resolve(true);
}
} else if (response.type === 'response-chunk') {
// Handle streaming chunk
const chunk = Buffer.from(response.data, 'base64');
res.write(chunk);
} else if (response.type === 'response-end') {
// End of response
clearTimeout(timeout);
ws.removeListener('message', messageHandler);
res.end();
resolve(true);
}
} catch (error) {
logger.error('Error parsing response chunk', { error });
reject(error);
}
};

ws.on('message', messageHandler);
});
}

private async waitForResponse(ws: WebSocket): Promise<any> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Response timeout'));
Expand Down

0 comments on commit 6c2ec98

Please sign in to comment.