Skip to content

Commit

Permalink
Additional communication and synchronization primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Dec 31, 2023
1 parent ab19abe commit c69065c
Show file tree
Hide file tree
Showing 14 changed files with 537 additions and 0 deletions.
75 changes: 75 additions & 0 deletions src/kcas_data/ch.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
open Kcas

(* TODO: The semantics are not quite right here *)

type 'a t = {
givers : 'a option Loc.t Queue.t;
takers : [ `Accepting | `Offer of 'a | `Finished ] Loc.t Queue.t;
}

let create () =
let givers = Queue.create () and takers = Queue.create () in
{ givers; takers }

module Xt = struct
let rec try_give ~xt ch value =
match Queue.Xt.take_opt ~xt ch.takers with
| None -> false
| Some slot -> (
match Xt.compare_and_swap ~xt slot `Accepting (`Offer value) with
| `Accepting -> true
| `Finished -> try_give ~xt ch value
| `Offer _ -> Retry.later ())

let rec take_opt ~xt ch =
match Queue.Xt.take_opt ~xt ch.givers with
| None -> None
| Some slot -> (
match Xt.exchange ~xt slot None with
| None -> take_opt ~xt ch
| Some _ as offer -> offer)
end

let give ch value =
let tx ~xt =
if Xt.try_give ~xt ch value then None
else
let offer = Some value in
let slot = Loc.make offer in
Queue.Xt.add ~xt slot ch.givers;
Some (slot, offer)
in
match Kcas.Xt.commit { tx } with
| None -> ()
| Some (slot, offer) -> (
try Loc.get_as (fun offer -> Retry.unless (offer == None)) slot
with exn ->
Loc.compare_and_set slot offer None |> ignore;
raise exn)

let rec take ch =
let tx ~xt =
match Xt.take_opt ~xt ch with
| None ->
let slot = Loc.make `Accepting in
Queue.Xt.add ~xt slot ch.takers;
`Block slot
| Some value -> `Offer value
in
match Kcas.Xt.commit { tx } with
| `Offer value -> value
| `Block slot -> (
let tx ~xt =
match Kcas.Xt.exchange ~xt slot `Finished with
| `Offer value -> Some value
| `Finished -> None
| `Accepting -> Retry.later ()
in
match Kcas.Xt.commit { tx } with
| None -> take ch
| Some value -> value
| exception exn ->
Loc.compare_and_set slot `Accepting `Finished |> ignore;
raise exn)

let take_opt ch = Kcas.Xt.commit { tx = Xt.take_opt ch }
24 changes: 24 additions & 0 deletions src/kcas_data/ch.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
open Kcas

type 'a t
(** Type of a synchronous channel. *)

val create : unit -> 'a t
(** [create ()] returns a new synchronous channel. *)

module Xt : sig
val try_give : xt:'x Xt.t -> 'a t -> 'a -> bool
(** *)

val take_opt : xt:'x Xt.t -> 'a t -> 'a option
(** *)
end

val give : 'a t -> 'a -> unit
(** *)

val take : 'a t -> 'a
(** *)

val take_opt : 'a t -> 'a option
(** *)
22 changes: 22 additions & 0 deletions src/kcas_data/condition.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
type t = Waiters.t

let create = Waiters.create
let signal = Waiters.signal
let broadcast = Waiters.broadcast

let await cond mutex =
let self = Waiters.enqueue cond in
Mutex.unlock mutex;
Fun.protect
(fun () -> Waiters.await self ~on_cancel:signal ~the:cond)
~finally:(fun () ->
(* TODO: This should be protected also from cancellation *)
Mutex.lock mutex)

let await_no_mutex cond =
Waiters.enqueue cond |> Waiters.await ~on_cancel:signal ~the:cond

module Xt = struct
let signal = Waiters.Xt.signal ~on_none:(fun ~xt:_ -> ())
let broadcast = Waiters.Xt.broadcast
end
31 changes: 31 additions & 0 deletions src/kcas_data/condition.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
open Kcas

(** Condition variable. *)

(** {1 Common interface} *)

type t
(** *)

val create : unit -> t
(** *)

(** {1 Compositional interface} *)

module Xt :
Condition_intf.Ops
with type t := t
with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn

(** {1 Non-compositional interface} *)

include Condition_intf.Ops with type t := t with type ('x, 'fn) fn := 'fn

val await : t -> Mutex.t -> unit
(** *)

val await_no_mutex : t -> unit
(** *)

val broadcast : t -> unit
(** *)
10 changes: 10 additions & 0 deletions src/kcas_data/condition_intf.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module type Ops = sig
type t
type ('x, 'fn) fn

val signal : ('x, t -> unit) fn
(** *)

val broadcast : ('x, t -> unit) fn
(** *)
end
5 changes: 5 additions & 0 deletions src/kcas_data/kcas_data.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,10 @@ module Queue = Queue
module Stack = Stack
module Mvar = Mvar
module Promise = Promise
module Ch = Ch
module Condition = Condition
module Mutex = Mutex
module Semaphore = Semaphore
module Stream = Stream
module Dllist = Dllist
module Accumulator = Accumulator
96 changes: 96 additions & 0 deletions src/kcas_data/mutex.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
open Kcas

type t = {
state : [ `Unlocked | `Locked | `Poisoned of exn ] Loc.t;
waiters : Waiters.t;
}

exception Poisoned of exn

let poisoned exn = raise @@ Poisoned exn [@@inline never]

let create () =
let state = Loc.make `Unlocked and waiters = Waiters.create () in
{ state; waiters }

let unlock mutex =
let tx ~xt =
match Xt.get ~xt mutex.state with
| `Locked ->
Waiters.Xt.signal ~xt mutex.waiters
~on_none:(Xt.set mutex.state `Unlocked);
None
| `Unlocked ->
let exn = Sys_error "Mutex.unlock: already unlocked!" in
Xt.set ~xt mutex.state @@ `Poisoned exn;
Some exn
| `Poisoned exn -> Some (Poisoned exn)
in
Xt.commit { tx } |> Option.iter raise

let lock mutex =
let tx ~xt =
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with
| `Unlocked -> None
| `Locked -> Some (Waiters.Xt.enqueue ~xt mutex.waiters)
| `Poisoned exn -> poisoned exn
in
Xt.commit { tx } |> Option.iter (Waiters.await ~on_cancel:unlock ~the:mutex)

let try_lock mutex =
match
Loc.update mutex.state @@ function
| `Unlocked -> `Locked
| (`Locked | `Poisoned _) as other -> other
with
| `Unlocked -> true
| `Locked -> false
| `Poisoned exn -> poisoned exn

let poison mutex exn =
match
Loc.update mutex.state @@ function
| `Locked | `Unlocked -> `Poisoned exn
| `Poisoned _ as poisoned -> poisoned
with
| `Locked | `Unlocked -> Waiters.broadcast mutex.waiters
| `Poisoned _ -> ()

let use_rw mutex thunk =
lock mutex;
match thunk () with
| value ->
unlock mutex;
value
| exception exn ->
poison mutex exn;
raise exn

let use_ro mutex thunk =
lock mutex;
Fun.protect ~finally:(fun () -> unlock mutex) thunk

module Xt = struct
let lock ~xt mutex =
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with
| `Unlocked -> ()
| `Locked -> Retry.later ()
| `Poisoned exn -> poisoned exn

let try_lock ~xt mutex =
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with
| `Unlocked -> true
| `Locked -> false
| `Poisoned exn -> poisoned exn

let unlock ~xt mutex =
match Xt.get ~xt mutex.state with
| `Locked ->
Waiters.Xt.signal ~xt mutex.waiters
~on_none:(Xt.set mutex.state `Unlocked)
| `Unlocked ->
let exn = Sys_error "Mutex.unlock: already unlocked!" in
Xt.set ~xt mutex.state @@ `Poisoned exn;
raise exn
| `Poisoned exn -> poisoned exn
end
46 changes: 46 additions & 0 deletions src/kcas_data/mutex.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
open Kcas

(** Mutual exclusion. *)

(* TODO: Consider ordering of mutexes *)

(** {1 Common interface} *)

exception Poisoned of exn
(** *)

type t
(** *)

val create : unit -> t
(** *)

(** {1 Compositional interface} *)

module Xt : sig
val lock : xt:'x Xt.t -> t -> unit
(** *)

val try_lock : xt:'x Xt.t -> t -> bool
(** *)

val unlock : xt:'x Xt.t -> t -> unit
(** *)
end

(** {1 Non-compositional interface} *)

val lock : t -> unit
(** *)

val try_lock : t -> bool
(** *)

val unlock : t -> unit
(** *)

val use_rw : t -> (unit -> 'a) -> 'a
(** *)

val use_ro : t -> (unit -> 'a) -> 'a
(** *)
31 changes: 31 additions & 0 deletions src/kcas_data/semaphore.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
open Kcas

let non_neg_dec n = if 0 < n then n - 1 else n

type t = { count : int Loc.t; waiters : Waiters.t }

let make n =
if n < 0 then invalid_arg "n < 0";
let count = Loc.make n and waiters = Waiters.create () in
{ count; waiters }

let get_value sem = Loc.get sem.count

module Xt = struct
let get_value ~xt sem = Xt.get ~xt sem.count

let release ~xt sem =
Waiters.Xt.signal ~xt sem.waiters ~on_none:(Xt.incr sem.count)

let acquire ~xt sem = Retry.unless (0 < Xt.fetch_and_add ~xt sem.count (-1))
end

let release sem = Kcas.Xt.commit { tx = Xt.release sem }

let acquire sem =
let tx ~xt =
if 0 < Kcas.Xt.update ~xt sem.count non_neg_dec then None
else Some (Waiters.Xt.enqueue ~xt sem.waiters)
in
Kcas.Xt.commit { tx }
|> Option.iter (Waiters.await ~on_cancel:release ~the:sem)
23 changes: 23 additions & 0 deletions src/kcas_data/semaphore.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
open Kcas

(** *)

(** {1 Common interface} *)

type t
(** *)

val make : int -> t
(** *)

(** {1 Compositional interface} *)

module Xt :
Semaphore_intf.Ops
with type t := t
with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn
(** Explicit transaction log passing on semaphores. *)

(** {1 Non-compositional interface} *)

include Semaphore_intf.Ops with type t := t with type ('x, 'fn) fn := 'fn
13 changes: 13 additions & 0 deletions src/kcas_data/semaphore_intf.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module type Ops = sig
type t
type ('x, 'fn) fn

val release : ('x, t -> unit) fn
(** *)

val acquire : ('x, t -> unit) fn
(** *)

val get_value : ('x, t -> int) fn
(** *)
end
Loading

0 comments on commit c69065c

Please sign in to comment.