diff --git a/src/dune b/src/dune index c4d88b1c..0d23379e 100644 --- a/src/dune +++ b/src/dune @@ -42,5 +42,5 @@ let () = (<> %{os_type} Win32) (>= %{ocaml_version} 5.0.0))) (libraries saturn) - (files treiber_stack.mli bounded_stack.mli)) + (files treiber_stack.mli bounded_stack.mli ws_deque.mli)) |} diff --git a/src/ws_deque.ml b/src/ws_deque.ml index 5a5b6bd7..31995b36 100644 --- a/src/ws_deque.ml +++ b/src/ws_deque.ml @@ -47,6 +47,22 @@ let create () = let top_cache = ref 0 |> Multicore_magic.copy_as_padded in { top; bottom; top_cache; tab } |> Multicore_magic.copy_as_padded +let next_pow2 n = + let rec loop acc = if acc >= n then acc else loop (acc lsl 1) in + loop 1 + +let of_list l = + let len = List.length l in + let capacity = max min_capacity (next_pow2 len) in + let top = Atomic.make_contended 0 in + let tab = Array.make capacity (Obj.magic ()) in + List.iteri (fun i x -> Array.unsafe_set tab i (ref x)) l; + let bottom = Atomic.make_contended len in + let top_cache = ref 0 |> Multicore_magic.copy_as_padded in + { top; bottom; top_cache; tab } |> Multicore_magic.copy_as_padded + +(* *) + let realloc a t b sz new_sz = let new_a = Array.make new_sz (Obj.magic ()) in ArrayExtra.blit_circularly a @@ -81,7 +97,12 @@ let push q v = q.tab <- a; Atomic.incr q.bottom -type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly +(* *) + +type ('a, _) poly = + | Option : ('a, 'a option) poly + | Value : ('a, 'a) poly + | Unit : ('a, unit) poly exception Empty @@ -100,7 +121,7 @@ let pop_as : type a r. a t -> (a, r) poly -> r = out := Obj.magic (); if size + size + size <= capacity - min_capacity then q.tab <- realloc a t b capacity (capacity lsr 1); - match poly with Option -> Some res | Value -> res + match poly with Option -> Some res | Value -> res | Unit -> () end else if b = t then begin (* Whether or not the [compare_and_set] below succeeds, [top_cache] can be @@ -115,19 +136,22 @@ let pop_as : type a r. a t -> (a, r) poly -> r = let out = Array.unsafe_get a (b land (Array.length a - 1)) in let res = !out in out := Obj.magic (); - match poly with Option -> Some res | Value -> res + match poly with Option -> Some res | Value -> res | Unit -> () end - else match poly with Option -> None | Value -> raise Empty + else match poly with Option -> None | Value | Unit -> raise Empty end else begin (* This write of [bottom] requires no fence. The deque is empty and remains so until the next [push]. *) Atomic.fenceless_set q.bottom (b + 1); - match poly with Option -> None | Value -> raise Empty + match poly with Option -> None | Value | Unit -> raise Empty end let pop_exn q = pop_as q Value let pop_opt q = pop_as q Option +let drop_exn q = pop_as q Unit + +(* *) let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r = fun q backoff poly -> @@ -143,10 +167,11 @@ let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r = if Atomic.compare_and_set q.top t (t + 1) then begin let res = !out in out := Obj.magic (); - match poly with Option -> Some res | Value -> res + match poly with Option -> Some res | Value -> res | Unit -> () end else steal_as q (Backoff.once backoff) poly - else match poly with Option -> None | Value -> raise Empty + else match poly with Option -> None | Value | Unit -> raise Empty let steal_exn q = steal_as q Backoff.default Value let steal_opt q = steal_as q Backoff.default Option +let steal_drop_exn q = steal_as q Backoff.default Unit diff --git a/src/ws_deque.mli b/src/ws_deque.mli index 5c111ea8..0e7beef9 100644 --- a/src/ws_deque.mli +++ b/src/ws_deque.mli @@ -1,55 +1,156 @@ (** Lock-free single-producer, multi-consumer dynamic-size double-ended queue (deque). - The main strength of deque in a typical work-stealing setup with per-core structure - is efficient work distribution. Owner uses [push] and [pop] method to operate at - one end of the deque, while other (free) cores can efficiently steal work on the - other side. + The main strength of a deque in a typical work-stealing setup with a + per-core structure, is efficient work distribution. The owner uses [push] + and [pop] methods to operate at one end of the deque, while other (free) + cores can efficiently steal work from the other side. - This approach is great for throughput. Stealers and owner working on different sides - reduces contention in work distribution. Further, local LIFO order runs related tasks - one after one improves locality. + This approach is great for throughput. Stealers and the owner working on + different sides, reduce contention in work distribution. Further, the + local LIFO order, running related tasks one after another, improves locality. - On the other hand, the local LIFO order does not offer any fairness guarantees. - Thus, it is not the best choice when tail latency matters. + On the other hand, the local LIFO order does not offer any fairness + guarantees. Thus, it is not the best choice when tail latency matters. *) +(** {1 API} *) + type 'a t (** Type of work-stealing queue *) val create : unit -> 'a t (** [create ()] returns a new empty work-stealing queue. *) +val of_list : 'a list -> 'a t +(** [of_list list] creates a new work-stealing queue from [list]. + + 🐌 This is a linear-time operation. + + {[ + # open Saturn.Work_stealing_deque + # let t : int t = of_list [1;2;3;4] + val t : int t = + # pop_opt t + - : int option = Some 4 + # pop_opt t + - : int option = Some 3 + ]} +*) + exception Empty -(** {1 Queue owner functions} *) +(** {2 Queue owner functions} *) val push : 'a t -> 'a -> unit -(** [push t v] adds [v] to the front of the queue [q]. - It should only be invoked by the domain which owns the queue [q]. *) +(** [push queue element] adds [element] at the end of the [queue]. + It should only be invoked by the domain that owns the [queue]. *) val pop_exn : 'a t -> 'a -(** [pop_exn q] removes and returns the first element in queue - [q].It should only be invoked by the domain which owns the queue - [q]. +(** [pop_exn queue] removes and returns the last element of the [queue]. It + should only be invoked by the domain that owns the [queue]. - @raise Exit if the queue is empty. - *) + @raises Empty if the [queue] is empty. *) val pop_opt : 'a t -> 'a option -(** [pop_opt q] removes and returns the first element in queue [q], or - returns [None] if the queue is empty. *) +(** [pop_opt queue] removes and returns [Some] of the last element of the + [queue], or returns [None] if the [queue] is empty. It should only + be invoked by the domain that owns the [queue]. *) -(** {1 Stealers function} *) +val drop_exn : 'a t -> unit +(** [drop_exn queue] removes the last element of the [queue]. It should only + be invoked by the domain that owns the [queue]. + + @raises Empty if the [queue] is empty. *) + +(** {2 Stealer functions} *) val steal_exn : 'a t -> 'a -(** [steal_exn q] removes and returns the last element from queue - [q]. It should only be invoked by domain which doesn't own the - queue [q]. +(** [steal_exn queue] removes and returns the first element of the [queue]. - @raise Exit if the queue is empty. - *) + @raises Empty if the [queue] is empty. *) val steal_opt : 'a t -> 'a option -(** [steal_opt q] removes and returns the last element from queue - [q], or returns [None] if the queue is empty. It should only be - invoked by domain which doesn't own the queue [q]. *) +(** [steal_opt queue] removes and returns [Some] of the first element of the + [queue], or returns [None] if the [queue] is empty. *) + +val steal_drop_exn : 'a t -> unit +(** [steal_drop_exn queue] removes the first element of the [queue]. + + @raises Empty if the [queue] is empty. *) + +(** {1 Examples} *) + +(** {2 Sequential example} + An example top-level session: +{[ + # open Saturn.Work_stealing_deque + # let t : int t = of_list [1;2;3;4;5;6] + val t : int t = + # pop_opt t + - : int option = Some 6 + # steal_opt t + - : int option = Some 1 + # drop_exn t + - : unit = () + # pop_opt t + - : int option = Some 4 + # steal_drop_exn t + - : unit = () + # steal_exn t + - : int = 3 + # steal_exn t + Exception: Saturn__Ws_deque.Empty. +]} +*) + +(** {2 Multicore example} + Note: The barrier is used in this example solely to make the results more + interesting by increasing the likelihood of parallelism. Spawning a domain is + a costly operation, especially compared to the relatively small amount of work + being performed here. In practice, using a barrier in this manner is unnecessary. + + +{@ocaml non-deterministic=command[ + # open Saturn.Work_stealing_deque + # let t : int t = create () + val t : int t = + # let barrier = Atomic.make 3 + val barrier : int Atomic.t = + + # let owner () = + Atomic.decr barrier; + while Atomic.get barrier <> 0 do Domain.cpu_relax () done; + for i = 1 to 10 do push t i; Domain.cpu_relax () done + val owner : unit -> unit = + + # let stealer id () = + Atomic.decr barrier; + while Atomic.get barrier <> 0 do Domain.cpu_relax () done; + + for _ = 1 to 5 do + match steal_opt t with + | None -> () + | Some v -> Format.printf "Stealer %s stole %d@." id v + done + val stealer : string -> unit -> unit = + + # let stealerA = Domain.spawn (stealer "A") + val stealerA : unit Domain.t = + # let stealerB = Domain.spawn (stealer "B") + val stealerB : unit Domain.t = + # owner () + Stealer A stole 1 + Stealer B stole 2 + Stealer A stole 3 + Stealer B stole 4 + Stealer A stole 5 + Stealer A stole 7 + Stealer B stole 6 + Stealer A stole 8 + Stealer B stole 9 + Stealer B stole 10 + - : unit = () + # Domain.join stealerA; Domain.join stealerB + - : unit = () +]} +*) diff --git a/test/ws_deque/stm_ws_deque.ml b/test/ws_deque/stm_ws_deque.ml index c2090ddc..fd15fc84 100644 --- a/test/ws_deque/stm_ws_deque.ml +++ b/test/ws_deque/stm_ws_deque.ml @@ -6,13 +6,15 @@ open Util module Ws_deque = Saturn.Work_stealing_deque module Spec = struct - type cmd = Push of int | Pop | Steal + type cmd = Push of int | Pop | Drop | Steal | Steal_drop let show_cmd c = match c with | Push i -> "Push " ^ string_of_int i | Pop -> "Pop" + | Drop -> "Drop" | Steal -> "Steal" + | Steal_drop -> "Steal_drop" type state = int list type sut = int Ws_deque.t @@ -24,11 +26,15 @@ module Spec = struct [ Gen.map (fun i -> Push i) int_gen; Gen.return Pop; - (*Gen.return Steal;*) - (* No point in stealing from yourself :-D *) + Gen.return Drop; + Gen.return Steal; + Gen.return Steal_drop; ]) - let stealer_cmd _s = QCheck.make ~print:show_cmd (Gen.return Steal) + let stealer_cmd _s = + QCheck.make ~print:show_cmd + (Gen.oneof [ Gen.return Steal; Gen.return Steal_drop ]) + let init_state = [] let init_sut () = Ws_deque.create () let cleanup _ = () @@ -40,7 +46,9 @@ module Spec = struct (*if i<>1213 then i::s else s*) (* an artificial fault *) | Pop -> ( match s with [] -> s | _ :: s' -> s') + | Drop -> ( match s with [] -> s | _ :: s' -> s') | Steal -> ( match List.rev s with [] -> s | _ :: s' -> List.rev s') + | Steal_drop -> ( match List.rev s with [] -> s | _ :: s' -> List.rev s') let precond _ _ = true @@ -48,15 +56,21 @@ module Spec = struct match c with | Push i -> Res (unit, Ws_deque.push d i) | Pop -> Res (result int exn, protect Ws_deque.pop_exn d) + | Drop -> Res (result unit exn, protect Ws_deque.drop_exn d) | Steal -> Res (result int exn, protect Ws_deque.steal_exn d) + | Steal_drop -> Res (result unit exn, protect Ws_deque.steal_drop_exn d) let postcond c (s : state) res = match (c, res) with | Push _, Res ((Unit, _), _) -> true | Pop, Res ((Result (Int, Exn), _), res) -> ( match s with [] -> res = Error Ws_deque.Empty | j :: _ -> res = Ok j) + | Drop, Res ((Result (Unit, Exn), _), res) -> ( + match s with [] -> res = Error Ws_deque.Empty | _ -> res = Ok ()) | Steal, Res ((Result (Int, Exn), _), res) -> ( match List.rev s with [] -> Result.is_error res | j :: _ -> res = Ok j) + | Steal_drop, Res ((Result (Unit, Exn), _), res) -> ( + match List.rev s with [] -> Result.is_error res | _ -> res = Ok ()) | _, _ -> false end diff --git a/test/ws_deque/ws_deque_dscheck.ml b/test/ws_deque/ws_deque_dscheck.ml index 1cfe9775..40053328 100644 --- a/test/ws_deque/ws_deque_dscheck.ml +++ b/test/ws_deque/ws_deque_dscheck.ml @@ -1,9 +1,8 @@ let drain_remaining queue = - let remaining = ref 0 in + let remaining = ref [] in (try while true do - Ws_deque.pop_exn queue |> ignore; - remaining := !remaining + 1 + remaining := Ws_deque.pop_exn queue :: !remaining done with _ -> ()); !remaining @@ -36,31 +35,130 @@ let owner_stealer () = Atomic.final (fun () -> Atomic.check (fun () -> - let remaining = drain_remaining queue in + let remaining = drain_remaining queue |> List.length in remaining + !popped == total_items))) -let popper_stealer () = +let of_list_stealers () = Atomic.trace (fun () -> - let queue = Ws_deque.create () in let total_items = 3 in - for _ = 1 to total_items do - Ws_deque.push queue 0 + let queue = Ws_deque.of_list (List.init total_items (fun x -> x + 1)) in + + (* stealers *) + let popped = ref 0 in + Atomic.spawn (fun () -> + Ws_deque.push queue (total_items + 1); + popped := Ws_deque.pop_exn queue); + + let stolen = ref [] in + let stealer () = + match Ws_deque.steal_exn queue with + | exception _ -> () + | v -> stolen := v :: !stolen + in + for _ = 1 to total_items - 1 do + Atomic.spawn stealer done; + Atomic.final (fun () -> + Atomic.check (fun () -> !popped = total_items + 1); + Atomic.check (fun () -> + List.sort Int.compare !stolen + = List.init (total_items - 1) (fun x -> x + 1)); + Atomic.check (fun () -> drain_remaining queue = [ total_items ]))) + +let popper_stealer () = + Atomic.trace (fun () -> + let total_items = 5 in + let queue = Ws_deque.of_list (List.init total_items (fun _ -> 0)) in + (* stealers *) let popped = ref 0 in + Atomic.spawn (fun () -> + match Ws_deque.pop_opt queue with + | None -> () + | _ -> popped := !popped + 1); + let stealer () = match Ws_deque.steal_exn queue with | exception _ -> () | _ -> popped := !popped + 1 in - Atomic.spawn stealer |> ignore; - Atomic.spawn stealer |> ignore; + for _ = 1 to total_items - 2 do + Atomic.spawn stealer + done; + + Atomic.final (fun () -> + Atomic.check (fun () -> + let remaining = drain_remaining queue |> List.length in + remaining = 1 && !popped = total_items - 1))) + +let popper_stealer_drop () = + Atomic.trace (fun () -> + let total_items = 5 in + let queue = Ws_deque.of_list (List.init total_items (fun _ -> 0)) in + + (* stealers *) + let popped = ref 0 in + Atomic.spawn (fun () -> + match Ws_deque.drop_exn queue with + | exception Ws_deque.Empty -> () + | _ -> popped := !popped + 1); + + let stealer () = + match Ws_deque.steal_drop_exn queue with + | exception Ws_deque.Empty -> () + | _ -> popped := !popped + 1 + in + for _ = 1 to total_items - 2 do + Atomic.spawn stealer + done; + + Atomic.final (fun () -> + Atomic.check (fun () -> + let remaining = drain_remaining queue |> List.length in + remaining = 1 && !popped = total_items - 1))) + +let owner_2_stealers () = + Atomic.trace (fun () -> + let queue = Ws_deque.create () in + let total_items = 6 in + + let popped = ref [] in + (* owner thr *) + Atomic.spawn (fun () -> + for i = 1 to total_items do + Ws_deque.push queue i + done; + for _ = 1 to total_items / 3 do + match Ws_deque.pop_exn queue with + | exception _ -> () + | v -> popped := v :: !popped + done); + + let stolen1 = ref [] in + (* stealer *) + Atomic.spawn (fun () -> + for _ = 1 to total_items / 3 do + match Ws_deque.steal_exn queue with + | exception _ -> () + | v -> stolen1 := v :: !stolen1 + done); + + let stolen2 = ref [] in + (* stealer *) + Atomic.spawn (fun () -> + for _ = 1 to total_items / 3 do + match Ws_deque.steal_exn queue with + | exception _ -> () + | v -> stolen2 := v :: !stolen2 + done); Atomic.final (fun () -> Atomic.check (fun () -> let remaining = drain_remaining queue in - remaining = 1 && !popped = 2))) + let total = !popped @ !stolen1 @ !stolen2 @ remaining in + List.sort Int.compare total + = List.init total_items (fun x -> x + 1)))) let () = let open Alcotest in @@ -69,8 +167,9 @@ let () = ( "basic", [ test_case "1-owner-1-stealer" `Slow owner_stealer; - test_case "1-pusher-2-stealers" `Slow popper_stealer; - (* we'd really want to test cases with more threads here, - but dscheck is not optimized enough for that yet *) + test_case "1-popped-n-stealers" `Slow popper_stealer; + test_case "1-popped-n-stealers-drop" `Slow popper_stealer_drop; + test_case "1-owner-2-stealers" `Slow owner_2_stealers; + test_case "of_list-n-stealers" `Slow of_list_stealers; ] ); ]