From 039cb0c4eae32c991e806cf85256699c7269fb61 Mon Sep 17 00:00:00 2001 From: Oleg Yukhnevich Date: Fri, 17 Nov 2023 00:14:31 +0200 Subject: [PATCH] migrate ktor integration --- .../api/rsocket-ktor-client.api | 8 ++-- rsocket-ktor-client/build.gradle.kts | 2 + .../io/rsocket/kotlin/ktor/client/Builders.kt | 40 +++++++++---------- .../api/rsocket-ktor-server.api | 2 + .../kotlin/ktor/server/RSocketSupport.kt | 15 ++++++- .../io/rsocket/kotlin/ktor/server/Routing.kt | 32 +++------------ 6 files changed, 47 insertions(+), 52 deletions(-) diff --git a/rsocket-ktor-client/api/rsocket-ktor-client.api b/rsocket-ktor-client/api/rsocket-ktor-client.api index aa6779b3..545be491 100644 --- a/rsocket-ktor-client/api/rsocket-ktor-client.api +++ b/rsocket-ktor-client/api/rsocket-ktor-client.api @@ -1,9 +1,9 @@ public final class io/rsocket/kotlin/ktor/client/BuildersKt { - public static final fun rSocket (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final fun rSocket (Lio/ktor/client/HttpClient;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun rSocket (Lio/ktor/client/HttpClient;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun rSocket (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun rSocket (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; - public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Ljava/lang/String;ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun rSocket$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } public final class io/rsocket/kotlin/ktor/client/RSocketSupport { diff --git a/rsocket-ktor-client/build.gradle.kts b/rsocket-ktor-client/build.gradle.kts index 85374b91..28c0de5f 100644 --- a/rsocket-ktor-client/build.gradle.kts +++ b/rsocket-ktor-client/build.gradle.kts @@ -28,6 +28,8 @@ kotlin { sourceSets { commonMain { dependencies { + implementation(projects.rsocketInternalIo) + api(projects.rsocketCore) api(projects.rsocketTransportKtorWebsocketShared) api(libs.ktor.client.websockets) diff --git a/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt b/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt index 64741917..4beb8d51 100644 --- a/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt +++ b/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt @@ -22,50 +22,48 @@ import io.ktor.client.plugins.websocket.* import io.ktor.client.request.* import io.ktor.http.* import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.transport.* import io.rsocket.kotlin.transport.ktor.websocket.* +import kotlinx.coroutines.* import kotlin.coroutines.* public suspend fun HttpClient.rSocket( request: HttpRequestBuilder.() -> Unit, -): RSocket = plugin(RSocketSupport).run { - connector.connect(KtorClientTransport(this@rSocket, request)) -} +): RSocket = plugin(RSocketSupport).connector.connect(KtorWebSocketClientTransport(this, request)) public suspend fun HttpClient.rSocket( urlString: String, - secure: Boolean = false, request: HttpRequestBuilder.() -> Unit = {}, -): RSocket = rSocket { - url { - this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS - this.port = protocol.defaultPort - takeFrom(urlString) - } +): RSocket = rSocket(method = HttpMethod.Get, host = null, port = null, path = null) { + url.protocol = URLProtocol.WS + url.port = url.protocol.defaultPort + url.takeFrom(urlString) request() } public suspend fun HttpClient.rSocket( + method: HttpMethod = HttpMethod.Get, host: String? = null, port: Int? = null, path: String? = null, - secure: Boolean = false, request: HttpRequestBuilder.() -> Unit = {}, ): RSocket = rSocket { - url { - this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS - this.port = protocol.defaultPort - set(host = host, port = port, path = path) - } + this.method = method + url("ws", host, port, path) request() } -private class KtorClientTransport( +private class KtorWebSocketClientTransport( private val client: HttpClient, private val request: HttpRequestBuilder.() -> Unit, -) : ClientTransport { - override val coroutineContext: CoroutineContext get() = client.coroutineContext +) : RSocketClientTransport { + override val coroutineContext: CoroutineContext = client.coroutineContext.supervisorContext() + + @RSocketTransportApi + override suspend fun createSession(): RSocketTransportSession { + ensureActive() - @TransportApi - override suspend fun connect(): Connection = WebSocketConnection(client.webSocketSession(request)) + return KtorWebSocketSession(client.webSocketSession(request)) + } } diff --git a/rsocket-ktor-server/api/rsocket-ktor-server.api b/rsocket-ktor-server/api/rsocket-ktor-server.api index 118f542e..07117040 100644 --- a/rsocket-ktor-server/api/rsocket-ktor-server.api +++ b/rsocket-ktor-server/api/rsocket-ktor-server.api @@ -18,7 +18,9 @@ public final class io/rsocket/kotlin/ktor/server/RSocketSupport$Feature : io/kto } public final class io/rsocket/kotlin/ktor/server/RoutingKt { + public static final fun rSocket (Lio/ktor/server/routing/Route;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;)V public static final fun rSocket (Lio/ktor/server/routing/Route;Ljava/lang/String;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;)V + public static synthetic fun rSocket$default (Lio/ktor/server/routing/Route;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;ILjava/lang/Object;)V public static synthetic fun rSocket$default (Lio/ktor/server/routing/Route;Ljava/lang/String;Ljava/lang/String;Lio/rsocket/kotlin/ConnectionAcceptor;ILjava/lang/Object;)V } diff --git a/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt b/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt index 06c79490..d2050aee 100644 --- a/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt +++ b/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt @@ -21,10 +21,14 @@ import io.ktor.server.websocket.* import io.ktor.util.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.websocket.* +import kotlinx.coroutines.* public class RSocketSupport private constructor( - internal val server: RSocketServer, + private val server: RSocketServer, ) { public class Config internal constructor() { @Suppress("DEPRECATION") @@ -48,4 +52,13 @@ public class RSocketSupport private constructor( } } } + + @RSocketTransportApi + internal fun handler(acceptor: ConnectionAcceptor): suspend DefaultWebSocketServerSession.() -> Unit { + val serverAcceptor = server.createAcceptor(acceptor) + return { + serverAcceptor.acceptSession(KtorWebSocketSession(this)) + coroutineContext.job.join() + } + } } diff --git a/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt b/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt index 1de2a2b6..e1c1ded4 100644 --- a/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt +++ b/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt @@ -21,31 +21,11 @@ import io.ktor.server.routing.* import io.ktor.server.websocket.* import io.rsocket.kotlin.* import io.rsocket.kotlin.transport.* -import io.rsocket.kotlin.transport.ktor.websocket.* -import kotlinx.coroutines.* -public fun Route.rSocket( - path: String? = null, - protocol: String? = null, - acceptor: ConnectionAcceptor, -): Unit = application.plugin(RSocketSupport).run { - server.bindIn(application, KtorServerTransport(this@rSocket, path, protocol), acceptor) -} +@OptIn(RSocketTransportApi::class) +public fun Route.rSocket(protocol: String? = null, acceptor: ConnectionAcceptor): Unit = + webSocket(protocol, application.plugin(RSocketSupport).handler(acceptor)) -private class KtorServerTransport( - private val route: Route, - private val path: String?, - private val protocol: String?, -) : ServerTransport { - @TransportApi - override fun CoroutineScope.start(accept: suspend CoroutineScope.(Connection) -> Unit) { - val handler: suspend DefaultWebSocketServerSession.() -> Unit = { - val connection = WebSocketConnection(this) - accept(connection) - } - when (path) { - null -> route.webSocket(protocol, handler) - else -> route.webSocket(path, protocol, handler) - } - } -} +@OptIn(RSocketTransportApi::class) +public fun Route.rSocket(path: String, protocol: String? = null, acceptor: ConnectionAcceptor): Unit = + webSocket(path, protocol, application.plugin(RSocketSupport).handler(acceptor))