diff --git a/README.md b/README.md index 09e78914..5440da46 100644 --- a/README.md +++ b/README.md @@ -613,33 +613,6 @@ argument for potentially blocking operations. For example, to perform a blocking pop with a timeout, one can simply explicitly pass the desired timeout in seconds: -```ocaml -# let an_empty_stack = stack () in - Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack } -Exception: Failure "Domain_local_timeout.set_timeoutf not implemented". -``` - -Oops! What happened above is that the -[_domain local timeout_](https://github.com/ocaml-multicore/domain-local-timeout) -mechanism used by **Kcas** was not implemented on the current domain. The idea -is that, in the future, concurrent schedulers provide the mechanism out of the -box, but there is also a default implementation using the Stdlib `Thread` and -`Unix` modules that works on most platforms. However, to avoid direct -dependencies to `Thread` and `Unix`, we need to explicitly tell the library that -it can use those modules: - -```ocaml -# Domain_local_timeout.set_system (module Thread) (module Unix) -- : unit = () -``` - -This initialization, if needed, should be done by application code rather than -by libraries. - -If we now retry the previous example we will get a -[`Timeout`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Timeout/index.html#exception-Timeout) -exception as expected: - ```ocaml # let an_empty_stack = stack () in Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack } diff --git a/bench/bench.ml b/bench/bench.ml index 88f67360..2d4c6863 100644 --- a/bench/bench.ml +++ b/bench/bench.ml @@ -30,37 +30,11 @@ module Times = struct results.(domain_i) done in - let prepare_for_await () = - let open struct - type state = Init | Released | Awaiting of { mutable released : bool } - end in - let state = Atomic.make Init in - let release () = - if Multicore_magic.fenceless_get state != Released then - match Atomic.exchange state Released with - | Awaiting r -> r.released <- true - | _ -> () - in - let await () = - if Multicore_magic.fenceless_get state != Released then - let awaiting = Awaiting { released = false } in - if Atomic.compare_and_set state Init awaiting then - match awaiting with - | Awaiting r -> - (* Avoid sleeping *) - while not r.released do - Domain.cpu_relax () - done - | _ -> () - in - Domain_local_await.{ release; await } + let domains = + Array.init n_domains @@ fun domain_i -> + Domain.spawn @@ fun () -> main domain_i in - Domain_local_await.using ~prepare_for_await ~while_running:(fun () -> - let domains = - Array.init n_domains @@ fun domain_i -> - Domain.spawn @@ fun () -> main domain_i - in - Array.iter Domain.join domains); + Array.iter Domain.join domains; let n = Stack.length results.(0) in let times = Array.create_float n in for run_i = 0 to n - 1 do diff --git a/bench/dune b/bench/dune index 06c316c6..15e8cc33 100644 --- a/bench/dune +++ b/bench/dune @@ -3,7 +3,6 @@ (package kcas_data) (libraries kcas_data - domain-local-await multicore-magic yojson domain_shims diff --git a/doc/scheduler-interop.md b/doc/scheduler-interop.md index 827a4e96..0c3f52a9 100644 --- a/doc/scheduler-interop.md +++ b/doc/scheduler-interop.md @@ -21,6 +21,8 @@ implementations that are conveniently provided by ```ocaml # #thread # #require "kcas_data" +# #require "picos" +# open Picos # open Kcas_data # open Kcas ``` @@ -36,45 +38,29 @@ module Scheduler : sig val fiber : t -> (unit -> 'a) -> 'a Promise.t end = struct open Effect.Deep - type _ Effect.t += - | Suspend : (('a, unit) continuation -> unit) -> 'a Effect.t type t = { queue: (unit -> unit) Queue.t; domain: unit Domain.t } let spawn () = - let queue = Queue.create () in + let queue: (unit -> unit) Queue.t = Queue.create () in let rec scheduler work = let effc (type a) : a Effect.t -> _ = function - | Suspend ef -> Some ef - | _ -> None in + | Trigger.Await release -> + Some (fun (k: (a, _) continuation) -> + if not (Trigger.on_signal release () () @@ fun _ () () -> + Queue.add (fun () -> continue k None) queue) then + continue k None) + | _ -> + None in try_with work () { effc }; match Queue.take_opt queue with | Some work -> scheduler work | None -> () in - let prepare_for_await _ = - let state = Atomic.make `Init in - let release () = - if Atomic.get state != `Released then - match Atomic.exchange state `Released with - | `Awaiting k -> - Queue.add (continue k) queue - | _ -> () in - let await () = - if Atomic.get state != `Released then - Effect.perform @@ Suspend (fun k -> - if not (Atomic.compare_and_set state `Init - (`Awaiting k)) then - continue k ()) - in - Domain_local_await.{ release; await } in let domain = Domain.spawn @@ fun () -> try while true do - let work = Queue.take_blocking queue in - Domain_local_await.using - ~prepare_for_await - ~while_running:(fun () -> scheduler work) + scheduler (Queue.take_blocking queue) done with Exit -> () in { queue; domain } diff --git a/dune-project b/dune-project index 01389617..b2417069 100644 --- a/dune-project +++ b/dune-project @@ -32,10 +32,7 @@ (>= 4.13.0)) (backoff (>= 0.1.0)) - (domain-local-await - (>= 1.0.0)) - (domain-local-timeout - (>= 1.0.0)) + picos (multicore-magic (>= 2.0.0)) (domain_shims @@ -66,10 +63,7 @@ (= :version)) (multicore-magic (>= 2.0.0)) - (domain-local-await - (and - (>= 1.0.0) - :with-test)) + (picos :with-test) (domain_shims (and (>= 0.1.0) diff --git a/kcas.opam b/kcas.opam index 14e35571..eeb89072 100644 --- a/kcas.opam +++ b/kcas.opam @@ -16,8 +16,7 @@ depends: [ "dune" {>= "3.8"} "ocaml" {>= "4.13.0"} "backoff" {>= "0.1.0"} - "domain-local-await" {>= "1.0.0"} - "domain-local-timeout" {>= "1.0.0"} + "picos" "multicore-magic" {>= "2.0.0"} "domain_shims" {>= "0.1.0" & with-test} "alcotest" {>= "1.7.0" & with-test} @@ -40,3 +39,6 @@ build: [ ] dev-repo: "git+https://github.com/ocaml-multicore/kcas.git" doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/" +pin-depends: [ + [ "picos.dev" "git+https://github.com/ocaml-multicore/picos#aaf9a284882328afffa1bdefc309623deb272aaa" ] +] diff --git a/kcas.opam.template b/kcas.opam.template index 0fd71d5e..e628d0dd 100644 --- a/kcas.opam.template +++ b/kcas.opam.template @@ -1 +1,4 @@ doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/" +pin-depends: [ + [ "picos.dev" "git+https://github.com/ocaml-multicore/picos#aaf9a284882328afffa1bdefc309623deb272aaa" ] +] diff --git a/kcas_data.opam b/kcas_data.opam index eb44d68d..215ff773 100644 --- a/kcas_data.opam +++ b/kcas_data.opam @@ -16,7 +16,7 @@ depends: [ "dune" {>= "3.8"} "kcas" {= version} "multicore-magic" {>= "2.0.0"} - "domain-local-await" {>= "1.0.0" & with-test} + "picos" {with-test} "domain_shims" {>= "0.1.0" & with-test} "mtime" {>= "2.0.0" & with-test} "alcotest" {>= "1.7.0" & with-test} diff --git a/src/kcas/dune b/src/kcas/dune index 70b19af5..39a0c91f 100644 --- a/src/kcas/dune +++ b/src/kcas/dune @@ -1,7 +1,7 @@ (library (name kcas) (public_name kcas) - (libraries domain-local-await domain-local-timeout backoff multicore-magic)) + (libraries picos backoff multicore-magic)) (mdx (package kcas) diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index e8de90a0..e07c58d3 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -3,6 +3,8 @@ * Copyright (c) 2023, Vesa Karvonen *) +open Picos + (** Work around CSE bug in OCaml 5-5.1. *) let[@inline] atomic_get x = Atomic.get ((* Prevents CSE *) Sys.opaque_identity x) @@ -27,68 +29,32 @@ let fenceless_set = Atomic.set module Timeout = struct exception Timeout - let[@inline never] timeout () = raise Timeout - - type t = Unset | Elapsed | Call of (unit -> unit) - - let unset = Atomic.make Unset - - (* Fenceless operations are safe here as the timeout state is not not visible - outside of the library and we don't always need the latest value and, when - we do, there is a fence after. *) + type t = (unit, [ `Await | `Cancel | `Return ]) Computation.t - let[@inline] check state = if fenceless_get state == Elapsed then timeout () + let unset = Computation.finished + let timeout = Exn_bt.get_callstack 0 Timeout - let set seconds state = - Domain_local_timeout.set_timeoutf seconds @@ fun () -> - match Atomic.exchange state Elapsed with - | Call release_or_cancel -> release_or_cancel () - | Unset | Elapsed -> () - - let[@inline never] alloc_opt = function + let opt = function | None -> unset | Some seconds -> - let state = Atomic.make Unset in - let cancel = set seconds state in - fenceless_set state @@ Call cancel; - state - - let[@inline] alloc_opt seconds = - if seconds == None then unset else alloc_opt seconds + let computation = Computation.create () in + Computation.cancel_after computation ~seconds timeout; + computation - let[@inline never] set_opt state = function - | None -> () - | Some seconds -> - let cancel = set seconds state in - fenceless_set state @@ Call cancel + let check t = if t != unset then Computation.check t + let cancel t = if t != unset then Computation.finish t - let[@inline] set_opt state seconds = - if seconds != None then set_opt state seconds + let attach t trigger = + if t != unset then + if not (Computation.try_attach t trigger) then Computation.check t - let[@inline never] await state release = - match fenceless_get state with - | Call _ as alive -> - if Atomic.compare_and_set state alive (Call release) then alive - else timeout () - | Unset | Elapsed -> timeout () - - let[@inline] await state release = - let alive = fenceless_get state in - if alive == Unset then Unset else await state release - - let[@inline never] unawait state alive = - match fenceless_get state with - | Call _ as await -> - if not (Atomic.compare_and_set state await alive) then timeout () - | Unset | Elapsed -> timeout () - - let[@inline] unawait state alive = if alive != Unset then unawait state alive - - let[@inline never] cancel_alive alive = - match alive with Call cancel -> cancel () | Unset | Elapsed -> () + let check_and_detach t trigger = + if t != unset then begin + Computation.check t; + Computation.detach t trigger + end - let[@inline] cancel_alive alive = if alive != Unset then cancel_alive alive - let[@inline] cancel state = cancel_alive (fenceless_get state) + let detach t trigger = if t != unset then Computation.detach t trigger end module Id = struct @@ -121,9 +87,9 @@ end = struct x end -type awaiter = unit -> unit +type awaiter = [ `Signal ] Trigger.t -let[@inline] resume_awaiter awaiter = awaiter () +let[@inline] resume_awaiter awaiter = Trigger.signal awaiter |> ignore let[@inline] resume_awaiters = function | [] -> () @@ -415,10 +381,11 @@ let add_awaiter loc before awaiter = (* Fenceless is safe as we have fence after. *) let state_old = fenceless_get (as_atomic loc) in let state_new = - let awaiters = awaiter :: state_old.awaiters in + let awaiters = (awaiter :> [ `Signal ] Trigger.t) :: state_old.awaiters in { before; after = before; casn = casn_after; awaiters } in - before == eval state_old + (not (Trigger.is_signaled awaiter)) + && before == eval state_old && Atomic.compare_and_set (as_atomic loc) state_old state_new let[@tail_mod_cons] rec remove_first x' removed = function @@ -432,23 +399,24 @@ let rec remove_awaiter loc before awaiter = let state_old = fenceless_get (as_atomic loc) in if before == eval state_old then let removed = ref true in - let awaiters = remove_first awaiter removed state_old.awaiters in + let awaiters = + remove_first (awaiter :> [ `Signal ] Trigger.t) removed state_old.awaiters + in if !removed then let state_new = { before; after = before; casn = casn_after; awaiters } in if not (Atomic.compare_and_set (as_atomic loc) state_old state_new) then remove_awaiter loc before awaiter let block timeout loc before = - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await timeout t.release in - if add_awaiter loc before t.release then begin - try t.await () - with cancellation_exn -> - remove_awaiter loc before t.release; - Timeout.cancel_alive alive; - raise cancellation_exn - end; - Timeout.unawait timeout alive + let t = Trigger.create () in + Timeout.attach timeout t; + if add_awaiter loc before t then + match Trigger.await t with + | None -> Timeout.check_and_detach timeout t + | Some cancelation_exn -> + remove_awaiter loc before t; + Timeout.cancel timeout; + Exn_bt.raise cancelation_exn let rec update_no_alloc timeout backoff loc state f = (* Fenceless is safe as we have had a fence before if needed and there is a fence after. *) @@ -573,7 +541,7 @@ module Loc = struct raise exn let[@inline] get_as ?timeoutf f loc = - get_as (Timeout.alloc_opt timeoutf) f loc (atomic_get (as_atomic loc)) + get_as (Timeout.opt timeoutf) f loc (atomic_get (as_atomic loc)) let[@inline] get_mode loc = if loc.id < 0 then Mode.lock_free else Mode.obstruction_free @@ -584,14 +552,14 @@ module Loc = struct cas_with_state loc before state state_old let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f = - let timeout = Timeout.alloc_opt timeoutf in + let timeout = Timeout.opt timeoutf in update_with_state timeout backoff loc f (fenceless_get (as_atomic loc)) let[@inline] fenceless_modify ?timeoutf ?backoff loc f = fenceless_update ?timeoutf ?backoff loc f |> ignore let update ?timeoutf ?(backoff = Backoff.default) loc f = - let timeout = Timeout.alloc_opt timeoutf in + let timeout = Timeout.opt timeoutf in update_with_state timeout backoff loc f (atomic_get (as_atomic loc)) let[@inline] modify ?timeoutf ?backoff loc f = @@ -681,36 +649,14 @@ module Op = struct end module Xt = struct - (* NOTE: You can adjust comment blocks below to select whether or not to use - an unsafe cast to avoid a level of indirection due to [Atomic.t]. *) - - (**) type 'x t = { - mutable _timeout : Timeout.t; + timeout : Timeout.t; mutable casn : casn; mutable cass : cass; mutable validate_counter : int; mutable post_commit : Action.t; } - let[@inline] timeout_unset () = Timeout.Unset - - external timeout_as_atomic : 'x t -> Timeout.t Atomic.t = "%identity" - (**) - - (* - type 'x t = { - mutable _timeout : Timeout.t Atomic.t; - mutable casn : casn; - mutable cass : cass; - mutable validate_counter : int; - mutable post_commit : Action.t; - } - - let[@inline] timeout_unset () = Atomic.make Timeout.Unset - let[@inline] timeout_as_atomic r = r._timeout - *) - let[@inline] validate_one casn loc state = let before = if is_cmp casn state then eval state else state.before in (* Fenceless is safe inside transactions as each log update has a fence. *) @@ -730,7 +676,7 @@ module Xt = struct xt.validate_counter <- c1; (* Validate whenever counter reaches next power of 2. *) if c0 land c1 = 0 then begin - Timeout.check (timeout_as_atomic xt); + Timeout.check xt.timeout; validate_all xt.casn xt.cass end @@ -948,11 +894,11 @@ module Xt = struct | result -> begin match xt.cass with | NIL -> - Timeout.cancel (timeout_as_atomic xt); + Timeout.cancel xt.timeout; Action.run xt.post_commit result | CASN { loc; state; lt = NIL; gt = NIL; _ } -> if is_cmp xt.casn state then begin - Timeout.cancel (timeout_as_atomic xt); + Timeout.cancel xt.timeout; Action.run xt.post_commit result end else @@ -962,14 +908,14 @@ module Xt = struct (* Fenceless is safe inside transactions as each log update has a fence. *) let state_old = fenceless_get (as_atomic loc) in if cas_with_state loc before state state_old then begin - Timeout.cancel (timeout_as_atomic xt); + Timeout.cancel xt.timeout; Action.run xt.post_commit result end else commit (Backoff.once backoff) mode (reset_quick xt) tx | cass -> begin match determine_for_owner xt.casn cass with | true -> - Timeout.cancel (timeout_as_atomic xt); + Timeout.cancel xt.timeout; Action.run xt.post_commit result | false -> commit (Backoff.once backoff) mode (reset mode xt) tx | exception Mode.Interference -> @@ -978,42 +924,40 @@ module Xt = struct end end | exception Retry.Invalid -> - Timeout.check (timeout_as_atomic xt); + Timeout.check xt.timeout; commit (Backoff.once backoff) mode (reset_quick xt) tx | exception Retry.Later -> begin if xt.cass == NIL then invalid_retry (); - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await (timeout_as_atomic xt) t.release in - match add_awaiters t.release xt.casn xt.cass with + let t = Trigger.create () in + Timeout.attach xt.timeout t; + match add_awaiters t xt.casn xt.cass with | NIL -> begin - match t.await () with - | () -> - remove_awaiters t.release xt.casn NIL xt.cass; - Timeout.unawait (timeout_as_atomic xt) alive; + match Trigger.await t with + | None -> + remove_awaiters t xt.casn NIL xt.cass; + Timeout.check_and_detach xt.timeout t; commit (Backoff.reset backoff) mode (reset_quick xt) tx - | exception cancellation_exn -> - remove_awaiters t.release xt.casn NIL xt.cass; - Timeout.cancel_alive alive; - raise cancellation_exn + | Some cancellation_exn -> + remove_awaiters t xt.casn NIL xt.cass; + Timeout.cancel xt.timeout; + Exn_bt.raise cancellation_exn end | CASN _ as stop -> - remove_awaiters t.release xt.casn stop xt.cass; - Timeout.unawait (timeout_as_atomic xt) alive; + remove_awaiters t xt.casn stop xt.cass; + Timeout.detach xt.timeout t; commit (Backoff.once backoff) mode (reset_quick xt) tx end | exception exn -> - Timeout.cancel (timeout_as_atomic xt); + Timeout.cancel xt.timeout; raise exn let[@inline] commit ?timeoutf ?(backoff = Backoff.default) ?(mode = Mode.obstruction_free) tx = - let casn = Atomic.make (mode :> status) + let timeout = Timeout.opt timeoutf + and casn = Atomic.make (mode :> status) and cass = NIL and validate_counter = initial_validate_period and post_commit = Action.noop in - let xt = - { _timeout = timeout_unset (); casn; cass; validate_counter; post_commit } - in - Timeout.set_opt (timeout_as_atomic xt) timeoutf; + let xt = { timeout; casn; cass; validate_counter; post_commit } in commit backoff mode xt tx.tx end diff --git a/test/kcas/dune b/test/kcas/dune index 147c9b47..7953c44a 100644 --- a/test/kcas/dune +++ b/test/kcas/dune @@ -1,10 +1,4 @@ (tests (names test test_overlapping_loc ms_queue_test example threads loc_modes) - (libraries - alcotest - kcas - domain-local-timeout - threads.posix - unix - domain_shims) + (libraries alcotest kcas threads.posix unix domain_shims) (package kcas)) diff --git a/test/kcas/test.ml b/test/kcas/test.ml index b34da151..1a4b25e7 100644 --- a/test/kcas/test.ml +++ b/test/kcas/test.ml @@ -795,13 +795,15 @@ let test_call () = (** This is a non-deterministic test that might fail occasionally. *) let test_timeout () = - Domain_local_timeout.set_system (module Thread) (module Unix); - let check (op : ?timeoutf:float -> bool Loc.t -> unit) () = let rec loop n = let x = Loc.make false in - let (_ : unit -> unit) = - Domain_local_timeout.set_timeoutf 0.6 @@ fun () -> Loc.set x true + let (_ : Thread.t) = + Thread.create + (fun () -> + Unix.sleepf 0.6; + Loc.set x true) + () in match op ~timeoutf:0.02 x with | () -> if 0 < n then loop (n - 1) else assert false