Skip to content

Commit

Permalink
Integrated Presence with PubNub stage1
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-cebo committed Aug 12, 2023
1 parent 170a300 commit dd8786e
Show file tree
Hide file tree
Showing 23 changed files with 473 additions and 165 deletions.
10 changes: 5 additions & 5 deletions src/main/kotlin/com/pubnub/api/PubNub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import com.pubnub.api.endpoints.push.RemoveChannelsFromPush
import com.pubnub.api.enums.PNPushEnvironment
import com.pubnub.api.enums.PNPushType
import com.pubnub.api.enums.PNReconnectionPolicy
import com.pubnub.api.eventengine.EventEngineConf
import com.pubnub.api.eventengine.EventEnginesConf
import com.pubnub.api.managers.BasePathManager
import com.pubnub.api.managers.DuplicationManager
import com.pubnub.api.managers.ListenerManager
Expand Down Expand Up @@ -86,7 +86,7 @@ import com.pubnub.api.models.consumer.objects.membership.ChannelMembershipInput
import com.pubnub.api.models.consumer.objects.membership.PNChannelDetailsLevel
import com.pubnub.api.presence.Presence
import com.pubnub.api.subscribe.Subscribe
import com.pubnub.api.subscribe.eventengine.configuration.SubscribeEventEngineConfImpl
import com.pubnub.api.subscribe.eventengine.configuration.EventEnginesConfImpl
import com.pubnub.api.vendor.Base64
import com.pubnub.api.vendor.Crypto
import com.pubnub.api.vendor.FileEncryptionUtil.decrypt
Expand All @@ -98,10 +98,10 @@ import java.util.UUID

class PubNub internal constructor(
val configuration: PNConfiguration,
eventEngineConf: EventEngineConf
eventEnginesConf: EventEnginesConf
) {

constructor(configuration: PNConfiguration) : this(configuration, SubscribeEventEngineConfImpl())
constructor(configuration: PNConfiguration) : this(configuration, EventEnginesConfImpl())

companion object {
private const val TIMESTAMP_DIVIDER = 1000
Expand Down Expand Up @@ -129,7 +129,7 @@ class PubNub internal constructor(
private val tokenParser: TokenParser = TokenParser()
private val listenerManager = ListenerManager(this)
internal val subscriptionManager = SubscriptionManager(this, listenerManager)
private val subscribe = Subscribe.create(this, listenerManager, configuration.retryPolicy, eventEngineConf, SubscribeMessageProcessor(this, DuplicationManager(configuration)))
private val subscribe = Subscribe.create(this, listenerManager, configuration.retryPolicy, eventEnginesConf, SubscribeMessageProcessor(this, DuplicationManager(configuration)))

//endregion

Expand Down
6 changes: 6 additions & 0 deletions src/main/kotlin/com/pubnub/api/eventengine/EventEngine.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.pubnub.api.eventengine

interface EventEngine {
fun start()
fun stop()
}
12 changes: 0 additions & 12 deletions src/main/kotlin/com/pubnub/api/eventengine/EventEngineConf.kt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.pubnub.api.eventengine

interface EventEngineManager {
fun addEventToQueue(event: Event)
fun start()
fun stop()
}
18 changes: 18 additions & 0 deletions src/main/kotlin/com/pubnub/api/eventengine/EventEnginesConf.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.pubnub.api.eventengine

import com.pubnub.api.presence.eventengine.effect.PresenceEffectInvocation
import com.pubnub.api.presence.eventengine.event.PresenceEvent
import com.pubnub.api.subscribe.eventengine.effect.SubscribeEffectInvocation
import com.pubnub.api.subscribe.eventengine.event.SubscribeEvent

interface EventEnginesConf {
val subscribeEventSink: Sink<SubscribeEvent>
val subscribeEventSource: Source<SubscribeEvent>
val subscribeEffectSink: Sink<SubscribeEffectInvocation>
val subscribeEffectSource: Source<SubscribeEffectInvocation>

val presenceEventSink: Sink<PresenceEvent>
val presenceEventSource: Source<PresenceEvent>
val presenceEffectSink: Sink<PresenceEffectInvocation>
val presenceEffectSource: Source<PresenceEffectInvocation>
}
18 changes: 9 additions & 9 deletions src/main/kotlin/com/pubnub/api/eventengine/State.kt
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package com.pubnub.api.eventengine

interface State<T : EffectInvocation, U : Event, V : State<T, U, V>> {
open fun onEntry(): Set<T> = setOf()
open fun onExit(): Set<T> = setOf()
abstract fun transition(event: U): Pair<V, Set<T>>
interface State<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> {
open fun onEntry(): Set<Ei> = setOf()
open fun onExit(): Set<Ei> = setOf()
abstract fun transition(event: Ev): Pair<S, Set<Ei>>
}

fun <T : EffectInvocation, U : Event, V : State<T, U, V>> V.noTransition(): Pair<V, Set<T>> = Pair(this, emptySet())
fun <T : EffectInvocation, U : Event, V : State<T, U, V>> V.transitionTo(
state: V,
vararg invocations: T
): Pair<V, Set<T>> {
fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.noTransition(): Pair<S, Set<Ei>> = Pair(this, emptySet())
fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.transitionTo(
state: S,
vararg invocations: Ei
): Pair<S, Set<Ei>> {
val effectInvocations = this.onExit() + invocations + state.onEntry()
return Pair(state, effectInvocations)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.pubnub.api.managers

import com.pubnub.api.eventengine.EffectDispatcher
import com.pubnub.api.eventengine.Event
import com.pubnub.api.eventengine.EventEngineManager
import com.pubnub.api.eventengine.Sink
import com.pubnub.api.presence.eventengine.PresenceEventEngine
import com.pubnub.api.presence.eventengine.effect.PresenceEffectInvocation
import com.pubnub.api.presence.eventengine.event.PresenceEvent

class PresenceEventEngineManager(
private val presenceEventEngine: PresenceEventEngine,
private val effectDispatcher: EffectDispatcher<PresenceEffectInvocation>,
private val eventSink: Sink<PresenceEvent>
) : EventEngineManager {

override fun addEventToQueue(event: Event) {
eventSink.add(event as PresenceEvent)
}

override fun start() {
presenceEventEngine.start()
effectDispatcher.start()
}

override fun stop() {
presenceEventEngine.stop()
effectDispatcher.stop()
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
package com.pubnub.api.managers

import com.pubnub.api.eventengine.EffectDispatcher
import com.pubnub.api.eventengine.Event
import com.pubnub.api.eventengine.EventEngineManager
import com.pubnub.api.eventengine.Sink
import com.pubnub.api.subscribe.eventengine.SubscribeEventEngine
import com.pubnub.api.subscribe.eventengine.effect.SubscribeEffectInvocation
import com.pubnub.api.subscribe.eventengine.event.SubscribeEvent

class EventEngineManager(
class SubscribeEventEngineManager(
private val subscribeEventEngine: SubscribeEventEngine,
private val effectDispatcher: EffectDispatcher<SubscribeEffectInvocation>,
private val eventSink: Sink<SubscribeEvent>
) {
) : EventEngineManager {

fun addEventToQueue(subscribeEvent: SubscribeEvent) {
eventSink.add(subscribeEvent) // todo add unit tests
override fun addEventToQueue(event: Event) {
eventSink.add(event as SubscribeEvent)
}

fun start() {
override fun start() {
subscribeEventEngine.start()
effectDispatcher.start()
}

fun stop() {
override fun stop() {
subscribeEventEngine.stop()
effectDispatcher.stop()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.pubnub.api.presence.eventengine

import com.pubnub.api.eventengine.EventEngine
import com.pubnub.api.eventengine.Sink
import com.pubnub.api.eventengine.Source
import com.pubnub.api.eventengine.transition
import com.pubnub.api.presence.eventengine.effect.PresenceEffectInvocation
import com.pubnub.api.presence.eventengine.event.PresenceEvent
import com.pubnub.api.presence.eventengine.state.PresenceState
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

class PresenceEventEngine(
private val effectSink: Sink<PresenceEffectInvocation>,
private val eventSource: Source<PresenceEvent>,
private var currentState: PresenceState = PresenceState.HearbeatInactive,
private val executorService: ExecutorService = Executors.newSingleThreadExecutor()
) : EventEngine {

override fun start() {
executorService.submit {
try {
while (true) {
val event = eventSource.take()
performTransitionAndEmitEffects(event)
}
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
}
}
}

override fun stop() {
executorService.shutdownNow()
}

internal fun performTransitionAndEmitEffects(presenceEvent: PresenceEvent) { // todo add unit tests
val (newState, invocations) = transition(currentState, presenceEvent)
currentState = newState
invocations.forEach { invocation -> effectSink.add(invocation) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import com.pubnub.api.eventengine.Effect
import com.pubnub.api.eventengine.EffectFactory
import com.pubnub.api.eventengine.Sink
import com.pubnub.api.presence.eventengine.effect.effectprovider.HeartbeatProvider
import com.pubnub.api.presence.eventengine.effect.effectprovider.LeaveProvider
import com.pubnub.api.presence.eventengine.event.PresenceEvent
import com.pubnub.api.subscribe.eventengine.effect.RetryPolicy
import java.util.concurrent.ScheduledExecutorService

internal class PresenceEffectFactory(
private val heartbeatProvider: HeartbeatProvider,
private val leaveProvider: HeartbeatProvider,
private val leaveProvider: LeaveProvider,
private val presenceEventSink: Sink<PresenceEvent>,
private val policy: RetryPolicy,
private val executorService: ScheduledExecutorService,
private val heartbeatIntervalInSec: Int,
) : EffectFactory<PresenceEffectInvocation> {
override fun create(effectInvocation: PresenceEffectInvocation): Effect? {
return when (effectInvocation) {
Expand All @@ -39,22 +41,19 @@ internal class PresenceEffectFactory(
)
}
is PresenceEffectInvocation.Leave -> {
val heartbeatRemoteAction = leaveProvider.getHeartbeatRemoteAction(
val leaveRemoteAction = leaveProvider.getLeaveRemoteAction(
effectInvocation.channels,
effectInvocation.channelGroups
)
LeaveEffect(heartbeatRemoteAction)
LeaveEffect(leaveRemoteAction)
}
is PresenceEffectInvocation.Wait -> {
TODO()
WaitEffect(heartbeatIntervalInSec, presenceEventSink)
}

PresenceEffectInvocation.CancelDelayedHeartbeat -> {
TODO()
}
PresenceEffectInvocation.CancelWait -> {
TODO()
}
PresenceEffectInvocation.CancelDelayedHeartbeat,
PresenceEffectInvocation.CancelWait
-> null
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ sealed class PresenceEvent : Event {
data class HeartbeatFailure(val reason: PubNubException) : PresenceEvent()
data class HeartbeatGiveup(val reason: PubNubException) : PresenceEvent()

data class StateSet(val channels: Set<String>, val channelGroups: Set<String>) : PresenceEvent()
data class StateSet(val channels: Set<String>, val channelGroups: Set<String>) : PresenceEvent() // todo how to from setPresenceState operation call Presence EE
}
Loading

0 comments on commit dd8786e

Please sign in to comment.