Skip to content

Commit

Permalink
Ws deque update : documentation improvement and drop functions (#168)
Browse files Browse the repository at this point in the history
 Improve documentation, add of_list, drop_exn and steal_drop.
  • Loading branch information
lyrm authored Dec 2, 2024
1 parent fc0e6a2 commit 2bb4d4b
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ let () =
(<> %{os_type} Win32)
(>= %{ocaml_version} 5.0.0)))
(libraries saturn)
(files treiber_stack.mli bounded_stack.mli))
(files treiber_stack.mli bounded_stack.mli ws_deque.mli))
|}
39 changes: 32 additions & 7 deletions src/ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ let create () =
let top_cache = ref 0 |> Multicore_magic.copy_as_padded in
{ top; bottom; top_cache; tab } |> Multicore_magic.copy_as_padded

(* [next_pow2 n] returns the smallest power of two that is greater than or
   equal to [n]. The search starts at [1], so the result is [1] for any
   [n <= 1]. *)
let next_pow2 n =
  let candidate = ref 1 in
  while !candidate < n do
    candidate := !candidate lsl 1
  done;
  !candidate

(* [of_list l] builds a deque that already contains the elements of [l].

   The backing array has [max min_capacity (next_pow2 (List.length l))]
   slots. The i-th element of [l] is boxed in a fresh [ref] and stored at
   index [i]; [top] starts at [0] and [bottom] at the length of [l].
   Slots beyond the list keep the [Obj.magic ()] placeholder. *)
let of_list l =
  let size = List.length l in
  let tab = Array.make (max min_capacity (next_pow2 size)) (Obj.magic ()) in
  List.iteri (fun index elt -> Array.unsafe_set tab index (ref elt)) l;
  Multicore_magic.copy_as_padded
    {
      top = Atomic.make_contended 0;
      bottom = Atomic.make_contended size;
      top_cache = Multicore_magic.copy_as_padded (ref 0);
      tab;
    }

(* *)

let realloc a t b sz new_sz =
let new_a = Array.make new_sz (Obj.magic ()) in
ArrayExtra.blit_circularly a
Expand Down Expand Up @@ -81,7 +97,12 @@ let push q v =
q.tab <- a;
Atomic.incr q.bottom

type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly
(* *)

(* Witness selecting the result shape of a removal on an ['a] deque; the
   second type parameter is the type the operation returns:
   - [Option]: ['a option] -- [Some v] on success, [None] when empty;
   - [Value]: ['a] -- the element itself, [Empty] is raised when empty;
   - [Unit]: [unit] -- the element is discarded, [Empty] is raised when
     empty. *)
type ('a, _) poly =
| Option : ('a, 'a option) poly
| Value : ('a, 'a) poly
| Unit : ('a, unit) poly

exception Empty

Expand All @@ -100,7 +121,7 @@ let pop_as : type a r. a t -> (a, r) poly -> r =
out := Obj.magic ();
if size + size + size <= capacity - min_capacity then
q.tab <- realloc a t b capacity (capacity lsr 1);
match poly with Option -> Some res | Value -> res
match poly with Option -> Some res | Value -> res | Unit -> ()
end
else if b = t then begin
(* Whether or not the [compare_and_set] below succeeds, [top_cache] can be
Expand All @@ -115,19 +136,22 @@ let pop_as : type a r. a t -> (a, r) poly -> r =
let out = Array.unsafe_get a (b land (Array.length a - 1)) in
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
match poly with Option -> Some res | Value -> res | Unit -> ()
end
else match poly with Option -> None | Value -> raise Empty
else match poly with Option -> None | Value | Unit -> raise Empty
end
else begin
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
match poly with Option -> None | Value -> raise Empty
match poly with Option -> None | Value | Unit -> raise Empty
end

(* Owner-side removals. Each one is [pop_as] applied with the [poly] witness
   that fixes its result type. *)

(* Returns the removed element ([Value] witness). *)
let pop_exn deque = pop_as deque Value

(* Returns the removed element as an option ([Option] witness). *)
let pop_opt deque = pop_as deque Option

(* Discards the removed element and returns [()] ([Unit] witness). *)
let drop_exn deque = pop_as deque Unit

(* *)

let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r =
fun q backoff poly ->
Expand All @@ -143,10 +167,11 @@ let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r =
if Atomic.compare_and_set q.top t (t + 1) then begin
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
match poly with Option -> Some res | Value -> res | Unit -> ()
end
else steal_as q (Backoff.once backoff) poly
else match poly with Option -> None | Value -> raise Empty
else match poly with Option -> None | Value | Unit -> raise Empty

(* Stealer-side removals. Each one is [steal_as] started with
   [Backoff.default] and the [poly] witness that fixes its result type. *)

(* Returns the stolen element ([Value] witness). *)
let steal_exn deque = steal_as deque Backoff.default Value

(* Returns the stolen element as an option ([Option] witness). *)
let steal_opt deque = steal_as deque Backoff.default Option

(* Discards the stolen element and returns [()] ([Unit] witness). *)
let steal_drop_exn deque = steal_as deque Backoff.default Unit
157 changes: 129 additions & 28 deletions src/ws_deque.mli
Original file line number Diff line number Diff line change
@@ -1,55 +1,156 @@
(** Lock-free single-producer, multi-consumer dynamic-size double-ended queue (deque).
The main strength of deque in a typical work-stealing setup with per-core structure
is efficient work distribution. Owner uses [push] and [pop] method to operate at
one end of the deque, while other (free) cores can efficiently steal work on the
other side.
The main strength of a deque in a typical work-stealing setup with a
per-core structure is efficient work distribution. The owner uses [push]
and [pop] methods to operate at one end of the deque, while other (free)
cores can efficiently steal work from the other side.
This approach is great for throughput. Stealers and owner working on different sides
reduces contention in work distribution. Further, local LIFO order runs related tasks
one after one improves locality.
This approach is great for throughput. Having stealers and the owner work
on different sides reduces contention in work distribution. Further, the
local LIFO order, running related tasks one after another, improves locality.
On the other hand, the local LIFO order does not offer any fairness guarantees.
Thus, it is not the best choice when tail latency matters.
On the other hand, the local LIFO order does not offer any fairness
guarantees. Thus, it is not the best choice when tail latency matters.
*)

(** {1 API} *)

type 'a t
(** Type of work-stealing queue *)

val create : unit -> 'a t
(** [create ()] returns a new empty work-stealing queue. *)

val of_list : 'a list -> 'a t
(** [of_list list] creates a new work-stealing queue from [list].
🐌 This is a linear-time operation.
{[
# open Saturn.Work_stealing_deque
# let t : int t = of_list [1;2;3;4]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 4
# pop_opt t
- : int option = Some 3
]}
*)

exception Empty

(** {1 Queue owner functions} *)
(** {2 Queue owner functions} *)

val push : 'a t -> 'a -> unit
(** [push t v] adds [v] to the front of the queue [q].
It should only be invoked by the domain which owns the queue [q]. *)
(** [push queue element] adds [element] at the end of the [queue].
It should only be invoked by the domain that owns the [queue]. *)

val pop_exn : 'a t -> 'a
(** [pop_exn q] removes and returns the first element in queue
[q].It should only be invoked by the domain which owns the queue
[q].
(** [pop_exn queue] removes and returns the last element of the [queue]. It
should only be invoked by the domain that owns the [queue].
@raise Exit if the queue is empty.
*)
@raises Empty if the [queue] is empty. *)

val pop_opt : 'a t -> 'a option
(** [pop_opt q] removes and returns the first element in queue [q], or
returns [None] if the queue is empty. *)
(** [pop_opt queue] removes and returns [Some] of the last element of the
[queue], or returns [None] if the [queue] is empty. It should only
be invoked by the domain that owns the [queue]. *)

(** {1 Stealers function} *)
val drop_exn : 'a t -> unit
(** [drop_exn queue] removes the last element of the [queue]. It should only
be invoked by the domain that owns the [queue].
@raises Empty if the [queue] is empty. *)

(** {2 Stealer functions} *)

val steal_exn : 'a t -> 'a
(** [steal_exn q] removes and returns the last element from queue
[q]. It should only be invoked by domain which doesn't own the
queue [q].
(** [steal_exn queue] removes and returns the first element of the [queue].
@raise Exit if the queue is empty.
*)
@raises Empty if the [queue] is empty. *)

val steal_opt : 'a t -> 'a option
(** [steal_opt q] removes and returns the last element from queue
[q], or returns [None] if the queue is empty. It should only be
invoked by domain which doesn't own the queue [q]. *)
(** [steal_opt queue] removes and returns [Some] of the first element of the
[queue], or returns [None] if the [queue] is empty. *)

val steal_drop_exn : 'a t -> unit
(** [steal_drop_exn queue] removes the first element of the [queue].
@raises Empty if the [queue] is empty. *)

(** {1 Examples} *)

(** {2 Sequential example}
An example top-level session:
{[
# open Saturn.Work_stealing_deque
# let t : int t = of_list [1;2;3;4;5;6]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 6
# steal_opt t
- : int option = Some 1
# drop_exn t
- : unit = ()
# pop_opt t
- : int option = Some 4
# steal_drop_exn t
- : unit = ()
# steal_exn t
- : int = 3
# steal_exn t
Exception: Saturn__Ws_deque.Empty.
]}
*)

(** {2 Multicore example}
Note: The barrier is used in this example solely to make the results more
interesting by increasing the likelihood of parallelism. Spawning a domain is
a costly operation, especially compared to the relatively small amount of work
being performed here. In practice, using a barrier in this manner is unnecessary.
{@ocaml non-deterministic=command[
# open Saturn.Work_stealing_deque
# let t : int t = create ()
val t : int t = <abstr>
# let barrier = Atomic.make 3
val barrier : int Atomic.t = <abstr>
# let owner () =
Atomic.decr barrier;
while Atomic.get barrier <> 0 do Domain.cpu_relax () done;
for i = 1 to 10 do push t i; Domain.cpu_relax () done
val owner : unit -> unit = <fun>
# let stealer id () =
Atomic.decr barrier;
while Atomic.get barrier <> 0 do Domain.cpu_relax () done;
for _ = 1 to 5 do
match steal_opt t with
| None -> ()
| Some v -> Format.printf "Stealer %s stole %d@." id v
done
val stealer : string -> unit -> unit = <fun>
# let stealerA = Domain.spawn (stealer "A")
val stealerA : unit Domain.t = <abstr>
# let stealerB = Domain.spawn (stealer "B")
val stealerB : unit Domain.t = <abstr>
# owner ()
Stealer A stole 1
Stealer B stole 2
Stealer A stole 3
Stealer B stole 4
Stealer A stole 5
Stealer A stole 7
Stealer B stole 6
Stealer A stole 8
Stealer B stole 9
Stealer B stole 10
- : unit = ()
# Domain.join stealerA; Domain.join stealerB
- : unit = ()
]}
*)
22 changes: 18 additions & 4 deletions test/ws_deque/stm_ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ open Util
module Ws_deque = Saturn.Work_stealing_deque

module Spec = struct
type cmd = Push of int | Pop | Steal
type cmd = Push of int | Pop | Drop | Steal | Steal_drop

(* Textual rendering of a command; [Push] also shows its integer payload. *)
let show_cmd = function
  | Pop -> "Pop"
  | Drop -> "Drop"
  | Steal -> "Steal"
  | Steal_drop -> "Steal_drop"
  | Push i -> "Push " ^ string_of_int i

type state = int list
type sut = int Ws_deque.t
Expand All @@ -24,11 +26,15 @@ module Spec = struct
[
Gen.map (fun i -> Push i) int_gen;
Gen.return Pop;
(*Gen.return Steal;*)
(* No point in stealing from yourself :-D *)
Gen.return Drop;
Gen.return Steal;
Gen.return Steal_drop;
])

let stealer_cmd _s = QCheck.make ~print:show_cmd (Gen.return Steal)
(* Command generator restricted to the two stealing operations, [Steal] and
   [Steal_drop]; the state argument is ignored. Generated commands are
   printed with [show_cmd]. *)
let stealer_cmd _s =
  let steal_gens = List.map Gen.return [ Steal; Steal_drop ] in
  QCheck.make ~print:show_cmd (Gen.oneof steal_gens)

(* Initial model state: the empty list. *)
let init_state = []
(* Builds the system under test: a new deque from [Ws_deque.create]. *)
let init_sut () = Ws_deque.create ()
(* No cleanup is performed; the argument is ignored. *)
let cleanup _ = ()
Expand All @@ -40,23 +46,31 @@ module Spec = struct
(*if i<>1213 then i::s else s*)
(* an artificial fault *)
| Pop -> ( match s with [] -> s | _ :: s' -> s')
| Drop -> ( match s with [] -> s | _ :: s' -> s')
| Steal -> ( match List.rev s with [] -> s | _ :: s' -> List.rev s')
| Steal_drop -> ( match List.rev s with [] -> s | _ :: s' -> List.rev s')

(* No precondition: both arguments are ignored and the result is always
   [true]. *)
let precond _ _ = true

(* Executes one command against the deque and packs the outcome in a [Res]
   together with the description of its type.

   [Push] is called directly and described as [unit]. Every removal is
   called through [protect] and described as [result _ exn], so an
   exception raised by the operation is returned as a value rather than
   propagated. *)
let run cmd deque =
  (* Shared shape of the four removal commands. *)
  let guarded ty op = Res (result ty exn, protect op deque) in
  match cmd with
  | Push i -> Res (unit, Ws_deque.push deque i)
  | Pop -> guarded int Ws_deque.pop_exn
  | Drop -> guarded unit Ws_deque.drop_exn
  | Steal -> guarded int Ws_deque.steal_exn
  | Steal_drop -> guarded unit Ws_deque.steal_drop_exn

(* Checks the observed result [res] of command [c] against the model state
   [s] in which the command was issued.

   The head of [s] is the element [Pop] must return and the last element of
   [s] is the one [Steal] must return. The dropping variants only need to
   know whether [s] is empty.

   On an empty model every removal must fail with exactly
   [Ws_deque.Empty]. The stealing commands previously accepted any [Error];
   they now use the same check as [Pop] and [Drop], so an unexpected
   exception no longer satisfies the postcondition.

   Any command/result pairing not listed is rejected. *)
let postcond c (s : state) res =
  match (c, res) with
  | Push _, Res ((Unit, _), _) -> true
  | Pop, Res ((Result (Int, Exn), _), res) -> (
      match s with [] -> res = Error Ws_deque.Empty | j :: _ -> res = Ok j)
  | Drop, Res ((Result (Unit, Exn), _), res) -> (
      match s with [] -> res = Error Ws_deque.Empty | _ :: _ -> res = Ok ())
  | Steal, Res ((Result (Int, Exn), _), res) -> (
      match List.rev s with
      | [] -> res = Error Ws_deque.Empty
      | j :: _ -> res = Ok j)
  | Steal_drop, Res ((Result (Unit, Exn), _), res) -> (
      (* Only emptiness matters here, so the list is not reversed. *)
      match s with [] -> res = Error Ws_deque.Empty | _ :: _ -> res = Ok ())
  | _, _ -> false
end

Expand Down
Loading

0 comments on commit 2bb4d4b

Please sign in to comment.