Skip to content

Commit

Permalink
feat: socket 종료시, 상태값 재전송
Browse files Browse the repository at this point in the history
  • Loading branch information
DongGeon0908 committed Jul 24, 2024
1 parent bec29f3 commit 6d5928b
Showing 1 changed file with 66 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import reactor.core.publisher.Mono
import java.time.Duration
import java.time.LocalDateTime

// rsocket의 경우, client의 requester 처리에 있어 부가적인 설정이 필요. 그러므로 reactive websocket으로 진행
// poc 용도의 handler, 리펙토링 예정
@Component
class ReactiveConcurrentUserWebSocketHandler(
Expand Down Expand Up @@ -104,7 +105,11 @@ class ReactiveConcurrentUserWebSocketHandler(
Flux.interval(Duration.ofMillis(1000L))
.zipWith(eventFlux) { _, event -> event }
.map(session::textMessage)
).and(session.receive().map(WebSocketMessage::getPayloadAsText).log()).then()
).and(session.receive().map(WebSocketMessage::getPayloadAsText).log())
.doFinally {
handleSessionTermination(session, user.uid)
}
.then()
}

private fun isTokenHeader(headerKey: String): Boolean {
Expand All @@ -116,6 +121,66 @@ class ReactiveConcurrentUserWebSocketHandler(
val regex = Regex("/ws/v1/groups/(\\w+)/concurrent-users")
return regex.find(uri)?.groupValues?.get(1)
}

private fun removeSessionFromMap(session: WebSocketSession) {
concurrentUserMap.forEach { (groupId, uidBySession) ->
val userToRemove = uidBySession.filterValues { it == session }.keys.firstOrNull()
if (userToRemove != null) {
uidBySession.remove(userToRemove)
logger.info { "Removed session for user $userToRemove from group $groupId" }
if (uidBySession.isEmpty()) {
concurrentUserMap.remove(groupId)
logger.info { "Removed group $groupId as it has no more users." }
}
}
}
}

private fun handleSessionTermination(session: WebSocketSession, uid: Long) {
concurrentUserMap.forEach { (groupId, uidBySession) ->
val removedUser = uidBySession.remove(uid)

if (removedUser != null) {
logger.info { "Removed session for user $uid from group $groupId" }

if (uidBySession.isEmpty()) {
concurrentUserMap.remove(groupId)
logger.info { "Removed group $groupId as it has no more users." }
} else {
// Send the updated group status to remaining users
sendUpdatedGroupStatus(groupId, uidBySession)
}
}
}
}

private fun sendUpdatedGroupStatus(groupId: Long, uidBySession: MutableMap<Long, WebSocketSession>) {
val uids = uidBySession.keys
val userInfoByUid = userInfoService.findAllByIds(uids.toList()).associateBy { it.id }

val groupUsers = groupUserService.findAllByGroupIdAndUids(groupId, userInfoByUid.keys)
.associateBy { it.uid }

val message = ConcurrentMessage(
groupId = groupId,
groupUsers = userInfoByUid.mapNotNull { (uid, info) ->
val groupUser = groupUsers[uid] ?: return@mapNotNull null

ConcurrentMessage.ConcurrentUser(
groupUserId = groupUser.id,
uid = uid,
nickname = info.nickname
)
}
)

uidBySession.forEach { (_, websocketSession) ->
websocketSession
.send(Mono.just(websocketSession.textMessage(mapper.writeValueAsString(message))))
.subscribe()
}
}

}

data class ConcurrentMessage(
Expand Down

0 comments on commit 6d5928b

Please sign in to comment.