Skip to content

Commit

Permalink
imp: ws에서 blocking 요소 완전 제거
Browse files Browse the repository at this point in the history
  • Loading branch information
DongGeon0908 committed Aug 21, 2024
1 parent cf79c53 commit 7f7f175
Showing 1 changed file with 34 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.reactor.mono
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketMessage
Expand All @@ -35,34 +37,35 @@ class ReactiveGroupUserWebSocketHandler(
private val groupUserByMap: ConcurrentMap<Long, ConcurrentMap<Long, WebSocketSession>> = ConcurrentHashMap()

override fun handle(session: WebSocketSession): Mono<Void> {
val uriContext = GroupUserUriContext.from(session)
return mono {
val uriContext = GroupUserUriContext.from(session)

val user = authFacade.resolveAuthUser(uriContext.token)
val user = authFacade.resolveAuthUser(uriContext.token)

val groupUsers = groupUserService.findAllByUidSync(user.uid)
val groupUsers = groupUserService.findAllByUid(user.uid)

groupUsers.forEach { groupUser ->
val targetGroupUser = groupUserByMap[groupUser.groupId] ?: ConcurrentHashMap()
groupUsers.forEach { groupUser ->
val targetGroupUser = groupUserByMap[groupUser.groupId] ?: ConcurrentHashMap()

targetGroupUser[groupUser.uid] = session
targetGroupUser[groupUser.uid] = session

groupUserByMap[groupUser.groupId] = targetGroupUser
}
groupUserByMap[groupUser.groupId] = targetGroupUser
}

groupUserByMap.forEach { (groupId, sessionByUid) ->
sessionByUid[user.uid] ?: return@forEach
groupUserByMap.forEach { (groupId, sessionByUid) ->
sessionByUid[user.uid] ?: return@forEach

CoroutineScope(Dispatchers.IO + Job()).launch {
sendUpdatedGroupStatus(groupId, sessionByUid)
launchSendEvent(groupId, sessionByUid)
}
}

return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap { payload -> checkPingPong(payload, session) }
.log()
.doFinally { handleSessionTermination(user.uid) }
.then()
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap { payload -> checkPingPong(payload, session) }
.log()
.doFinally { handleSessionTermination(user.uid) }
.then()
.awaitSingleOrNull()
}
}

private fun checkPingPong(
Expand Down Expand Up @@ -95,15 +98,24 @@ class ReactiveGroupUserWebSocketHandler(
}

false -> {
CoroutineScope(Dispatchers.IO + Job()).launch {
sendUpdatedGroupStatus(groupId, uidBySession)
}
launchSendEvent(groupId, uidBySession)
}
}
}
}
}


/** 발송되는 순서가 중요하지 않다. */
private fun launchSendEvent(
groupId: Long,
sessionByUid: ConcurrentMap<Long, WebSocketSession>
) {
CoroutineScope(Dispatchers.IO + Job()).launch {
sendUpdatedGroupStatus(groupId, sessionByUid)
}
}

private suspend fun sendUpdatedGroupStatus(
groupId: Long,
sessionByUid: MutableMap<Long, WebSocketSession>
Expand Down

0 comments on commit 7f7f175

Please sign in to comment.