Skip to content

Commit

Permalink
fix: ws 발송 로직 리펙토링
Browse files Browse the repository at this point in the history
  • Loading branch information
DongGeon0908 committed Jan 8, 2025
1 parent 893be7f commit 887c671
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ class CheerUpFacade(
logger.error(e) { "fail to create cheerUp" }
}.onSuccess { createdCheerUp ->
reactiveGroupUserWebSocketHandler.launchSendEventByCheerUp(
uid = createdCheerUp.targetUid,
actorUid = user.uid,
targetUid = createdCheerUp.targetUid,
groupId = otherGroupUser.groupId,
senderUid = user.uid,
)
}.getOrNull()?.targetUid
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class GroupFacade(
private fun sendEventWithDelay(groupUserScore: GroupUserScore) {
CoroutineScope(Dispatchers.IO + Job()).launch {
delay(3000)
wsHandler.launchSendEvent(groupUserScore.uid, groupUserScore.groupId)
wsHandler.launchSendStatusUpdateEvent(groupUserScore.uid)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap

@Component
class ReactiveGroupUserWebSocketHandler(
Expand All @@ -29,35 +28,48 @@ class ReactiveGroupUserWebSocketHandler(

/**
* redis는 현 상태에서 사용하지 않는다. 현재 스펙상 오버엔지니어링
* - session 정보들을 local에 캐싱해두어 사용.
* - key : groupdId
* - value
* - key : uid
* - value : WebSocketSession
*/
private val groupUserByMap: ConcurrentMap<Long, ConcurrentMap<Long, WebSocketSession>> = ConcurrentHashMap()
private val groupUserByGroupId: MutableMap<Long, MutableMap<Long, WebSocketSession>> = mutableMapOf()

override fun handle(session: WebSocketSession): Mono<Void> {
return mono {
/** uri 정보를 파싱 */
val uriContext = GroupUserUriContext.from(session)

/** uri에 들어있는 Token 정보로, ws로 요청이 들어온 user를 조회 */
val user = authFacade.resolveAuthUser(uriContext.token)

/** user가 속해 있는 그룹 정보를 전체 조회 */
val groupUsers = groupUserService.findAllByUid(user.uid)

groupUsers.forEach { groupUser ->
val targetGroupUser = groupUserByMap[groupUser.groupId] ?: ConcurrentHashMap()
/** 현재 접속중인 그룹 유저들의 정보 */
val groupUsersByUid = groupUserByGroupId[groupUser.groupId] ?: ConcurrentHashMap()

targetGroupUser[groupUser.uid] = session
/** ws를 요청한 사용자의 Session 정볼흘 업데이트 */
groupUsersByUid[groupUser.uid] = session

groupUserByMap[groupUser.groupId] = targetGroupUser
/** local 정보 최신화 */
groupUserByGroupId[groupUser.groupId] = groupUsersByUid
}

groupUserByMap.forEach { (groupId, sessionByUid) ->
groupUserByGroupId.forEach { (groupId, sessionByUid) ->
/** ws 요청 사용자의 세션 */
sessionByUid[user.uid] ?: return@forEach

launchSendEvent(user.uid, groupId, sessionByUid)
/** 홰당 사용자의 session에 소켓 메세지 발송 및 같은 그룹원에게 소켓 메세지 발송 */
launchSendConnectEvent(
groupId = groupId,
sessionByUid = sessionByUid
)
}

/** ws에 대한 ping-pong 및 종료 처리 로직 */
session.receive()
.map { message -> message.payloadAsText }
.flatMap { payload -> checkPingPong(payload, session) }
Expand Down Expand Up @@ -85,97 +97,109 @@ class ReactiveGroupUserWebSocketHandler(
}

private fun handleSessionTermination(uid: Long) {
groupUserByMap.forEach { (groupId, uidBySession) ->
groupUserByGroupId.forEach { (groupId, uidBySession) ->
val removedUser = uidBySession.remove(uid)

if (removedUser != null) {
logger.info { "Removed session for user $uid from group $groupId" }

when (uidBySession.isEmpty()) {
true -> {
groupUserByMap.remove(groupId)
groupUserByGroupId.remove(groupId)
logger.info { "Removed group $groupId as it has no more users." }
}

false -> {
launchSendEvent(uid, groupId, uidBySession)
launchSendConnectEvent(groupId, uidBySession)
}
}
}
}
}

fun launchSendEvent(uid: Long, groupId: Long) {
groupUserByMap[groupId]?.let { groupUsers ->
launchSendEvent(uid, groupId, groupUsers)
fun forceCloseAllWebSocketSessions() {
groupUserByGroupId.forEach { (_, session) ->
session.forEach { (_, session) ->
session.close().subscribe()
}
}
}

fun launchSendEventByCheerUp(uid: Long, groupId: Long, senderUid: Long) {
groupUserByMap[groupId]?.let { groupUsers ->
CoroutineScope(Dispatchers.IO + Job()).launch {
val eventMessage = groupUsers.keys.toList().let { uids ->
groupUserWsFacade.generateEventMessage(
uid = uid,
groupId = groupId,
uids = uids,
cheerUpSenderUid = senderUid
)
}
/** Websocket Session Release */
groupUserByGroupId.clear()
}

groupUsers[uid]?.let { session ->
session
.send(Mono.just(session.textMessage(eventMessage.message())))
.subscribe()
}
}
fun getWsGroupUsers(): List<WsGroupUserModel> {
return groupUserByGroupId.map { (groupId, groupUsers) ->
WsGroupUserModel(groupId, groupUsers.keys)
}
}

/** 발송되는 순서가 중요하지 않다. */
private fun launchSendEvent(
uid: Long,
private fun launchSendConnectEvent(
groupId: Long,
sessionByUid: ConcurrentMap<Long, WebSocketSession>
sessionByUid: MutableMap<Long, WebSocketSession>,
) {
CoroutineScope(Dispatchers.IO + Job()).launch {
sendUpdatedGroupStatus(uid, groupId, sessionByUid)
sendConnectEvent(
groupId = groupId,
sessionByUid = sessionByUid
)
}
}

private suspend fun sendUpdatedGroupStatus(
uid: Long,
private suspend fun sendConnectEvent(
groupId: Long,
sessionByUid: MutableMap<Long, WebSocketSession>
) {
val eventMessage = sessionByUid.keys
.toList()
.let { uids -> groupUserWsFacade.generateEventMessage(uid, groupId, uids) }
sessionByUid.forEach { (uid, session) ->
val eventMessage = groupUserWsFacade.generateEventMessage(
uid = uid,
groupId = groupId,
spreadUids = sessionByUid.keys.toList()
)

sessionByUid.forEach { (_, session) ->
session
.send(Mono.just(session.textMessage(eventMessage.message())))
.subscribe()
}
}

fun forceCloseAllWebSocketSessions() {
groupUserByMap.forEach { (_, session) ->
session.forEach { (_, session) ->
fun launchSendStatusUpdateEvent(groupId: Long) {
val sessions = groupUserByGroupId[groupId] ?: return

launchSendConnectEvent(groupId, sessions)
}

/** 응원을 보낸 사람과 받은 사람에게 WS 알림 진행 */
fun launchSendEventByCheerUp(actorUid: Long, targetUid: Long, groupId: Long) {
CoroutineScope(Dispatchers.IO + Job()).launch {
/** 응원하기를 누른 action 대상자 */
groupUserByGroupId[groupId]?.get(actorUid)?.let { session ->
val eventMessage = groupUserWsFacade.generateEventMessage(
uid = actorUid,
groupId = groupId,
spreadUids = listOf(actorUid, targetUid),
cheerUpSenderUid = actorUid,
)

session
.close()
.send(Mono.just(session.textMessage(eventMessage.message())))
.subscribe()
}
}

/** Websocket Session Release */
groupUserByMap.clear()
}
/** 응원하기를 받은 대상자 */
groupUserByGroupId[groupId]?.get(targetUid)?.let { session ->
val eventMessage = groupUserWsFacade.generateEventMessage(
uid = targetUid,
groupId = groupId,
spreadUids = listOf(actorUid, targetUid),
cheerUpSenderUid = actorUid,
)

fun getWsGroupUsers(): List<WsGroupUserModel> {
return groupUserByMap.map {
WsGroupUserModel(it.key, it.value.keys)
session
.send(Mono.just(session.textMessage(eventMessage.message())))
.subscribe()
}
}
}
}

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.hero.alignlab.ws.model

import com.hero.alignlab.common.extension.mapper
import com.hero.alignlab.domain.cheer.domain.CheerUp
import com.hero.alignlab.domain.group.domain.GroupUser
import com.hero.alignlab.domain.group.domain.GroupUserScore
import com.hero.alignlab.domain.user.domain.UserInfo
Expand Down Expand Up @@ -29,9 +28,9 @@ data class GroupUserEventMessage(
/** 나에게 응원하기를 보낸 사용자의 uid */
val senderUid: Long?,
/** 금일 받은 응원하기 수 */
val countCheeredUp: Long?,
val countCheeredUp: Long,
/** 내가 응원을 보낸 사용자 목록 */
val sentUids: List<Long>?,
val sentUids: List<Long>,
)

companion object {
Expand All @@ -43,7 +42,7 @@ data class GroupUserEventMessage(
scoreByUid: Map<Long, GroupUserScore>,
cheerUpSenderUid: Long?,
countCheeredUp: Long,
cheerUpsByTargetUid: Map<Long, List<CheerUp>>,
cheerUpsByTargetUid: List<Long>,
): GroupUserEventMessage {
val rank = AtomicInteger(1)

Expand All @@ -70,7 +69,7 @@ data class GroupUserEventMessage(
cheerUp = CheerUpModel(
senderUid = cheerUpSenderUid,
countCheeredUp = countCheeredUp,
sentUids = cheerUpsByTargetUid[uid]?.map { cheerUp -> cheerUp.targetUid }
sentUids = cheerUpsByTargetUid,
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ class GroupUserWsFacade(
suspend fun generateEventMessage(
uid: Long,
groupId: Long,
uids: List<Long>,
spreadUids: List<Long>,
cheerUpSenderUid: Long? = null,
): GroupUserEventMessage {
val now = LocalDate.now()

return parZip(
{ userInfoService.findAllByIds(uids) },
{ groupUserService.findAllByGroupIdAndUids(groupId, uids) },
{ groupUserScoreService.findAllByGroupIdAndUids(groupId, uids) },
{ userInfoService.findAllByIds(spreadUids) },
{ groupUserService.findAllByGroupIdAndUids(groupId, spreadUids) },
{ groupUserScoreService.findAllByGroupIdAndUids(groupId, spreadUids) },
{ cheerUpService.countAllByCheeredAtAndUid(now, uid) },
{ cheerUpService.findAllByTargetUidInAndCheeredAt(uids.toSet(), now) }
{ cheerUpService.findAllByUidAndCheeredAt(uid, now) },
) { userInfoByUid, groupUsers, groupUserScores, countCheeredUp, cheerUps ->
GroupUserEventMessage.of(
uid = uid,
Expand All @@ -39,7 +39,7 @@ class GroupUserWsFacade(
scoreByUid = groupUserScores.associateBy { score -> score.uid },
cheerUpSenderUid = cheerUpSenderUid,
countCheeredUp = countCheeredUp,
cheerUpsByTargetUid = cheerUps.groupBy { cheerUp -> cheerUp.targetUid },
cheerUpsByTargetUid = cheerUps.map { cheerUp -> cheerUp.targetUid }
)
}
}
Expand Down

0 comments on commit 887c671

Please sign in to comment.