Skip to content

Commit

Permalink
ktor websocket transport migration
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Nov 16, 2023
1 parent a22031c commit 6cf8765
Show file tree
Hide file tree
Showing 11 changed files with 577 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,44 @@
public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport : io/rsocket/kotlin/transport/RSocketClientTransport {
public static final field Factory Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport$Factory;
public abstract fun getHeaders ()Lio/ktor/http/Headers;
public abstract fun getUrl ()Lio/ktor/http/Url;
}

public final class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport$Factory;Lkotlin/coroutines/CoroutineContext;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport$Factory;Lkotlin/coroutines/CoroutineContext;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;
}

public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder, io/rsocket/kotlin/transport/RSocketTransportEngineBuilder {
public abstract fun httpEngine (Lio/ktor/client/engine/HttpClientEngine;Lkotlin/jvm/functions/Function1;)V
public abstract fun httpEngine (Lio/ktor/client/engine/HttpClientEngineFactory;Lkotlin/jvm/functions/Function1;)V
public abstract fun httpEngine (Lkotlin/jvm/functions/Function1;)V
public abstract fun webSocketsConfig (Lkotlin/jvm/functions/Function1;)V
}

public final class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportBuilder$DefaultImpls {
public static synthetic fun httpEngine$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportBuilder;Lio/ktor/client/engine/HttpClientEngine;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
public static synthetic fun httpEngine$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportBuilder;Lio/ktor/client/engine/HttpClientEngineFactory;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
}

public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportEngine : io/rsocket/kotlin/transport/RSocketTransportEngine {
public static final field Factory Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportEngine$Factory;
public abstract fun createTransport (Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;
public abstract fun createTransport (Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;
}

public final class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportEngine$DefaultImpls {
public static fun createTransport (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportEngine;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;
public static fun createTransport (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportEngine;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;
public static synthetic fun createTransport$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportEngine;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;
public static synthetic fun createTransport$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportEngine;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;
}

public final class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportEngine$Factory : io/rsocket/kotlin/transport/RSocketTransportEngineFactory {
}

public final class io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransportKt {
public static final fun WebSocketClientTransport (Lio/ktor/client/engine/HttpClientEngineFactory;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;ZLkotlin/coroutines/CoroutineContext;Lio/ktor/utils/io/pool/ObjectPool;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ClientTransport;
public static final fun WebSocketClientTransport (Lio/ktor/client/engine/HttpClientEngineFactory;Ljava/lang/String;ZLkotlin/coroutines/CoroutineContext;Lio/ktor/utils/io/pool/ObjectPool;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ClientTransport;
Expand Down
5 changes: 4 additions & 1 deletion rsocket-transport-ktor-websocket-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ kotlin {
sourceSets {
commonMain {
dependencies {
api(projects.rsocketTransportKtorWebsocketShared)
implementation(projects.rsocketInternalIo)
implementation(projects.rsocketTransportKtorWebsocketShared)

api(projects.rsocketCore)
api(libs.ktor.client.core)
api(libs.ktor.client.websockets)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.ktor.websocket.client

import io.ktor.client.*
import io.ktor.client.engine.*
import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

public sealed interface KtorWebSocketClientTransport : RSocketClientTransport {
public val url: Url
public val headers: Headers

public companion object Factory : RSocketTransportFactory<
HttpRequestBuilder.() -> Unit,
KtorWebSocketClientTransport,
KtorWebSocketClientTransportBuilder>(::KtorWebSocketClientTransportBuilderImpl) {

public operator fun invoke(
context: CoroutineContext,
urlString: String,
request: HttpRequestBuilder.() -> Unit = {},
block: KtorWebSocketClientTransportBuilder.() -> Unit = {},
): KtorWebSocketClientTransport = invoke(
context = context,
method = HttpMethod.Get, host = null, port = null, path = null,
request = {
url.protocol = URLProtocol.WS
url.port = port

url.takeFrom(urlString)
request()
},
block = block
)

public operator fun invoke(
context: CoroutineContext,
method: HttpMethod = HttpMethod.Get,
host: String? = null,
port: Int? = null,
path: String? = null,
request: HttpRequestBuilder.() -> Unit = {},
block: KtorWebSocketClientTransportBuilder.() -> Unit = {},
): KtorWebSocketClientTransport = invoke(context, {
this.method = method
url("ws", host, port, path)
request()
}, block)
}
}

public sealed interface KtorWebSocketClientTransportEngine :
RSocketTransportEngine<HttpRequestBuilder.() -> Unit, KtorWebSocketClientTransport> {

public fun createTransport(
urlString: String,
request: HttpRequestBuilder.() -> Unit = {},
): KtorWebSocketClientTransport = createTransport(
method = HttpMethod.Get, host = null, port = null, path = null,
) {
url.protocol = URLProtocol.WS
url.port = port
url.takeFrom(urlString)
request()
}

public fun createTransport(
method: HttpMethod = HttpMethod.Get,
host: String? = null,
port: Int? = null,
path: String? = null,
request: HttpRequestBuilder.() -> Unit = {},
): KtorWebSocketClientTransport = createTransport {
this.method = method
url("ws", host, port, path)
request()
}

public companion object Factory : RSocketTransportEngineFactory<
HttpRequestBuilder.() -> Unit,
KtorWebSocketClientTransport,
KtorWebSocketClientTransportEngine,
KtorWebSocketClientTransportBuilder>(::KtorWebSocketClientTransportBuilderImpl)
}

public sealed interface KtorWebSocketClientTransportBuilder :
RSocketTransportBuilder<HttpRequestBuilder.() -> Unit, KtorWebSocketClientTransport>,
RSocketTransportEngineBuilder<HttpRequestBuilder.() -> Unit, KtorWebSocketClientTransport, KtorWebSocketClientTransportEngine> {

public fun httpEngine(configure: HttpClientEngineConfig.() -> Unit)
public fun httpEngine(engine: HttpClientEngine, configure: HttpClientEngineConfig.() -> Unit = {})
public fun <T : HttpClientEngineConfig> httpEngine(factory: HttpClientEngineFactory<T>, configure: T.() -> Unit = {})

public fun webSocketsConfig(block: WebSockets.Config.() -> Unit)
}

private class KtorWebSocketClientTransportBuilderImpl : KtorWebSocketClientTransportBuilder {
private var httpClientFactory: HttpClientFactory = HttpClientFactory.Default
private var webSocketsConfig: WebSockets.Config.() -> Unit = {}

override fun httpEngine(configure: HttpClientEngineConfig.() -> Unit) {
this.httpClientFactory = HttpClientFactory.FromConfiguration(configure)
}

override fun httpEngine(engine: HttpClientEngine, configure: HttpClientEngineConfig.() -> Unit) {
this.httpClientFactory = HttpClientFactory.FromEngine(engine, configure)
}

override fun <T : HttpClientEngineConfig> httpEngine(factory: HttpClientEngineFactory<T>, configure: T.() -> Unit) {
this.httpClientFactory = HttpClientFactory.FromFactory(factory, configure)
}

override fun webSocketsConfig(block: WebSockets.Config.() -> Unit) {
this.webSocketsConfig = block
}

private fun buildHttpClient(context: CoroutineContext): Pair<CoroutineContext, HttpClient> {
val httpClient = httpClientFactory.createHttpClient {
install(WebSockets, webSocketsConfig)
}
val newContext = httpClient.coroutineContext + context.supervisorContext()
val newJob = newContext.job
val httpClientJob = httpClient.coroutineContext.job

httpClientJob.invokeOnCompletion { newJob.cancel("HttpClient closed", it) }
newJob.invokeOnCompletion { httpClientJob.cancel("Transport closed", it) }

return newContext to httpClient
}

@RSocketTransportApi
override fun buildTransport(context: CoroutineContext, target: HttpRequestBuilder.() -> Unit): KtorWebSocketClientTransport {
val (newContext, httpClient) = buildHttpClient(context)
return KtorWebSocketClientTransportImpl(
coroutineContext = newContext,
requestBlock = target,
httpClient = httpClient,
)
}

@RSocketTransportApi
override fun buildEngine(context: CoroutineContext): KtorWebSocketClientTransportEngine {
val (newContext, httpClient) = buildHttpClient(context)
return KtorWebSocketClientTransportEngineImpl(
coroutineContext = newContext,
httpClient = httpClient,
)
}
}

private class KtorWebSocketClientTransportEngineImpl(
override val coroutineContext: CoroutineContext,
private val httpClient: HttpClient,
) : KtorWebSocketClientTransportEngine {
override fun createTransport(target: HttpRequestBuilder.() -> Unit): KtorWebSocketClientTransport {
return KtorWebSocketClientTransportImpl(
coroutineContext = coroutineContext.supervisorContext(),
requestBlock = target,
httpClient = httpClient
)
}
}

private class KtorWebSocketClientTransportImpl(
override val coroutineContext: CoroutineContext,
private val requestBlock: HttpRequestBuilder.() -> Unit,
private val httpClient: HttpClient,
) : KtorWebSocketClientTransport {
private val requestData: HttpRequestData by lazy {
HttpRequestBuilder().apply {
url {
protocol = URLProtocol.WS
port = protocol.defaultPort
}
}.apply(requestBlock).build()
}
override val url: Url get() = requestData.url
override val headers: Headers get() = requestData.headers

@RSocketTransportApi
override suspend fun createSession(): RSocketTransportSession {
ensureActive()

return KtorWebSocketSession(httpClient.webSocketSession(requestBlock))
}
}

private sealed class HttpClientFactory {
abstract fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient

object Default : HttpClientFactory() {
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient(block)
}

class FromConfiguration(
private val configure: HttpClientEngineConfig.() -> Unit,
) : HttpClientFactory() {
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient {
engine(configure)
block()
}
}

class FromEngine(
private val engine: HttpClientEngine,
private val configure: HttpClientEngineConfig.() -> Unit,
) : HttpClientFactory() {
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient(engine) {
engine(configure)
block()
}
}

class FromFactory<T : HttpClientEngineConfig>(
private val factory: HttpClientEngineFactory<T>,
private val configure: T.() -> Unit,
) : HttpClientFactory() {
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient(factory) {
engine(configure)
block()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,34 @@
public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerInstance : io/rsocket/kotlin/transport/RSocketServerInstance {
public abstract fun getProtocol ()Ljava/lang/String;
public abstract fun getResolvedConnectors ()Ljava/util/List;
public abstract fun getRoute ()Ljava/lang/String;
}

public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport : io/rsocket/kotlin/transport/RSocketServerTransport {
public static final field Factory Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport$Factory;
public abstract fun getConnectors ()Ljava/util/List;
}

public final class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
public final fun invoke (Lkotlin/coroutines/CoroutineContext;ILjava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Lio/ktor/server/engine/EngineConnectorConfig;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;
public final fun invoke (Lkotlin/coroutines/CoroutineContext;[Lio/ktor/server/engine/EngineConnectorConfig;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport$Factory;Lkotlin/coroutines/CoroutineContext;ILjava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport$Factory;Lkotlin/coroutines/CoroutineContext;Lio/ktor/server/engine/EngineConnectorConfig;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport$Factory;Lkotlin/coroutines/CoroutineContext;[Lio/ktor/server/engine/EngineConnectorConfig;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;
}

public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
public abstract fun httpEngine (Lio/ktor/server/engine/ApplicationEngineFactory;Lkotlin/jvm/functions/Function1;)V
public abstract fun protocol (Ljava/lang/String;)V
public abstract fun route (Ljava/lang/String;)V
public abstract fun webSocketsConfig (Lkotlin/jvm/functions/Function1;)V
}

public final class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransportBuilder$DefaultImpls {
public static synthetic fun httpEngine$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransportBuilder;Lio/ktor/server/engine/ApplicationEngineFactory;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
}

public final class io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransportKt {
public static final fun WebSocketServerTransport (Lio/ktor/server/engine/ApplicationEngineFactory;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;Lio/ktor/utils/io/pool/ObjectPool;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ServerTransport;
public static final fun WebSocketServerTransport (Lio/ktor/server/engine/ApplicationEngineFactory;[Lio/ktor/server/engine/EngineConnectorConfig;Ljava/lang/String;Ljava/lang/String;Lio/ktor/utils/io/pool/ObjectPool;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ServerTransport;
Expand Down
Loading

0 comments on commit 6cf8765

Please sign in to comment.