-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add an actor model implementation example
- Loading branch information
Showing
7 changed files
with
285 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
(library | ||
(name picos_meta_hoot) | ||
(public_name picos_meta.hoot) | ||
(libraries backoff multicore-magic picos)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
open Picos | ||
|
||
let[@inline never] impossible () = failwith "impossible" | ||
let[@inline never] not_in_a_hoot () = invalid_arg "Not in a hoot" | ||
|
||
module Message = struct | ||
type t = .. | ||
end | ||
|
||
type _ tdt = | ||
| Nil : [> `Nil ] tdt | ||
| Message : { | ||
message : Message.t; | ||
next : [ `Nil | `Message ] tdt; | ||
} | ||
-> [> `Message ] tdt | ||
| Wait : Trigger.t -> [> `Wait ] tdt | ||
| Pid : { | ||
computation : unit Computation.t; | ||
terminated : unit Computation.t; | ||
incoming : incoming Atomic.t; | ||
mutable received : [ `Nil | `Message ] tdt; | ||
} | ||
-> [> `Pid ] tdt | ||
|
||
and incoming = In : [< `Nil | `Message | `Wait ] tdt -> incoming [@@unboxed] | ||
|
||
module Pid = struct | ||
type t = [ `Pid ] tdt | ||
|
||
let key : [ `Nil | `Pid ] tdt Fiber.FLS.t = Fiber.FLS.create () | ||
end | ||
|
||
let self_of fiber = | ||
match Fiber.FLS.get_exn fiber Pid.key with | ||
| Pid _ as t -> t | ||
| Nil | (exception Fiber.FLS.Not_set) -> not_in_a_hoot () | ||
|
||
let self () : Pid.t = self_of @@ Fiber.current () | ||
|
||
exception Terminate | ||
|
||
let run main = | ||
let fiber = Fiber.current () in | ||
let before = Fiber.FLS.get fiber Pid.key ~default:Nil in | ||
let computation = Computation.create ~mode:`LIFO () in | ||
let inner = Computation.Packed computation in | ||
let (Pid p as t) : Pid.t = | ||
let terminated = Computation.create ~mode:`LIFO () | ||
and incoming = Atomic.make (In Nil) |> Multicore_magic.copy_as_padded in | ||
Pid { computation; terminated; incoming; received = Nil } | ||
in | ||
Fiber.FLS.set fiber Pid.key t; | ||
let (Packed parent as outer) = Fiber.get_computation fiber in | ||
let canceler = Computation.attach_canceler ~from:parent ~into:computation in | ||
Fiber.set_computation fiber inner; | ||
begin | ||
match main () with | ||
| () | (exception Terminate) -> Computation.finish p.terminated | ||
| exception exn -> | ||
let bt = Printexc.get_raw_backtrace () in | ||
Computation.cancel p.terminated exn bt | ||
end; | ||
Computation.finish p.computation; | ||
Fiber.set_computation fiber outer; | ||
Computation.detach parent canceler; | ||
Fiber.FLS.set fiber Pid.key before; | ||
(* An otherwise unhandled exception except [Terminate] will be raised. *) | ||
Computation.check p.terminated | ||
|
||
let wait (Pid p : Pid.t) = Computation.wait p.terminated | ||
|
||
let spawn main = | ||
let (Pid p as t) : Pid.t = | ||
let computation = Computation.create ~mode:`LIFO () | ||
and terminated = Computation.create ~mode:`LIFO () | ||
and incoming = Atomic.make (In Nil) in | ||
Pid { computation; terminated; incoming; received = Nil } | ||
in | ||
let fiber = Fiber.create ~forbid:false p.computation in | ||
Fiber.FLS.set fiber Pid.key t; | ||
begin | ||
Fiber.spawn fiber @@ fun fiber -> | ||
let (Pid p) : Pid.t = self_of fiber in | ||
(* An unhandled exception except [Terminate] will be treated as a fatal | ||
error. *) | ||
begin | ||
match main () with | ||
| () | (exception Terminate) -> Computation.finish p.terminated | ||
end; | ||
Computation.finish p.computation | ||
end; | ||
t | ||
|
||
let rec rev_to (Message _ as ms : [ `Message ] tdt) : | ||
[ `Nil | `Message ] tdt -> _ = function | ||
| Nil -> ms | ||
| Message r -> rev_to (Message { message = r.message; next = ms }) r.next | ||
|
||
let rev (Message r : [ `Message ] tdt) = | ||
rev_to (Message { message = r.message; next = Nil }) r.next | ||
|
||
let rec receive (Pid p as t : Pid.t) = | ||
match Atomic.get p.incoming with | ||
| In Nil as before -> | ||
let trigger = Trigger.create () in | ||
let after = In (Wait trigger) in | ||
if Atomic.compare_and_set p.incoming before after then begin | ||
match Trigger.await trigger with | ||
| None -> () | ||
| Some (exn, bt) -> | ||
(* At this point the trigger has been signaled and cannot leak | ||
arbitrary amoun of space. There is no need to remove it. *) | ||
Printexc.raise_with_backtrace exn bt | ||
end; | ||
receive t | ||
| _ -> begin | ||
match Atomic.exchange p.incoming (In Nil) with | ||
| In (Wait _ | Nil) -> impossible () | ||
| In (Message _ as ms) -> | ||
let (Message r : [ `Message ] tdt) = rev ms in | ||
p.received <- r.next; | ||
r.message | ||
end | ||
|
||
let receive () = | ||
let (Pid p as t) = self () in | ||
match p.received with | ||
| Message r -> | ||
p.received <- r.next; | ||
r.message | ||
| Nil -> receive t | ||
|
||
let rec send (Pid p as t : Pid.t) message backoff = | ||
match Atomic.get p.incoming with | ||
| In ((Nil | Message _) as before) -> | ||
let after = Message { message; next = before } in | ||
if not (Atomic.compare_and_set p.incoming (In before) (In after)) then | ||
send t message (Backoff.once backoff) | ||
| In (Wait trigger as before) -> | ||
let after = Message { message; next = Nil } in | ||
if Atomic.compare_and_set p.incoming (In before) (In after) then | ||
Trigger.signal trigger | ||
else send t message (Backoff.once backoff) | ||
|
||
let[@inline] send t message = send t message Backoff.default | ||
|
||
type Message.t += Terminated of Pid.t | ||
|
||
let monitor ~at ~the:(Pid the_p as the : Pid.t) = | ||
let[@alert "-handler"] trigger = | ||
Trigger.from_action at the @@ fun _ at the -> send at (Terminated the) | ||
in | ||
if not (Computation.try_attach the_p.terminated trigger) then | ||
send at (Terminated the) | ||
|
||
let empty_bt = Printexc.get_callstack 0 | ||
|
||
let link (Pid p1 as t1 : Pid.t) (Pid p2 as t2 : Pid.t) = | ||
let[@alert "-handler"] trigger = | ||
Trigger.from_action t1 t2 @@ fun _ (Pid p1 : Pid.t) (Pid p2 : Pid.t) -> | ||
Computation.cancel p1.computation Terminate empty_bt; | ||
Computation.cancel p2.computation Terminate empty_bt | ||
in | ||
if | ||
(not (Computation.try_attach p1.terminated trigger)) | ||
|| not (Computation.try_attach p2.terminated trigger) | ||
then begin | ||
Computation.cancel p1.computation Terminate empty_bt; | ||
Computation.cancel p2.computation Terminate empty_bt | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
module Hoot = Hoot |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
(** The beginnings of an actor model implementation loosely inspired by | ||
{{:https://github.com/riot-ml/riot} Riot}. | ||
⚠️ This is meant as an example only and would require further development to | ||
become a production ready actor model implementation. *) | ||
|
||
(** {1 Modules} *) | ||
|
||
module Hoot : sig | ||
(** {{:https://www.merriam-webster.com/thesaurus/riot} Hoot} is an actor model | ||
core loosely inspired by {{:https://github.com/riot-ml/riot} Riot}. *) | ||
|
||
exception Terminate | ||
(** Exception used by {!link} to terminate actor processes. | ||
An unhandled [Terminate] exception is not treated as an error. *) | ||
|
||
val run : (unit -> unit) -> unit | ||
(** [run main] establishes a new actor process on the current fiber and runs | ||
[main] as the process. | ||
⚠️ An unhandled exception, aside from {!Terminate}, raised from [main ()] | ||
will be raised from [run main]. Unlike with {!spawn} an unhandled | ||
exception is not treated as a fatal error. | ||
This can be called from any fiber, even from another actor process, which | ||
would then effectively get suspended while running [main], but typically | ||
this is used to start a "scope" for running actors. *) | ||
|
||
module Pid : sig | ||
(** Actor process or process identifier. *) | ||
|
||
type t | ||
end | ||
|
||
val spawn : (unit -> unit) -> Pid.t | ||
(** [spawn main] creates a new actor process to run [main]. | ||
⚠️ An unhandled exception, aside from {!Terminate}, raised from [main ()] | ||
will be treated as a fatal error and will either exit the entire program | ||
or stop the scheduler without completing other fibers. | ||
This can be called from any fiber, even from fibers that are not actor | ||
processes, but typically this would be used within a "scope" for running | ||
actors. *) | ||
|
||
val self : unit -> Pid.t | ||
(** [self ()], when called within an actor process, returns the {{!Pid.t} | ||
process identifier} of the actor process. | ||
@raise Invalid_argument when called outside of an actor process. *) | ||
|
||
val wait : Pid.t -> unit | ||
(** [wait pid] blocks until the specified actor process has terminated. *) | ||
|
||
module Message : sig | ||
(** Extensible message type. *) | ||
|
||
type t = .. | ||
end | ||
|
||
val receive : unit -> Message.t | ||
(** [receive ()] waits until at least one message has been added to the | ||
mailbox of the current process and then removes and returns the least | ||
recently added message from the mailbox. *) | ||
|
||
val send : Pid.t -> Message.t -> unit | ||
(** [send pid message] adds the given [message] to the mailbox of the process | ||
[pid]. | ||
ℹ️ Sending a message to a process that has already terminated is not | ||
considered an error. *) | ||
|
||
type Message.t += | ||
| Terminated of Pid.t | ||
(** Sent by the {!monitor} mechanism to notify of process | ||
termination. *) | ||
|
||
val monitor : at:Pid.t -> the:Pid.t -> unit | ||
(** [monitor ~at:observer ~the:subject] makes it so that when the [subject] | ||
process terminates the message {{!Terminated} [Terminated subject]} is | ||
{{!send} sent} to the [observer] process. *) | ||
|
||
val link : Pid.t -> Pid.t -> unit | ||
(** [link pid1 pid2] makes it so that when either one of the given processes | ||
terminates the other process will also be terminated with the [Terminate] | ||
exception. | ||
In case either one of the given processes is already terminated when | ||
[link] is called, the other process will then be terminated. *) | ||
end | ||
|
||
(** {1 Examples} *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,11 @@ | ||
{0 Integration tests for Picos packages} | ||
{0 Integration tests and additional examples for Picos packages} | ||
|
||
This package contains integration (and other kinds of) tests for the core | ||
{!Picos} interface and other Picos libraries. A separate package is used to | ||
allow the dependencies of and between different Picos packages to be simplified. | ||
This package contains additional examples and integration (and other kinds of) | ||
tests for the core {!Picos} interface and other Picos libraries. A separate | ||
package is used to allow the dependencies of and between different Picos | ||
packages to be simplified. | ||
|
||
{1 Libraries} | ||
|
||
{!modules: | ||
Picos_meta_hoot} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters