diff --git a/src/main/kotlin/com/hero/alignlab/ws/handler/ReactiveGroupUserWebSocketHandler.kt b/src/main/kotlin/com/hero/alignlab/ws/handler/ReactiveGroupUserWebSocketHandler.kt index d165221..446a4eb 100644 --- a/src/main/kotlin/com/hero/alignlab/ws/handler/ReactiveGroupUserWebSocketHandler.kt +++ b/src/main/kotlin/com/hero/alignlab/ws/handler/ReactiveGroupUserWebSocketHandler.kt @@ -9,6 +9,8 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.launch +import kotlinx.coroutines.reactor.awaitSingleOrNull +import kotlinx.coroutines.reactor.mono import org.springframework.stereotype.Component import org.springframework.web.reactive.socket.WebSocketHandler import org.springframework.web.reactive.socket.WebSocketMessage @@ -35,34 +37,35 @@ class ReactiveGroupUserWebSocketHandler( private val groupUserByMap: ConcurrentMap> = ConcurrentHashMap() override fun handle(session: WebSocketSession): Mono { - val uriContext = GroupUserUriContext.from(session) + return mono { + val uriContext = GroupUserUriContext.from(session) - val user = authFacade.resolveAuthUser(uriContext.token) + val user = authFacade.resolveAuthUser(uriContext.token) - val groupUsers = groupUserService.findAllByUidSync(user.uid) + val groupUsers = groupUserService.findAllByUid(user.uid) - groupUsers.forEach { groupUser -> - val targetGroupUser = groupUserByMap[groupUser.groupId] ?: ConcurrentHashMap() + groupUsers.forEach { groupUser -> + val targetGroupUser = groupUserByMap[groupUser.groupId] ?: ConcurrentHashMap() - targetGroupUser[groupUser.uid] = session + targetGroupUser[groupUser.uid] = session - groupUserByMap[groupUser.groupId] = targetGroupUser - } + groupUserByMap[groupUser.groupId] = targetGroupUser + } - groupUserByMap.forEach { (groupId, sessionByUid) -> - sessionByUid[user.uid] ?: return@forEach + groupUserByMap.forEach { (groupId, sessionByUid) -> + sessionByUid[user.uid] ?: return@forEach - CoroutineScope(Dispatchers.IO + Job()).launch { - sendUpdatedGroupStatus(groupId, sessionByUid) + launchSendEvent(groupId, sessionByUid) } - } - return session.receive() - .map(WebSocketMessage::getPayloadAsText) - .flatMap { payload -> checkPingPong(payload, session) } - .log() - .doFinally { handleSessionTermination(user.uid) } - .then() + session.receive() + .map(WebSocketMessage::getPayloadAsText) + .flatMap { payload -> checkPingPong(payload, session) } + .log() + .doFinally { handleSessionTermination(user.uid) } + .then() + .awaitSingleOrNull() + } } private fun checkPingPong( @@ -95,15 +98,24 @@ class ReactiveGroupUserWebSocketHandler( } false -> { - CoroutineScope(Dispatchers.IO + Job()).launch { - sendUpdatedGroupStatus(groupId, uidBySession) - } + launchSendEvent(groupId, uidBySession) } } } } } + + /** 발송되는 순서가 중요하지 않다. */ + private fun launchSendEvent( + groupId: Long, + sessionByUid: ConcurrentMap + ) { + CoroutineScope(Dispatchers.IO + Job()).launch { + sendUpdatedGroupStatus(groupId, sessionByUid) + } + } + private suspend fun sendUpdatedGroupStatus( groupId: Long, sessionByUid: MutableMap