Skip to content

Commit

Permalink
Merge pull request #11 from yahoo/sal/multithreaded
Browse files Browse the repository at this point in the history
Make threadsafe
  • Loading branch information
slevin authored Oct 16, 2024
2 parents d97eb1a + 4b752c4 commit 8c76e95
Show file tree
Hide file tree
Showing 13 changed files with 425 additions and 104 deletions.
2 changes: 2 additions & 0 deletions behavior-graph/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ mavenPublishing {
dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
testImplementation "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
// implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0"
// testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.9.0"
}
71 changes: 67 additions & 4 deletions behavior-graph/src/main/kotlin/behaviorgraph/Action.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
//
package behaviorgraph

import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

/**
* An __Action__ is a block of code which initiates a Behavior Graph [Event].
* You create actions with the [action], [actionAsync] methods on an [Extent].
Expand All @@ -14,18 +21,74 @@ interface Action {
val debugName: String?
}

internal interface RunnableAction: Action {
fun runAction()
abstract class RunnableAction: Action, Future<Nothing?> {
abstract fun runAction()
private val semaphore: Semaphore = Semaphore(0)
private var completed: Boolean = false
private var underlyingProblem: Throwable? = null

fun fail(underlyingProblem: Throwable) {
this.underlyingProblem = underlyingProblem
semaphore.release()
}

fun complete() {
if (!completed) {
completed = true
semaphore.release()
}
}

override fun cancel(p0: Boolean): Boolean {
return false
}

override fun isCancelled(): Boolean {
return false
}

override fun isDone(): Boolean {
return completed
}

override fun get(): Nothing? {
try {
semaphore.acquire()
if (underlyingProblem == null) {
return null
} else {
throw ExecutionException(underlyingProblem)
}
} finally {
semaphore.release()
}
}

override fun get(p0: Long, p1: TimeUnit): Nothing? {
if (semaphore.tryAcquire(p0, p1)) {
try {
if (underlyingProblem == null) {
return null
} else {
throw ExecutionException(underlyingProblem)
}
} finally {
semaphore.release()
}
} else {
throw TimeoutException()
}
}
}

internal class GraphAction(val thunk: Thunk, override val debugName: String? = null): RunnableAction {
internal class GraphAction(val thunk: Thunk, override val debugName: String? = null): RunnableAction() {
override fun runAction() {
thunk.invoke()
}
}

internal class ExtentAction<T>(val thunk: ExtentThunk<T>, val context: T, override val debugName: String? = null):
RunnableAction {
RunnableAction() {
override fun runAction() {
thunk.invoke(context)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package behaviorgraph

internal data class EventLoopState(val action: RunnableAction, val actionUpdates: MutableList<Resource> = mutableListOf(), var currentSideEffect: SideEffect? = null, var phase: EventLoopPhase = EventLoopPhase.Queued) {
internal data class EventLoopState(val action: RunnableAction, val actionUpdates: MutableList<Resource> = mutableListOf(), var currentSideEffect: SideEffect? = null, var phase: EventLoopPhase = EventLoopPhase.Queued, var thread: Thread = Thread.currentThread()) {
override fun toString(): String {
var rows = mutableListOf<String>("Action")
actionUpdates?.forEach { resource ->
Expand Down
19 changes: 6 additions & 13 deletions behavior-graph/src/main/kotlin/behaviorgraph/Extent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
//
package behaviorgraph

import java.util.concurrent.Future

/**
* An **Extent** is a collection of resources and behaviors. Extents allow us to
* add (and remove) all those resources and behaviors to a graph at the same time.
Expand Down Expand Up @@ -94,8 +96,8 @@ open class Extent<ExtentContext: Any> @JvmOverloads constructor(val graph: Graph
* Creates an Action on the graph and calls [addToGraph]
*/
@JvmOverloads
fun addToGraphWithAction(debugName: String? = null) {
this.graph.action(debugName) {
fun addToGraphWithAction(debugName: String? = null): Future<*> {
return this.graph.action(debugName) {
this.addToGraph()
}
}
Expand Down Expand Up @@ -215,22 +217,13 @@ open class Extent<ExtentContext: Any> @JvmOverloads constructor(val graph: Graph
graph.sideEffectHelper(sideEffect)
}

/**
* Calls [Graph.actionAsync] on the Graph instance associated with this [Extent].
*/
@JvmOverloads
fun actionAsync(debugName: String? = null, thunk: ExtentThunk<ExtentContext>) {
val action = ExtentAction(thunk, (context ?: this) as ExtentContext, debugName)
graph.asyncActionHelper(action)
}

/**
* Calls [Graph.action] on the Graph instance associated with this [Extent].
*/
@JvmOverloads
fun action(debugName: String? = null, thunk: ExtentThunk<ExtentContext>) {
fun action(debugName: String? = null, thunk: ExtentThunk<ExtentContext>): Future<*> {
val action = ExtentAction(thunk, (context ?: this) as ExtentContext, debugName)
graph.actionHelper(action)
return graph.actionHelper(action)
}

override fun toString(): String {
Expand Down
129 changes: 77 additions & 52 deletions behavior-graph/src/main/kotlin/behaviorgraph/Graph.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ import behaviorgraph.Event.Companion.InitialEvent
import java.lang.System.currentTimeMillis
import java.util.ArrayDeque
import java.util.PriorityQueue
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.locks.ReentrantLock
import kotlin.math.max


/**
* The core construct that represents the graph of behavior and resource nodes.
* As many graphs can exist in the same program as you like; however nodes in one graph cannot directly link to nodes in another graph.
Expand All @@ -21,6 +27,7 @@ class Graph @JvmOverloads constructor(private val dateProvider: DateProvider? =
*/
var currentEvent: Event? = null
private set

/**
* The last completed event (ie all behaviors and side effects have run)
*/
Expand All @@ -34,15 +41,17 @@ class Graph @JvmOverloads constructor(private val dateProvider: DateProvider? =
var currentBehavior: Behavior<*>? = null
private set
private var effects: ArrayDeque<RunnableSideEffect> = ArrayDeque()
private var actions: ArrayDeque<RunnableAction> = ArrayDeque()
internal var actions: ArrayDeque<RunnableAction> = ArrayDeque()
private var untrackedBehaviors: MutableList<Behavior<*>> = mutableListOf()
private var modifiedDemandBehaviors: MutableList<Behavior<*>> = mutableListOf()
private var modifiedSupplyBehaviors: MutableList<Behavior<*>> = mutableListOf()
private var updatedTransients: MutableList<Transient> = mutableListOf()
private var needsOrdering: MutableList<Behavior<*>> = mutableListOf()
private var eventLoopState: EventLoopState? = null
internal var eventLoopState: EventLoopState? = null
private var extentsAdded: MutableList<Extent<*>> = mutableListOf()
private var extentsRemoved: MutableList<Extent<*>> = mutableListOf()
private val eventMutex: ReentrantLock = ReentrantLock()
private val eventExecutor: ExecutorService = Executors.newSingleThreadExecutor()

/**
* Validating dependencies between nodes in the graph can take some additional time.
Expand All @@ -69,7 +78,7 @@ class Graph @JvmOverloads constructor(private val dateProvider: DateProvider? =
* The current action may update one or more resources. Inspecting this list lets us
* identify which action initiated the current event.
*/
val actionUpdates: List<Resource>? get() = eventLoopState?.actionUpdates
val actionUpdates: List<Resource>? get() = eventLoopState?.actionUpdates

/**
* The action belonging to the current event if one is running.
Expand All @@ -81,34 +90,21 @@ class Graph @JvmOverloads constructor(private val dateProvider: DateProvider? =
*/
val currentSideEffect: SideEffect? get() = eventLoopState?.currentSideEffect

/**
* Side effects will run on the same thread of the action by default.
* Provide a different executor run side effects on your preferred thread (UI thread)
*/
var sideEffectExecutor: Executor = object : Executor {
override fun execute(r: Runnable) {
r.run()
}
}

init {
activatedBehaviors = PriorityQueue()
lastEvent = InitialEvent
}

/**
* Creates a new action but will not necessarily block until the passed in function is run.
* This is useful when coordinating side effects that lead to new actions.
*
* Example:
* ```kotlin
* graph.actionAsync { resource1.update() }
* afterFunction()
* ```
*
* - If the graph is currently running an event and we call `actionAsync` like above, the internal
* block of code will be put on an internal queue and `afterFunction()` will get called next.
* - If the graph is __not__ running an event and we call `actionAsync`, `resource`.update()` will
* get called, the entire graph will update and `afterFunction()` will finally run.
*
* @param debugName let's us add additional context to an action for debugging
*/
@JvmOverloads
fun actionAsync(debugName: String? = null, thunk: Thunk) {
val graphAction = GraphAction(thunk, debugName)
asyncActionHelper(graphAction)
}

/**
* Creates a new action. It will always run the passed in function and subsequent graph event
* before continuing to the next line.
Expand All @@ -122,29 +118,55 @@ class Graph @JvmOverloads constructor(private val dateProvider: DateProvider? =
* In the above example `resource1.update()` and associated event will always run before `afterFunction()`
* is called.
*
* @param debugName let's us add additional context to an action for debugging
* @param debugName lets us add additional context to an action for debugging
*/
@JvmOverloads
fun action(debugName: String? = null, thunk: Thunk) {
fun action(
debugName: String? = null,
thunk: Thunk
): Future<*> {
val graphAction = GraphAction(thunk, debugName)
actionHelper(graphAction)
return actionHelper(graphAction)
}

internal fun actionHelper(action: RunnableAction) {
if (eventLoopState != null && (eventLoopState!!.phase == EventLoopPhase.Action || eventLoopState!!.phase == EventLoopPhase.Updates)) {
throw BehaviorGraphException("Action cannot be created directly inside another action or behavior. Consider wrapping it in a side effect block.")
}
actions.addLast(action)
eventLoop()
}

internal fun asyncActionHelper(action: RunnableAction) {
if (eventLoopState != null && (eventLoopState!!.phase == EventLoopPhase.Action || eventLoopState!!.phase == EventLoopPhase.Updates)) {
throw BehaviorGraphException("Action cannot be created directly inside another action or behavior. Consider wrapping it in a side effect block.")
}
actions.addLast(action)
if (currentEvent == null) {
eventLoop()
internal fun actionHelper(
action: RunnableAction
): Future<*> {
// Check if not running or already running on same thread
if (eventMutex.tryLock()) {
try {
// Check that an action wasn't created from inside another action or behavior.
// It is easy to accidentally do an updateWithAction inside a behavior or action
// We want to alert the programmer to that mistake.
val wrongAction = eventLoopState != null &&
eventLoopState!!.thread == Thread.currentThread() &&
(eventLoopState!!.phase == EventLoopPhase.Action || eventLoopState!!.phase == EventLoopPhase.Updates)
assert(
!wrongAction,
{ "Action cannot be created directly inside another action or behavior. Consider wrapping it in a side effect block." })
// queue up the action and start the event loop if one isn't already running
actions.addLast(action)
if (currentEvent == null) {
eventLoop()
}
// action is an implementation of Future interface
return action
} finally {
eventMutex.unlock()
}
} else {
eventExecutor.execute {
try {
eventMutex.lock()
actions.addLast(action)
if (currentEvent == null) {
eventLoop()
}
} finally {
eventMutex.unlock()
}
}
return action
}
}

Expand Down Expand Up @@ -182,7 +204,7 @@ class Graph @JvmOverloads constructor(private val dateProvider: DateProvider? =
val effect = this.effects.removeFirst()
eventLoopState!!.phase = EventLoopPhase.SideEffects
eventLoopState!!.currentSideEffect = effect
effect.runSideEffect()
sideEffectExecutor!!.execute(effect)
if (eventLoopState != null) {
// side effect could create a synchronous action which would create a nested event loop
// which would clear out any existing event loop states
Expand All @@ -192,11 +214,13 @@ class Graph @JvmOverloads constructor(private val dateProvider: DateProvider? =
}

currentEvent?.let { aCurrentEvent ->
val eventAction = eventLoopState!!.action
clearTransients()
lastEvent = aCurrentEvent
currentEvent = null
eventLoopState = null
currentBehavior = null
eventAction.complete()
}

if (actions.isNotEmpty()) {
Expand All @@ -210,9 +234,10 @@ class Graph @JvmOverloads constructor(private val dateProvider: DateProvider? =
action.runAction()
continue
}
} catch (e: Exception) {
} catch (e: Throwable) {
//put graph into clean state and rethrow exception
currentEvent = null
eventLoopState?.action?.fail(e) // put exception into action's Future
eventLoopState = null
actions.clear()
effects.clear()
Expand Down Expand Up @@ -243,9 +268,9 @@ class Graph @JvmOverloads constructor(private val dateProvider: DateProvider? =
}
}
}
if (needAdding.size > 0) {
throw BehaviorGraphException("All extents with unified or parent lifetimes must be added during the same event. Extents=$needAdding")
}
assert(
needAdding.size == 0,
{ "All extents with unified or parent lifetimes must be added during the same event. Extents=$needAdding" })
}

private fun validateRemovedExtents() {
Expand All @@ -260,9 +285,9 @@ class Graph @JvmOverloads constructor(private val dateProvider: DateProvider? =
}
}
}
if (needRemoving.size > 0) {
throw BehaviorGraphException("All extents with unified or child lifetimes must be removed during the same event. Extents=$needRemoving")
}
assert(
needRemoving.size == 0,
{ "All extents with unified or child lifetimes must be removed during the same event. Extents=$needRemoving" })

// validate removed resources are not still linked to remaining behaviors
for (removed in extentsRemoved) {
Expand Down
Loading

0 comments on commit 8c76e95

Please sign in to comment.