Skip to content

Commit

Permalink
♻️ refactor: 创建了 RsocketManager, MessageType, RsocketController, Mess…
Browse files Browse the repository at this point in the history
…ageOut, MessageIn, ConnectedClient 类以及相关的 yml 配置,同时移除了不必要的缓存设置
  • Loading branch information
vnobo committed Jul 3, 2024
1 parent 7497760 commit 5ebe3d0
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 7 deletions.
1 change: 1 addition & 0 deletions boot/platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-validation")
implementation("org.springframework.boot:spring-boot-starter-rsocket")

implementation("org.springframework.session:spring-session-data-redis")
implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.platform.boot.relational.rsocket;

import lombok.Data;
import org.springframework.messaging.rsocket.RSocketRequester;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
* @author <a href="https://github.com/vnobo">Alex Bob</a>
*/
@Data
public class ConnectedClient implements Serializable {

private final RSocketRequester requester;
private final LocalDateTime connectedTime;

ConnectedClient(RSocketRequester requester) {
this.requester = requester;
this.connectedTime = LocalDateTime.now();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.platform.boot.relational.rsocket;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
* @author <a href="https://github.com/vnobo">Alex Bob</a>
*/
@Data
@NoArgsConstructor
public class MessageIn implements Serializable {

private MessageType type;
private String content;
private Object data;
private String from;
private String to;

public MessageIn(MessageType type, String content, Object data) {
this.type = type;
this.content = content;
this.data = data;
}

public static MessageIn of(MessageType type, String content, Object data) {
return new MessageIn(type, content, data);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.platform.boot.relational.rsocket;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
* @author <a href="https://github.com/vnobo">Alex Bob</a>
*/
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class MessageOut extends MessageIn {

private Integer status;

public MessageOut(MessageType type, String content, Object data) {
super(type, content, data);
}

public static MessageOut of(MessageType type, String content, Object data) {
return new MessageOut(type, content, data);
}

public MessageOut status(Integer status) {
this.status = status;
return this;
}

public MessageOut content(String content) {
this.setContent(content);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.platform.boot.relational.rsocket;

import java.io.Serializable;

/**
* @author <a href="https://github.com/vnobo">Alex Bob</a>
*/
public enum MessageType implements Serializable {
/**
* 命令
*/
COMMAND,

/**
* 未知
*/
UNKNOWN;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.platform.boot.relational.rsocket;

import lombok.RequiredArgsConstructor;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Objects;

/**
* @author <a href="https://github.com/vnobo">Alex Bob</a>
*/
@Controller
@RequiredArgsConstructor
public class RsocketController {

private final RsocketManager rsocketManager;

@ConnectMapping("connect.setup")
public Mono<Void> setup(String clientIdentifier, RSocketRequester requester) {
Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null");
this.rsocketManager.connect(clientIdentifier, requester);
return Mono.empty();
}

@MessageMapping("request.stream")
public Flux<MessageOut> stream(String clientIdentifier, RSocketRequester requester) {
return this.rsocketManager.radars(clientIdentifier, requester);
}

@MessageMapping("request.sender")
public Mono<MessageOut> sender(Mono<MessageIn> messageInMono) {
return messageInMono.doOnNext(this.rsocketManager::send)
.map(in -> MessageOut.of(in.getType(), in.getContent(), in.getData()).status(200));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.platform.boot.relational.rsocket;

import lombok.extern.log4j.Log4j2;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author <a href="https://github.com/vnobo">Alex Bob</a>
*/
@Log4j2
@Service
public class RsocketManager {

private final Sinks.Many<MessageOut> replaySink = Sinks.many().replay().limit(Duration.ofMinutes(5));
private final ConcurrentHashMap<String, ConnectedClient> clients = new ConcurrentHashMap<>(200);

public void connect(String clientIdentifier, RSocketRequester requester) {
log.debug("Connect [{}] RSocketRequester.", clientIdentifier);
Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null")
.onClose()
.doFirst(() -> this.clients.put(clientIdentifier, new ConnectedClient(requester)))
.doFinally(sig -> {
log.debug("Client closed, uuid is {}. signal is {}.", clientIdentifier, sig.toString());
this.clients.remove(clientIdentifier);
}).subscribe();
}

public Flux<MessageOut> radars(String clientIdentifier, RSocketRequester requester) {
log.debug("Radars [{}] RSocketRequester.", clientIdentifier);
this.clients.put(clientIdentifier, new ConnectedClient(requester));
return this.replaySink.asFlux();
}

public void send(MessageIn message) {
MessageOut messageOut = MessageOut.of(message.getType(), message.getContent(), message.getData());
try {
this.replaySink.tryEmitNext(messageOut.status(200));
} catch (Exception e) {
log.error("send message error : {}", e.getMessage());
this.replaySink.tryEmitNext(messageOut.status(500).content(e.getMessage()));
}
}

public void taskTest() {
if (this.clients.isEmpty() || !this.clients.containsKey("CommandClient")) {
return;
}
ConnectedClient connectedClient = this.clients.get("CommandClient");
connectedClient.getRequester().route("user.message")
.data(MessageOut.of(MessageType.COMMAND, "test", "test").status(200))
.send().subscribe();
}
}
2 changes: 1 addition & 1 deletion boot/platform/src/main/resources/application-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ logging:
org.springframework.r2dbc: debug
io.r2dbc.postgresql.PARAM: debug

server.port: 8081
server.port: 8080

spring:
application.name: plate
Expand Down
13 changes: 7 additions & 6 deletions boot/platform/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ spring:
threads.virtual.enabled: true
main.keep-alive: true
application.name: plate
rsocket.server:
mapping-path: /rsocket
transport: websocket
webflux.format:
time: "HH:mm:ss"
date-time: "yyyy-MM-dd HH:mm:ss"
Expand All @@ -18,9 +21,7 @@ spring:
codec:
max-in-memory-size: 10MB
log-request-details: false
cache:
type: redis
redis:
key-prefix: "plate:caches:"
time-to-live: 300s
enable-statistics: true
cache.redis:
key-prefix: "plate:caches:"
time-to-live: 5M
enable-statistics: true

0 comments on commit 5ebe3d0

Please sign in to comment.