Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Realtime tests #658

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Realtime/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ kotlin {
api(libs.ktor.client.websockets)
}
}
val commonTest by getting {
dependencies {
implementation(libs.ktor.server.host)
implementation(libs.ktor.server.websockets)
implementation(project(":test-common"))
implementation(libs.bundles.testing)
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.github.jan.supabase.plugins.MainPlugin
import io.github.jan.supabase.plugins.SupabasePluginProvider
import io.github.jan.supabase.serializer.KotlinXSerializer
import io.github.jan.supabase.supabaseJson
import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession
import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.serialization.kotlinx.KotlinxWebsocketSerializationConverter
import kotlinx.coroutines.flow.StateFlow
Expand Down Expand Up @@ -113,6 +114,7 @@ sealed interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlug
var disconnectOnSessionLoss: Boolean = true,
var connectOnSubscribe: Boolean = true,
var disconnectOnNoSubscriptions: Boolean = true,
var websocketSessionProvider: (suspend () -> DefaultClientWebSocketSession)? = null,
@Deprecated("This property is deprecated and will be removed in a future version.") var eventsPerSecond: Int = 10,
): MainConfig(), CustomSerializationConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.json.buildJsonObject
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
import kotlin.time.Duration.Companion.milliseconds

@PublishedApi internal class RealtimeImpl(override val supabaseClient: SupabaseClient, override val config: Realtime.Config) : Realtime {
Expand Down Expand Up @@ -76,7 +79,7 @@ import kotlin.time.Duration.Companion.milliseconds
_status.value = Realtime.Status.CONNECTING
val realtimeUrl = websocketUrl
try {
ws = supabaseClient.httpClient.webSocketSession(realtimeUrl)
ws = config.websocketSessionProvider?.invoke() ?: supabaseClient.httpClient.webSocketSession(realtimeUrl)
_status.value = Realtime.Status.CONNECTED
Realtime.logger.i { "Connected to realtime websocket!" }
listenForMessages()
Expand Down Expand Up @@ -234,7 +237,7 @@ import kotlin.time.Duration.Companion.milliseconds
}

override suspend fun close() {
ws?.cancel()
disconnect()
}

override suspend fun block() {
Expand Down
5 changes: 5 additions & 0 deletions Realtime/src/commonTest/kotlin/FlowUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first

suspend inline fun <reified T> Flow<T>.waitForValue(value: T) = filter { it == value }.first()
193 changes: 193 additions & 0 deletions Realtime/src/commonTest/kotlin/RealtimeChannelTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import io.github.jan.supabase.gotrue.Auth
import io.github.jan.supabase.gotrue.auth
import io.github.jan.supabase.gotrue.minimalSettings
import io.github.jan.supabase.realtime.Realtime
import io.github.jan.supabase.realtime.RealtimeChannel
import io.github.jan.supabase.realtime.RealtimeChannel.Companion.CHANNEL_EVENT_REPLY
import io.github.jan.supabase.realtime.RealtimeChannel.Companion.CHANNEL_EVENT_SYSTEM
import io.github.jan.supabase.realtime.RealtimeJoinPayload
import io.github.jan.supabase.realtime.RealtimeMessage
import io.github.jan.supabase.realtime.channel
import io.github.jan.supabase.realtime.realtime
import io.ktor.server.websocket.receiveDeserialized
import io.ktor.server.websocket.sendSerialized
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.decodeFromJsonElement
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.json.put
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith

class RealtimeChannelTest {

@Test
fun testConnectOnSubscribeDisabled() {
createTestClient(
wsHandler = {
//Does not matter for this test
},
supabaseHandler = {
val channel = it.channel("")
assertFailsWith<IllegalStateException>() {
channel.subscribe()
}
},
realtimeConfig = {
connectOnSubscribe = false
}
)
}

@Test
fun testConnectOnSubscribeEnabled() {
createTestClient(
wsHandler = {
incoming.receive()
},
supabaseHandler = {
val channel = it.channel("")
channel.subscribe(false)
assertEquals(Realtime.Status.CONNECTED, it.realtime.status.value)
}
)
}

@Test
fun testChannelStatusWithoutPostgres() {
val channelId = "channelId"
createTestClient(
wsHandler = {
incoming.receive()
sendSerialized(RealtimeMessage("realtime:$channelId", CHANNEL_EVENT_SYSTEM, buildJsonObject { put("status", "ok") }, ""))
incoming.receive()
sendSerialized(RealtimeMessage("realtime:$channelId", CHANNEL_EVENT_REPLY, buildJsonObject { put("status", "ok") }, ""))
},
supabaseHandler = {
val channel = it.channel("channelId")
assertEquals(channel.status.value, RealtimeChannel.Status.UNSUBSCRIBED)
assertEquals(it.realtime.status.value, Realtime.Status.DISCONNECTED)
channel.subscribe(blockUntilSubscribed = true)
assertEquals(channel.status.value, RealtimeChannel.Status.SUBSCRIBED)
channel.unsubscribe()
assertEquals(channel.status.value, RealtimeChannel.Status.UNSUBSCRIBING)
assertEquals(RealtimeChannel.Status.UNSUBSCRIBED, channel.status.waitForValue(RealtimeChannel.Status.UNSUBSCRIBED))
},
)
}

@Test
fun testSendingPayloadWithoutJWT() {
val expectedChannelId = "channelId"
val expectedIsPrivate = true
val expectedReceiveOwnBroadcasts = true
val expectedAcknowledge = true
val expectedPresenceKey = "presenceKey"
createTestClient(
wsHandler = {
val message = this.receiveDeserialized<RealtimeMessage>()
val payload = Json.decodeFromJsonElement<RealtimeJoinPayload>(message.payload)
assertEquals("realtime:$expectedChannelId", message.topic)
assertEquals(expectedIsPrivate, payload.config.isPrivate)
assertEquals(expectedReceiveOwnBroadcasts, payload.config.broadcast.receiveOwnBroadcasts)
assertEquals(expectedAcknowledge, payload.config.broadcast.acknowledgeBroadcasts)
assertEquals(expectedPresenceKey, payload.config.presence.key)
},
supabaseHandler = {
val channel = it.channel("channelId") {
isPrivate = expectedIsPrivate
broadcast {
receiveOwnBroadcasts = expectedReceiveOwnBroadcasts
acknowledgeBroadcasts = expectedAcknowledge
}
presence {
key = expectedPresenceKey
}
}
channel.subscribe()
}
)
}

@Test
fun testSendingPayloadWithAuthJWT() {
val expectedAuthToken = "authToken"
createTestClient(
wsHandler = {
val message = this.receiveDeserialized<RealtimeMessage>()
assertEquals(expectedAuthToken, message.payload["access_token"]?.jsonPrimitive?.content)
},
supabaseHandler = {
it.auth.importAuthToken(expectedAuthToken)
val channel = it.channel("channelId")
channel.subscribe()
},
supabaseConfig = {
install(Auth) {
minimalSettings()
}
}
)
}

@Test
fun testSendingPayloadWithCustomJWT() {
val expectedAuthToken = "authToken"
createTestClient(
wsHandler = {
val message = this.receiveDeserialized<RealtimeMessage>()
assertEquals(expectedAuthToken, message.payload["access_token"]?.jsonPrimitive?.content)
},
supabaseHandler = {
val channel = it.channel("channelId")
channel.subscribe()
},
realtimeConfig = {
jwtToken = expectedAuthToken
}
)
}

@Test
fun testSendingBroadcasts() {
val message = buildJsonObject {
put("key", "value")
}
val event = "event"
createTestClient(
wsHandler = {
handleSubscribe("channelId")
val rMessage = this.receiveDeserialized<RealtimeMessage>()
assertEquals("realtime:channelId", rMessage.topic)
assertEquals("broadcast", rMessage.event)
assertEquals(message, rMessage.payload["payload"]?.jsonObject)
assertEquals(event, rMessage.payload["event"]?.jsonPrimitive?.content)
assertEquals("broadcast", rMessage.payload["type"]?.jsonPrimitive?.content)
},
supabaseHandler = {
val channel = it.channel("channelId")
channel.subscribe(true)
channel.broadcast(event, message)
}
)
}

@Test
fun testSendingPresenceUnsubscribed() {
createTestClient(
wsHandler = {
handleSubscribe("channelId")
},
supabaseHandler = {
val channel = it.channel("channelId")
channel.subscribe(true)
assertFailsWith<IllegalStateException> {
channel.track(buildJsonObject { })
}
}
)
}

}
50 changes: 50 additions & 0 deletions Realtime/src/commonTest/kotlin/RealtimeTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import io.github.jan.supabase.realtime.Realtime
import io.github.jan.supabase.realtime.RealtimeMessage
import io.github.jan.supabase.realtime.realtime
import io.ktor.server.websocket.receiveDeserialized
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put
import kotlin.test.Test
import kotlin.test.assertEquals

class RealtimeTest {

@Test
fun testRealtimeStatus() {
createTestClient(
wsHandler = {
//Does not matter for this test
},
supabaseHandler = {
assertEquals(Realtime.Status.DISCONNECTED, it.realtime.status.value)
it.realtime.connect()
assertEquals(Realtime.Status.CONNECTED, it.realtime.status.value)
it.realtime.disconnect()
assertEquals(Realtime.Status.DISCONNECTED, it.realtime.status.value)
}
)
}

@Test
fun testSendingRealtimeMessages() {
val expectedMessage = RealtimeMessage(
topic = "realtimeTopic",
event = "realtimeEvent",
payload = buildJsonObject {
put("key", "value")
},
ref = "realtimeRef"
)
createTestClient(
wsHandler = {
val message = this.receiveDeserialized<RealtimeMessage>()
assertEquals(expectedMessage, message)
},
supabaseHandler = {
it.realtime.connect()
it.realtime.send(expectedMessage)
}
)
}

}
11 changes: 11 additions & 0 deletions Realtime/src/commonTest/kotlin/RealtimeTestUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import io.github.jan.supabase.realtime.RealtimeChannel.Companion.CHANNEL_EVENT_SYSTEM
import io.github.jan.supabase.realtime.RealtimeMessage
import io.ktor.server.websocket.DefaultWebSocketServerSession
import io.ktor.server.websocket.sendSerialized
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.put

suspend fun DefaultWebSocketServerSession.handleSubscribe(channelId: String) {
incoming.receive()
sendSerialized(RealtimeMessage("realtime:$channelId", CHANNEL_EVENT_SYSTEM, buildJsonObject { put("status", "ok") }, ""))
}
54 changes: 54 additions & 0 deletions Realtime/src/commonTest/kotlin/RealtimeWSMock.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import io.github.jan.supabase.SupabaseClient
import io.github.jan.supabase.SupabaseClientBuilder
import io.github.jan.supabase.createSupabaseClient
import io.github.jan.supabase.logging.LogLevel
import io.github.jan.supabase.realtime.Realtime
import io.github.jan.supabase.supabaseJson
import io.ktor.client.plugins.websocket.webSocket
import io.ktor.serialization.kotlinx.KotlinxWebsocketSerializationConverter
import io.ktor.server.testing.ApplicationTestBuilder
import io.ktor.server.testing.testApplication
import io.ktor.server.websocket.DefaultWebSocketServerSession
import io.ktor.server.websocket.WebSockets
import io.ktor.server.websocket.webSocket

fun ApplicationTestBuilder.configureServer(
handler: suspend DefaultWebSocketServerSession.() -> Unit
) {
install(WebSockets) {
contentConverter = KotlinxWebsocketSerializationConverter(supabaseJson)
}
routing {
webSocket("/", handler = handler)
}
}

fun createTestClient(
wsHandler: suspend DefaultWebSocketServerSession.() -> Unit,
supabaseHandler: suspend (SupabaseClient) -> Unit,
realtimeConfig: Realtime.Config.() -> Unit = {},
supabaseConfig: SupabaseClientBuilder.() -> Unit = {}
) {
testApplication {
configureServer(wsHandler)
val client = createClient {
install(io.ktor.client.plugins.websocket.WebSockets) {
contentConverter = KotlinxWebsocketSerializationConverter(supabaseJson)
}
}
client.webSocket("/") {
val supabase = createSupabaseClient("", "") {
defaultLogLevel = LogLevel.DEBUG
install(Realtime) {
websocketSessionProvider = {
this@webSocket
}
realtimeConfig()
}
supabaseConfig()
}
supabaseHandler(supabase)
supabase.close()
}
}
}
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ ktor-client-websockets = { module = "io.ktor:ktor-client-websockets", version.re
ktor-client-mock = { module = "io.ktor:ktor-client-mock", version.ref = "ktor" }
ktor-json = { module = "io.ktor:ktor-serialization-kotlinx-json", version.ref = "ktor" }
ktor-server-core = { module = "io.ktor:ktor-server-core", version.ref = "ktor" }
ktor-server-host = { module = "io.ktor:ktor-server-test-host", version.ref = "ktor" }
ktor-server-websockets = { module = "io.ktor:ktor-server-websockets", version.ref = "ktor" }
ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor" }

kermit = { module = "co.touchlab:kermit", version.ref = "kermit" }
Expand Down
Loading