diff --git a/boot/platform/build.gradle b/boot/platform/build.gradle index b64c46cb..36521efe 100644 --- a/boot/platform/build.gradle +++ b/boot/platform/build.gradle @@ -57,6 +57,7 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-actuator") implementation("org.springframework.boot:spring-boot-starter-webflux") implementation("org.springframework.boot:spring-boot-starter-validation") + implementation("org.springframework.boot:spring-boot-starter-rsocket") implementation("org.springframework.session:spring-session-data-redis") implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive") diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/ConnectedClient.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/ConnectedClient.java new file mode 100644 index 00000000..8ff17670 --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/ConnectedClient.java @@ -0,0 +1,23 @@ +package com.platform.boot.relational.rsocket; + +import lombok.Data; +import org.springframework.messaging.rsocket.RSocketRequester; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * @author Alex Bob + */ +@Data +public class ConnectedClient implements Serializable { + + private final RSocketRequester requester; + private final LocalDateTime connectedTime; + + ConnectedClient(RSocketRequester requester) { + this.requester = requester; + this.connectedTime = LocalDateTime.now(); + } + +} diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageIn.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageIn.java new file mode 100644 index 00000000..732b335e --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageIn.java @@ -0,0 +1,30 @@ +package com.platform.boot.relational.rsocket; + +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author Alex Bob + */ +@Data +@NoArgsConstructor +public class MessageIn implements Serializable { + + private MessageType type; + private String content; + private Object data; + private String from; + private String to; + + public MessageIn(MessageType type, String content, Object data) { + this.type = type; + this.content = content; + this.data = data; + } + + public static MessageIn of(MessageType type, String content, Object data) { + return new MessageIn(type, content, data); + } +} diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageOut.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageOut.java new file mode 100644 index 00000000..06e06a92 --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageOut.java @@ -0,0 +1,36 @@ +package com.platform.boot.relational.rsocket; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.ToString; + +/** + * @author Alex Bob + */ +@Data +@NoArgsConstructor +@EqualsAndHashCode(callSuper = true) +@ToString(callSuper = true) +public class MessageOut extends MessageIn { + + private Integer status; + + public MessageOut(MessageType type, String content, Object data) { + super(type, content, data); + } + + public static MessageOut of(MessageType type, String content, Object data) { + return new MessageOut(type, content, data); + } + + public MessageOut status(Integer status) { + this.status = status; + return this; + } + + public MessageOut content(String content) { + this.setContent(content); + return this; + } +} diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageType.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageType.java new file mode 100644 index 00000000..8c44d9f7 --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/MessageType.java @@ -0,0 +1,18 @@ +package com.platform.boot.relational.rsocket; + +import java.io.Serializable; + +/** + * @author Alex Bob + */ +public enum MessageType implements Serializable { + /** + * 命令 + */ + COMMAND, + + /** + * 未知 + */ + UNKNOWN; +} diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketController.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketController.java new file mode 100644 index 00000000..5f2fce93 --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketController.java @@ -0,0 +1,40 @@ +package com.platform.boot.relational.rsocket; + +import lombok.RequiredArgsConstructor; +import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.messaging.rsocket.annotation.ConnectMapping; +import org.springframework.stereotype.Controller; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Objects; + +/** + * @author Alex Bob + */ +@Controller +@RequiredArgsConstructor +public class RsocketController { + + private final RsocketManager rsocketManager; + + @ConnectMapping("connect.setup") + public Mono setup(String clientIdentifier, RSocketRequester requester) { + Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null"); + this.rsocketManager.connect(clientIdentifier, requester); + return Mono.empty(); + } + + @MessageMapping("request.stream") + public Flux stream(String clientIdentifier, RSocketRequester requester) { + return this.rsocketManager.radars(clientIdentifier, requester); + } + + @MessageMapping("request.sender") + public Mono sender(Mono messageInMono) { + return messageInMono.doOnNext(this.rsocketManager::send) + .map(in -> MessageOut.of(in.getType(), in.getContent(), in.getData()).status(200)); + } + +} diff --git a/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketManager.java b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketManager.java new file mode 100644 index 00000000..1bc724ff --- /dev/null +++ b/boot/platform/src/main/java/com/platform/boot/relational/rsocket/RsocketManager.java @@ -0,0 +1,59 @@ +package com.platform.boot.relational.rsocket; + +import lombok.extern.log4j.Log4j2; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author Alex Bob + */ +@Log4j2 +@Service +public class RsocketManager { + + private final Sinks.Many replaySink = Sinks.many().replay().limit(Duration.ofMinutes(5)); + private final ConcurrentHashMap clients = new ConcurrentHashMap<>(200); + + public void connect(String clientIdentifier, RSocketRequester requester) { + log.debug("Connect [{}] RSocketRequester.", clientIdentifier); + Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null") + .onClose() + .doFirst(() -> this.clients.put(clientIdentifier, new ConnectedClient(requester))) + .doFinally(sig -> { + log.debug("Client closed, uuid is {}. signal is {}.", clientIdentifier, sig.toString()); + this.clients.remove(clientIdentifier); + }).subscribe(); + } + + public Flux radars(String clientIdentifier, RSocketRequester requester) { + log.debug("Radars [{}] RSocketRequester.", clientIdentifier); + this.clients.put(clientIdentifier, new ConnectedClient(requester)); + return this.replaySink.asFlux(); + } + + public void send(MessageIn message) { + MessageOut messageOut = MessageOut.of(message.getType(), message.getContent(), message.getData()); + try { + this.replaySink.tryEmitNext(messageOut.status(200)); + } catch (Exception e) { + log.error("send message error : {}", e.getMessage()); + this.replaySink.tryEmitNext(messageOut.status(500).content(e.getMessage())); + } + } + + public void taskTest() { + if (this.clients.isEmpty() || !this.clients.containsKey("CommandClient")) { + return; + } + ConnectedClient connectedClient = this.clients.get("CommandClient"); + connectedClient.getRequester().route("user.message") + .data(MessageOut.of(MessageType.COMMAND, "test", "test").status(200)) + .send().subscribe(); + } +} diff --git a/boot/platform/src/main/resources/application-local.yml b/boot/platform/src/main/resources/application-local.yml index 9f110151..17b4af5a 100644 --- a/boot/platform/src/main/resources/application-local.yml +++ b/boot/platform/src/main/resources/application-local.yml @@ -6,7 +6,7 @@ logging: org.springframework.r2dbc: debug io.r2dbc.postgresql.PARAM: debug -server.port: 8081 +server.port: 8080 spring: application.name: plate diff --git a/boot/platform/src/main/resources/application.yml b/boot/platform/src/main/resources/application.yml index b0c596f6..5479edc6 100644 --- a/boot/platform/src/main/resources/application.yml +++ b/boot/platform/src/main/resources/application.yml @@ -7,6 +7,9 @@ spring: threads.virtual.enabled: true main.keep-alive: true application.name: plate + rsocket.server: + mapping-path: /rsocket + transport: websocket webflux.format: time: "HH:mm:ss" date-time: "yyyy-MM-dd HH:mm:ss" @@ -18,9 +21,7 @@ spring: codec: max-in-memory-size: 10MB log-request-details: false - cache: - type: redis - redis: - key-prefix: "plate:caches:" - time-to-live: 300s - enable-statistics: true \ No newline at end of file + cache.redis: + key-prefix: "plate:caches:" + time-to-live: 5M + enable-statistics: true \ No newline at end of file