Skip to content

Commit

Permalink
Merge pull request #306 from platinouss/fix/240229-media-server-memor…
Browse files Browse the repository at this point in the history
…y-leak

Fix(#302): 미디어 서버 메모리 누수 해결
  • Loading branch information
tmddus2 authored Mar 5, 2024
2 parents 9af87ba + bf62e4c commit ccd291b
Show file tree
Hide file tree
Showing 17 changed files with 271 additions and 148 deletions.
51 changes: 43 additions & 8 deletions mediaServer/src/RelayServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import { ClientConnectionInfo } from './models/ClientConnectionInfo';

export class RelayServer {
private readonly _io;
private readonly _roomsConnectionInfo: Map<string, RoomConnectionInfo>;
private readonly _clientsConnectionInfo: Map<string, ClientConnectionInfo>;
private readonly _roomConnectionInfoList: Map<string, RoomConnectionInfo>;
private readonly _clientConnectionInfoList: Map<string, ClientConnectionInfo>;
private readonly _scheduledEndLectureList: Map<string, number>;

constructor(port: number) {
this._roomsConnectionInfo = new Map();
this._clientsConnectionInfo = new Map();
this._roomConnectionInfoList = new Map();
this._clientConnectionInfoList = new Map();
this._scheduledEndLectureList = new Map();
this._io = new Server(port, {
cors: {
// TODO: 특정 URL만 origin 하도록 수정 필요
Expand All @@ -23,15 +25,48 @@ export class RelayServer {
return this._io;
}

get roomsConnectionInfo() {
return this._roomsConnectionInfo;
get roomConnectionInfoList() {
return this._roomConnectionInfoList;
}

get clientsConnectionInfo() {
return this._clientsConnectionInfo;
get clientConnectionInfoList() {
return this._clientConnectionInfoList;
}

get scheduledEndLectureList() {
return this._scheduledEndLectureList;
}

listen = (path: string, event: string, method: (socket: Socket) => void) => {
this._io.of(path).on(event, method);
};

deleteRoom = (presenterEmail: string, roomId: string) => {
const roomConnectionInfo = this._roomConnectionInfoList.get(roomId);
if (!roomConnectionInfo) {
console.log('존재하지 않는 방입니다.');
return;
}
roomConnectionInfo.closeParticipantConnection(roomId);
this._roomConnectionInfoList.delete(roomId);
const presenterConnectionInfo = this._clientConnectionInfoList.get(presenterEmail);
if (!presenterConnectionInfo) {
console.log('존재하지 않는 발표자입니다.');
return;
}
presenterConnectionInfo.disconnectWebRTCConnection();
presenterConnectionInfo.disconnectSocket(presenterEmail, roomId);
this._clientConnectionInfoList.delete(presenterEmail);
this._scheduledEndLectureList.delete(roomId);
};

clearScheduledEndLecture = (roomId: string) => {
const timerId = this._scheduledEndLectureList.get(roomId);
if (!timerId) {
console.log('이미 종료됐거나 열리지 않은 강의실입니다.');
return;
}
clearTimeout(timerId);
this._scheduledEndLectureList.delete(roomId);
};
}
3 changes: 3 additions & 0 deletions mediaServer/src/config/lecture.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const RECONNECT_TIMEOUT = 1000 * 60 * 5;

export { RECONNECT_TIMEOUT };
5 changes: 5 additions & 0 deletions mediaServer/src/constants/media-converter.constant.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import path from 'path';

const AUDIO_OUTPUT_DIR = path.join(process.cwd(), 'output');

export { AUDIO_OUTPUT_DIR };
9 changes: 5 additions & 4 deletions mediaServer/src/listeners/client.listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { findRoomInfoById } from '../repositories/room.repository';
import { ClientConnectionInfo } from '../models/ClientConnectionInfo';
import { relayServer } from '../main';
import { getEmailByJwtPayload } from '../utils/auth';
import { isNotEqualPresenterEmail } from '../validation/client.validation';
import { isNotEqualPresenterEmail, isReconnectPresenter } from '../validation/client.validation';
import { RTCPeerConnection } from 'wrtc';
import { pc_config } from '../config/pc.config';
import { isReconnectPresenter, sendPrevLectureData, setPresenterConnection } from '../services/presenter.service';
import { sendPrevLectureData, setPresenterConnection } from '../services/presenter.service';
import { setParticipantWebRTCConnection, setPresenterWebRTCConnection } from '../services/webrtc-connection.service';
import { hasCurrentBoardDataInLecture } from '../validation/lecture.validation';

Expand All @@ -24,8 +24,9 @@ export class ClientListener {
return;
}
socket.join(email);
relayServer.clientsConnectionInfo.set(email, new ClientConnectionInfo(RTCPC, socket));
relayServer.clientConnectionInfoList.set(email, new ClientConnectionInfo(RTCPC, socket));
if (isReconnectPresenter(roomInfo.presenterEmail, email)) {
relayServer.clearScheduledEndLecture(data.roomId);
await sendPrevLectureData(data.roomId, email, roomInfo);
} else {
await setPresenterConnection(data.roomId, email, RTCPC, data.whiteboard);
Expand All @@ -44,7 +45,7 @@ export class ClientListener {
const RTCPC = new RTCPeerConnection(pc_config);
socket.join(clientId);
await saveClientInfo(clientId, clientType, data.roomId);
relayServer.clientsConnectionInfo.set(clientId, new ClientConnectionInfo(RTCPC, socket));
relayServer.clientConnectionInfoList.set(clientId, new ClientConnectionInfo(RTCPC, socket));
const roomInfo = await findRoomInfoById(data.roomId);
if (!hasCurrentBoardDataInLecture(roomInfo.currentWhiteboardData)) {
return;
Expand Down
73 changes: 63 additions & 10 deletions mediaServer/src/listeners/lecture.listener.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
import { Socket } from 'socket.io';
import { askQuestion, enterAsGuest, getClientEmail, leaveRoom } from '../services/participant.service';
import { askQuestion, getClientEmail, leaveRoom } from '../services/participant.service';
import { findClientInfoByEmail } from '../repositories/client.repsitory';
import { findRoomInfoById } from '../repositories/room.repository';
import { editWhiteboard, endLecture } from '../services/presenter.service';
import { updateQuestionStatus } from '../repositories/question-repository';
import { relayServer } from '../main';
import { startLecture } from '../services/lecture.service';
import { isParticipant, isParticipatingClient, isPresenter } from '../validation/client.validation';
import { scheduleEndLecture, startLecture } from '../services/lecture.service';
import { isGuest, isParticipant, isParticipatingClient, isPresenter } from '../validation/client.validation';
import { ClientConnectionInfo } from '../models/ClientConnectionInfo';
import { RoomConnectionInfo } from '../models/RoomConnectionInfo';
import { canEnterLecture } from '../validation/lecture.validation';
import { canEnterLecture, isLectureOngoing } from '../validation/lecture.validation';
import { mediaConverter } from '../utils/media-converter';

export class LectureListener {
// TODO: 클라이언트는 한 개의 방만 접속할 수 있는지? 만약 그렇다면, 이미 참여 중인 빙이 있을 때 요청 거부하도록 처리해야 함
lecture = async (socket: Socket) => {
enterLecture = async (socket: Socket) => {
const email = getClientEmail(socket);
if (!email) {
await enterAsGuest(socket);
await this.enterLectureAsGuest(socket);
return;
}
const clientInfo = await findClientInfoByEmail(email);
const clientConnectionInfo = (await relayServer.clientsConnectionInfo.get(email)) as ClientConnectionInfo;
if (!isParticipatingClient(clientInfo, clientConnectionInfo, clientInfo.roomId)) {
const clientConnectionInfo = (await relayServer.clientConnectionInfoList.get(email)) as ClientConnectionInfo;
if (!isParticipatingClient(email, clientInfo, clientInfo.roomId)) {
return;
}
const roomInfo = await findRoomInfoById(clientInfo.roomId);
const roomConnectionInfo = relayServer.roomsConnectionInfo.get(clientInfo.roomId) as RoomConnectionInfo;
const roomConnectionInfo = relayServer.roomConnectionInfoList.get(clientInfo.roomId) as RoomConnectionInfo;
if (!canEnterLecture(roomConnectionInfo)) {
return;
}
Expand Down Expand Up @@ -69,7 +70,59 @@ export class LectureListener {
if (isParticipant(clientInfo.type, clientInfo.roomId, data.roomId)) {
return;
}
leaveRoom(clientInfo.roomId, clientConnectionInfo);
leaveRoom(clientInfo.roomId, email);
});

socket.on('disconnect', (reason) => {
if (isPresenter(clientInfo.type, clientInfo.roomId, clientInfo.roomId) && isLectureOngoing(reason)) {
scheduleEndLecture(clientInfo.roomId, email);
const presenterStreamInfo = mediaConverter.getPresenterStreamInfo(clientInfo.roomId);
if (!presenterStreamInfo) {
console.log('존재하지 않는 강의실입니다.');
return;
}
presenterStreamInfo.pauseRecording();
}
if (isParticipant(clientInfo.type, clientInfo.roomId, clientInfo.roomId)) {
leaveRoom(clientInfo.roomId, email);
}
});
};

enterLectureAsGuest = async (socket: Socket) => {
const clientId = socket.handshake.auth.accessToken;
if (!clientId) {
console.log('잘못 된 접근입니다.');
return;
}
const clientInfo = await findClientInfoByEmail(clientId);
const roomInfo = await findRoomInfoById(clientInfo.roomId);
const roomConnectionInfo = relayServer.roomConnectionInfoList.get(clientInfo.roomId) as RoomConnectionInfo;
if (!isParticipatingClient(clientId, clientInfo, clientInfo.roomId)) {
return;
}
if (!canEnterLecture(roomConnectionInfo)) {
return;
}
socket.join(clientInfo.roomId);
roomConnectionInfo.participantIdList.add(clientId);

socket.on('ask', async (data) => {
if (!isGuest(clientInfo.type, clientInfo.roomId, data.roomId)) {
return;
}
await askQuestion(roomInfo.presenterEmail, data.roomId, data.content);
});

socket.on('leave', (data) => {
if (isParticipant(clientInfo.type, clientInfo.roomId, data.roomId)) {
return;
}
leaveRoom(clientInfo.roomId, clientId);
});

socket.on('disconnect', () => {
leaveRoom(clientInfo.roomId, clientId);
});
};
}
2 changes: 1 addition & 1 deletion mediaServer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ const lectureListener = new LectureListener();

relayServer.listen('/create-room', 'connection', clientListener.createRoom);
relayServer.listen('/enter-room', 'connection', clientListener.enterRoom);
relayServer.listen('/lecture', 'connection', lectureListener.lecture);
relayServer.listen('/lecture', 'connection', lectureListener.enterLecture);

export { relayServer };
23 changes: 11 additions & 12 deletions mediaServer/src/models/ClientConnectionInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,18 @@ export class ClientConnectionInfo {
return this._RTCPC;
}

get enterSocket(): Socket | null {
return this._enterSocket;
}

set enterSocket(socket: Socket) {
this._enterSocket = socket;
}

get lectureSocket(): Socket | null {
return this._lectureSocket;
}

set lectureSocket(socket: Socket) {
this._lectureSocket = socket;
}

disconnectWebRTCConnection = () => {
this._RTCPC.close();
};

disconnectSocket = (clientId: string, roomId: string) => {
this._enterSocket?.leave(clientId);
this._enterSocket?.disconnect();
this._lectureSocket?.leave(roomId);
this._lectureSocket?.disconnect();
};
}
15 changes: 10 additions & 5 deletions mediaServer/src/models/PeerStreamInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ interface MediaFileNameList {
}

export class PeerStreamInfo {
private readonly _audioSink: RTCAudioSink;
private readonly _mediaFileNameList: MediaFileNameList;
private readonly _audio: PassThrough;
private _audioSink: RTCAudioSink;
private _recordEnd: boolean;
private _proc: FfmpegCommand | null;

Expand Down Expand Up @@ -38,10 +38,6 @@ export class PeerStreamInfo {
this._recordEnd = isRecordEnd;
}

get proc(): FfmpegCommand | null {
return this._proc;
}

set proc(FfmpegCommand: FfmpegCommand) {
this._proc = FfmpegCommand;
}
Expand All @@ -53,7 +49,16 @@ export class PeerStreamInfo {
};
};

replaceAudioSink = (audioSink: RTCAudioSink) => {
this._audioSink = audioSink;
};

pauseRecording = () => {
this._audioSink.stop();
};

stopRecording = () => {
this._recordEnd = true;
this._audioSink.stop();
this._audio.end();
};
Expand Down
35 changes: 19 additions & 16 deletions mediaServer/src/models/RoomConnectionInfo.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
import { RTCPeerConnection } from 'wrtc';
import { Socket } from 'socket.io';
import { ClientConnectionInfo } from './ClientConnectionInfo';
import { relayServer } from '../main';

export class RoomConnectionInfo {
private _presenterSocket: Socket | null;
private readonly _presenterRTCPC: RTCPeerConnection;
private readonly _studentInfoList: Set<ClientConnectionInfo>;
private readonly _participantIdList: Set<string>;
private _stream: MediaStream | null;

constructor(RTCPC: RTCPeerConnection) {
this._presenterSocket = null;
this._presenterRTCPC = RTCPC;
this._studentInfoList = new Set();
this._participantIdList = new Set();
this._stream = null;
}

set presenterSocket(socket: Socket) {
this._presenterSocket = socket;
}

get studentInfoList(): Set<ClientConnectionInfo> {
return this._studentInfoList;
get participantIdList(): Set<string> {
return this._participantIdList;
}

set stream(presenterStream: MediaStream) {
Expand All @@ -31,19 +31,22 @@ export class RoomConnectionInfo {
return this._stream;
}

endLecture = (roomId: string) => {
this._presenterRTCPC.close();
this._studentInfoList.forEach((studentInfo: ClientConnectionInfo) => {
studentInfo.enterSocket?.leave(roomId);
studentInfo.enterSocket?.disconnect();
studentInfo.lectureSocket?.leave(roomId);
studentInfo.lectureSocket?.disconnect();
studentInfo.RTCPC?.close();
closeParticipantConnection = (roomId: string) => {
this._participantIdList.forEach((participantId: string) => {
const participantConnectionInfo = relayServer.clientConnectionInfoList.get(participantId);
if (participantConnectionInfo) {
participantConnectionInfo.disconnectSocket(participantId, roomId);
participantConnectionInfo.disconnectWebRTCConnection();
relayServer.clientConnectionInfoList.delete(participantId);
}
});
};

exitRoom = (clientInfo: ClientConnectionInfo, roomId: string) => {
clientInfo.lectureSocket?.leave(roomId);
this._studentInfoList.delete(clientInfo);
exitRoom = (participantId: string, roomId: string) => {
const participantConnectionInfo = relayServer.clientConnectionInfoList.get(participantId);
if (participantConnectionInfo) {
participantConnectionInfo.disconnectSocket(participantId, roomId);
this._participantIdList.delete(participantId);
}
};
}
20 changes: 18 additions & 2 deletions mediaServer/src/services/lecture.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import { ClientType } from '../constants/client-type.constant';
import { Socket } from 'socket.io';
import { ClientConnectionInfo } from '../models/ClientConnectionInfo';
import { RoomConnectionInfo } from '../models/RoomConnectionInfo';
import { relayServer } from '../main';
import { RECONNECT_TIMEOUT } from '../config/lecture.config';
import { endLecture } from './presenter.service';

const startLecture = (
email: string,
Expand All @@ -17,12 +20,25 @@ const startLecture = (
socket.join(email);
}
if (clientInfo.type === ClientType.STUDENT) {
roomConnectionInfo.studentInfoList.add(clientConnectionInfo);
roomConnectionInfo.participantIdList.add(email);
fetch((process.env.SERVER_API_URL + '/lecture/' + clientInfo.roomId) as string, {
method: 'PATCH',
headers: { Authorization: socket.handshake.auth.accessToken }
}).then((response) => console.log('강의 시작:' + response.status));
}
};

export { startLecture };
const scheduleEndLecture = (roomId: string, presenterId: string) => {
const timerId = setTimeout(
async () => {
await endLecture(roomId, presenterId);
relayServer.scheduledEndLectureList.delete(roomId);
},
RECONNECT_TIMEOUT,
roomId,
presenterId
);
relayServer.scheduledEndLectureList.set(roomId, timerId);
};

export { startLecture, scheduleEndLecture };
Loading

0 comments on commit ccd291b

Please sign in to comment.