-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Additional communication and synchronization primitives
- Loading branch information
Showing
17 changed files
with
615 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
open Kcas | ||
|
||
(* TODO: The semantics are not quite right here *) | ||
|
||
type 'a t = { | ||
givers : 'a option Loc.t Queue.t; | ||
takers : [ `Accepting | `Offer of 'a | `Finished ] Loc.t Queue.t; | ||
} | ||
|
||
let create () = | ||
let givers = Queue.create () and takers = Queue.create () in | ||
{ givers; takers } | ||
|
||
module Xt = struct | ||
let rec try_give ~xt ch value = | ||
match Queue.Xt.take_opt ~xt ch.takers with | ||
| None -> false | ||
| Some slot -> ( | ||
match Xt.compare_and_swap ~xt slot `Accepting (`Offer value) with | ||
| `Accepting -> true | ||
| `Finished -> try_give ~xt ch value | ||
| `Offer _ -> Retry.later ()) | ||
|
||
let rec take_opt ~xt ch = | ||
match Queue.Xt.take_opt ~xt ch.givers with | ||
| None -> None | ||
| Some slot -> ( | ||
match Xt.exchange ~xt slot None with | ||
| None -> take_opt ~xt ch | ||
| Some _ as offer -> offer) | ||
end | ||
|
||
let give ch value = | ||
let tx ~xt = | ||
if Xt.try_give ~xt ch value then None | ||
else | ||
let offer = Some value in | ||
let slot = Loc.make offer in | ||
Queue.Xt.add ~xt slot ch.givers; | ||
Some (slot, offer) | ||
in | ||
match Kcas.Xt.commit { tx } with | ||
| None -> () | ||
| Some (slot, offer) -> ( | ||
try Loc.get_as (fun offer -> Retry.unless (offer == None)) slot | ||
with exn -> | ||
Loc.compare_and_set slot offer None |> ignore; | ||
raise exn) | ||
|
||
let rec take ch = | ||
let tx ~xt = | ||
match Xt.take_opt ~xt ch with | ||
| None -> | ||
let slot = Loc.make `Accepting in | ||
Queue.Xt.add ~xt slot ch.takers; | ||
`Block slot | ||
| Some value -> `Offer value | ||
in | ||
match Kcas.Xt.commit { tx } with | ||
| `Offer value -> value | ||
| `Block slot -> ( | ||
let tx ~xt = | ||
match Kcas.Xt.exchange ~xt slot `Finished with | ||
| `Offer value -> Some value | ||
| `Finished -> None | ||
| `Accepting -> Retry.later () | ||
in | ||
match Kcas.Xt.commit { tx } with | ||
| None -> take ch | ||
| Some value -> value | ||
| exception exn -> | ||
Loc.compare_and_set slot `Accepting `Finished |> ignore; | ||
raise exn) | ||
|
||
let take_opt ch = Kcas.Xt.commit { tx = Xt.take_opt ch } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
open Kcas | ||
|
||
type 'a t | ||
(** Type of a synchronous channel. *) | ||
|
||
val create : unit -> 'a t | ||
(** [create ()] returns a new synchronous channel. *) | ||
|
||
module Xt : sig | ||
val try_give : xt:'x Xt.t -> 'a t -> 'a -> bool | ||
(** *) | ||
|
||
val take_opt : xt:'x Xt.t -> 'a t -> 'a option | ||
(** *) | ||
end | ||
|
||
val give : 'a t -> 'a -> unit | ||
(** *) | ||
|
||
val take : 'a t -> 'a | ||
(** *) | ||
|
||
val take_opt : 'a t -> 'a option | ||
(** *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
type t = Waiters.t | ||
|
||
let create = Waiters.create | ||
let signal = Waiters.signal | ||
let broadcast = Waiters.broadcast | ||
|
||
let await cond mutex = | ||
let self = Waiters.enqueue cond in | ||
Mutex.unlock mutex; | ||
Fun.protect | ||
~finally:(fun () -> Mutex.lock mutex ~await:`Protected) | ||
(fun () -> Waiters.await self ~on_cancel:signal ~the:cond) | ||
|
||
let await_no_mutex cond = | ||
Waiters.enqueue cond |> Waiters.await ~on_cancel:signal ~the:cond | ||
|
||
module Xt = struct | ||
let signal = Waiters.Xt.signal ~on_none:(fun ~xt:_ -> ()) | ||
let broadcast = Waiters.Xt.broadcast | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
open Kcas | ||
|
||
(** Condition variable. *) | ||
|
||
(** {1 Common interface} *) | ||
|
||
type t | ||
(** *) | ||
|
||
val create : unit -> t | ||
(** *) | ||
|
||
(** {1 Compositional interface} *) | ||
|
||
module Xt : | ||
Condition_intf.Ops | ||
with type t := t | ||
with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn | ||
|
||
(** {1 Non-compositional interface} *) | ||
|
||
include Condition_intf.Ops with type t := t with type ('x, 'fn) fn := 'fn | ||
|
||
val await : t -> Mutex.t -> unit | ||
(** *) | ||
|
||
val await_no_mutex : t -> unit | ||
(** *) | ||
|
||
val broadcast : t -> unit | ||
(** *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
module type Ops = sig | ||
type t | ||
type ('x, 'fn) fn | ||
|
||
val signal : ('x, t -> unit) fn | ||
(** *) | ||
|
||
val broadcast : ('x, t -> unit) fn | ||
(** *) | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
open Kcas | ||
|
||
type t = { | ||
state : [ `Unlocked | `Locked | `Poisoned of exn ] Loc.t; | ||
waiters : Waiters.t; | ||
} | ||
|
||
exception Poisoned of exn | ||
|
||
let poisoned exn = raise @@ Poisoned exn [@@inline never] | ||
|
||
let create () = | ||
let state = Loc.make `Unlocked and waiters = Waiters.create () in | ||
{ state; waiters } | ||
|
||
let unlock mutex = | ||
let tx ~xt = | ||
match Xt.get ~xt mutex.state with | ||
| `Locked -> | ||
Waiters.Xt.signal ~xt mutex.waiters | ||
~on_none:(Xt.set mutex.state `Unlocked); | ||
None | ||
| `Unlocked -> | ||
let exn = Sys_error "Mutex.unlock: already unlocked!" in | ||
Xt.set ~xt mutex.state @@ `Poisoned exn; | ||
Some exn | ||
| `Poisoned exn -> Some (Poisoned exn) | ||
in | ||
Xt.commit { tx } |> Option.iter raise | ||
|
||
let lock ?await mutex = | ||
let tx ~xt = | ||
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with | ||
| `Unlocked -> None | ||
| `Locked -> Some (Waiters.Xt.enqueue ~xt mutex.waiters) | ||
| `Poisoned exn -> poisoned exn | ||
in | ||
Xt.commit { tx } | ||
|> Option.iter (Waiters.await ?await ~on_cancel:unlock ~the:mutex) | ||
|
||
let try_lock mutex = | ||
match | ||
Loc.update mutex.state @@ function | ||
| `Unlocked -> `Locked | ||
| (`Locked | `Poisoned _) as other -> other | ||
with | ||
| `Unlocked -> true | ||
| `Locked -> false | ||
| `Poisoned exn -> poisoned exn | ||
|
||
let poison mutex exn = | ||
match | ||
Loc.update mutex.state @@ function | ||
| `Locked | `Unlocked -> `Poisoned exn | ||
| `Poisoned _ as poisoned -> poisoned | ||
with | ||
| `Locked | `Unlocked -> Waiters.broadcast mutex.waiters | ||
| `Poisoned _ -> () | ||
|
||
let use_rw mutex thunk = | ||
lock mutex; | ||
match thunk () with | ||
| value -> | ||
unlock mutex; | ||
value | ||
| exception exn -> | ||
poison mutex exn; | ||
raise exn | ||
|
||
let use_ro mutex thunk = | ||
lock mutex; | ||
Fun.protect ~finally:(fun () -> unlock mutex) thunk | ||
|
||
module Xt = struct | ||
let lock ~xt mutex = | ||
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with | ||
| `Unlocked -> () | ||
| `Locked -> Retry.later () | ||
| `Poisoned exn -> poisoned exn | ||
|
||
let try_lock ~xt mutex = | ||
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with | ||
| `Unlocked -> true | ||
| `Locked -> false | ||
| `Poisoned exn -> poisoned exn | ||
|
||
let unlock ~xt mutex = | ||
match Xt.get ~xt mutex.state with | ||
| `Locked -> | ||
Waiters.Xt.signal ~xt mutex.waiters | ||
~on_none:(Xt.set mutex.state `Unlocked) | ||
| `Unlocked -> | ||
let exn = Sys_error "Mutex.unlock: already unlocked!" in | ||
Xt.set ~xt mutex.state @@ `Poisoned exn; | ||
raise exn | ||
| `Poisoned exn -> poisoned exn | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
open Kcas | ||
|
||
(** Mutual exclusion. *) | ||
|
||
(* TODO: Consider ordering of mutexes *) | ||
|
||
(** {1 Common interface} *) | ||
|
||
exception Poisoned of exn | ||
(** *) | ||
|
||
type t | ||
(** *) | ||
|
||
val create : unit -> t | ||
(** *) | ||
|
||
(** {1 Compositional interface} *) | ||
|
||
module Xt : sig | ||
val lock : xt:'x Xt.t -> t -> unit | ||
(** *) | ||
|
||
val try_lock : xt:'x Xt.t -> t -> bool | ||
(** *) | ||
|
||
val unlock : xt:'x Xt.t -> t -> unit | ||
(** *) | ||
end | ||
|
||
(** {1 Non-compositional interface} *) | ||
|
||
val lock : ?await:[ `Cancelable | `Protected ] -> t -> unit | ||
(** *) | ||
|
||
val try_lock : t -> bool | ||
(** *) | ||
|
||
val unlock : t -> unit | ||
(** *) | ||
|
||
val use_rw : t -> (unit -> 'a) -> 'a | ||
(** *) | ||
|
||
val use_ro : t -> (unit -> 'a) -> 'a | ||
(** *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
open Kcas | ||
|
||
type 'a t = 'a Magic_option.t Loc.t | ||
|
||
let create () = Loc.make Magic_option.none | ||
|
||
module Xt = struct | ||
let is_empty ~xt mv = Magic_option.is_none (Xt.get ~xt mv) | ||
|
||
let put_some ~xt mv some = | ||
Retry.unless (Magic_option.is_none (Xt.exchange ~xt mv some)) | ||
|
||
let put ~xt mv value = put_some ~xt mv (Magic_option.some value) | ||
|
||
let take_opt ~xt mv = | ||
Magic_option.to_option (Xt.exchange ~xt mv Magic_option.none) | ||
|
||
let take ~xt mv = | ||
Magic_option.get_or_retry (Xt.exchange ~xt mv Magic_option.none) | ||
end | ||
|
||
let is_empty mv = Magic_option.is_none (Loc.get mv) | ||
|
||
let put mv value = | ||
Loc.modify mv @@ fun current -> | ||
if Magic_option.is_none current then Magic_option.some value | ||
else Retry.later () | ||
|
||
let take mv = | ||
Magic_option.get_unsafe @@ Loc.update mv | ||
@@ fun current -> | ||
if Magic_option.is_none current then Retry.later () else Magic_option.none | ||
|
||
let take_opt mv = Magic_option.to_option (Loc.exchange mv Magic_option.none) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
open Kcas | ||
|
||
(** Synchronizing variable. *) | ||
|
||
(** {1 Common interface} *) | ||
|
||
type !'a t | ||
(** The type of a synchronizing variable that may contain a value of type | ||
['a]. *) | ||
|
||
val create : unit -> 'a t | ||
(** [create ()] returns a new empty synchronizing variable. *) | ||
|
||
(** {1 Compositional interface} *) | ||
|
||
module Xt : | ||
Mvar_intf.Ops | ||
with type 'a t := 'a t | ||
with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn | ||
(** Explicit transaction passing on synchronizing variables. *) | ||
|
||
(** {1 Non-compositional interface} *) | ||
|
||
include Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn |
Oops, something went wrong.