From 505d69362759e950f6efa2901d3f8088e00931c6 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Fri, 1 Dec 2023 14:00:32 +0200 Subject: [PATCH] WIP: Make `Accumulator` grow automatically --- dune-project | 8 ++++ kcas_data.opam | 2 + src/kcas_data/accumulator.ml | 47 +++++++++++++-------- src/kcas_data/accumulator.mli | 14 ++---- src/kcas_data/hashtbl.ml | 16 +++---- src/kcas_data/hashtbl.mli | 9 +--- test/kcas_data/accumulator_test.ml | 54 ++++++++++++++++++++++++ test/kcas_data/dune | 10 ++++- test/kcas_data/stm_run/dune | 22 ++++++++++ test/kcas_data/stm_run/stm_run.ignore.ml | 0 test/kcas_data/stm_run/stm_run.ocaml4.ml | 8 ++++ test/kcas_data/stm_run/stm_run.ocaml5.ml | 8 ++++ 12 files changed, 153 insertions(+), 45 deletions(-) create mode 100644 test/kcas_data/accumulator_test.ml create mode 100644 test/kcas_data/stm_run/dune create mode 100644 test/kcas_data/stm_run/stm_run.ignore.ml create mode 100644 test/kcas_data/stm_run/stm_run.ocaml4.ml create mode 100644 test/kcas_data/stm_run/stm_run.ocaml5.ml diff --git a/dune-project b/dune-project index 964218db..84fa797a 100644 --- a/dune-project +++ b/dune-project @@ -82,6 +82,14 @@ (and (>= 1.7.0) :with-test)) + (qcheck-core + (and + (>= 0.21.2) + :with-test)) + (qcheck-stm + (and + (>= 0.3) + :with-test)) (mdx (and (>= 2.3.0) diff --git a/kcas_data.opam b/kcas_data.opam index 41b1d7e5..ecd70c1e 100644 --- a/kcas_data.opam +++ b/kcas_data.opam @@ -20,6 +20,8 @@ depends: [ "domain_shims" {>= "0.1.0" & with-test} "mtime" {>= "2.0.0" & with-test} "alcotest" {>= "1.7.0" & with-test} + "qcheck-core" {>= "0.21.2" & with-test} + "qcheck-stm" {>= "0.3" & with-test} "mdx" {>= "2.3.0" & with-test} "yojson" {>= "2.1.0" & with-test} "odoc" {>= "2.2.0" & with-doc} diff --git a/src/kcas_data/accumulator.ml b/src/kcas_data/accumulator.ml index 1f1a1b2d..e17f0bea 100644 --- a/src/kcas_data/accumulator.ml +++ b/src/kcas_data/accumulator.ml @@ -1,39 +1,50 @@ open Kcas -let n_way_max = Domain.recommended_domain_count () |> Bits.ceil_pow_2 -let n_way_default = n_way_max |> Int.min 8 +(** TODO: + - Ensure [get] and [set] always work correctly. + - Limit array length to [Domain.recommended_domain_count ()]. + - CPUs do not necessarily have power of two number of cores. *) -type t = int Loc.t array +type t = int Loc.t array Atomic.t -let make ?n_way n = - let n_way = - match n_way with - | None -> n_way_default - | Some n_way -> n_way |> Int.min n_way_max |> Bits.ceil_pow_2 - in - let a = Loc.make_array ~padded:true ~mode:Mode.lock_free n_way 0 in +let make n = + let a = Loc.make_array ~padded:true ~mode:Mode.lock_free 1 0 in Loc.set (Array.unsafe_get a 0) n; - a + Atomic.make a + +let n_way_of a = + let a = Multicore_magic.fenceless_get a in + Array.length a -let n_way_of = Array.length +let[@inline never] get_self a i cs n = + let add_cs = Loc.make_array ~padded:true ~mode:Mode.lock_free n 0 in + let new_cs = + Array.init (n * 2) @@ fun i -> + if i < n then Array.unsafe_get cs i else Array.unsafe_get add_cs (i - n) + in + Atomic.compare_and_set a cs new_cs |> ignore; + Array.unsafe_get cs (i land (n - 1)) -let get_self a = - let h = Multicore_magic.instantaneous_domain_index () in - Array.unsafe_get a (h land (Array.length a - 1)) +let[@inline] get_self a = + let i = Multicore_magic.instantaneous_domain_index () in + let cs = Multicore_magic.fenceless_get a in + let n = Array.length cs in + if i < n then Array.unsafe_get cs i else get_self a i cs n module Xt = struct let add ~xt a n = if n <> 0 then Xt.fetch_and_add ~xt (get_self a) n |> ignore let incr ~xt a = Xt.incr ~xt (get_self a) let decr ~xt a = Xt.decr ~xt (get_self a) - let rec get ~xt a s i = + let rec get_rec ~xt a s i = let s = s + Xt.get ~xt (Array.unsafe_get a i) in - if i = 0 then s else get ~xt a s (i - 1) + if i = 0 then s else get_rec ~xt a s (i - 1) let get ~xt a = + let a = Multicore_magic.fenceless_get a in let i = Array.length a - 1 in let s = Xt.get ~xt (Array.unsafe_get a i) in - if i = 0 then s else get ~xt a s (i - 1) + if i = 0 then s else get_rec ~xt a s (i - 1) let set ~xt a n = add ~xt a (n - get ~xt a) end diff --git a/src/kcas_data/accumulator.mli b/src/kcas_data/accumulator.mli index 31fbee89..3a2d9c7d 100644 --- a/src/kcas_data/accumulator.mli +++ b/src/kcas_data/accumulator.mli @@ -10,19 +10,13 @@ open Kcas type t (** The type of a scalable accumulator. *) -val make : ?n_way:int -> int -> t -(** [make n] returns a new accumulator whose initial value is [n]. - - The optional [n_way] argument can be used to specify a desired level of - parallelism, i.e. maximum number of non-interfering parallel updates. The - default value is chosen to strike a balance between scalability and memory - use and a given value may be adjusted by the implementation. *) +val make : int -> t +(** [make n] returns a new accumulator whose initial value is [n]. *) val n_way_of : t -> int (** [n_way_of a] returns the maximum number of non-interfering parallel updates - supported by the accumulator [a]. - - {b NOTE}: The returned value may not be the same as given to {!make}. *) + currently supported by the accumulator [a]. An accumulator grows + automatically to maximize throughput. *) (** {1 Compositional interface} *) diff --git a/src/kcas_data/hashtbl.ml b/src/kcas_data/hashtbl.ml index 7f91b487..f52f1a23 100644 --- a/src/kcas_data/hashtbl.ml +++ b/src/kcas_data/hashtbl.ml @@ -127,7 +127,7 @@ module HashedType = struct hash == HashedType.hash && equal == HashedType.equal end -let create ?hashed_type ?min_buckets ?max_buckets ?n_way () = +let create ?hashed_type ?min_buckets ?max_buckets () = let min_buckets = match min_buckets with | None -> min_buckets_default @@ -144,7 +144,7 @@ let create ?hashed_type ?min_buckets ?max_buckets ?n_way () = | Some hashed_type -> HashedType.unpack hashed_type and pending = Nothing and buckets = Loc.make_array min_buckets Assoc.Nil - and length = Accumulator.make ?n_way 0 in + and length = Accumulator.make 0 in Loc.set t (Multicore_magic.copy_as_padded { pending; length; buckets; hash; equal; min_buckets; max_buckets }); @@ -444,12 +444,12 @@ let to_seq t = let to_seq_keys t = to_seq t |> Seq.map fst let to_seq_values t = to_seq t |> Seq.map snd -let of_seq ?hashed_type ?min_buckets ?max_buckets ?n_way xs = - let t = create ?hashed_type ?min_buckets ?max_buckets ?n_way () in +let of_seq ?hashed_type ?min_buckets ?max_buckets xs = + let t = create ?hashed_type ?min_buckets ?max_buckets () in Seq.iter (fun (k, v) -> replace t k v) xs; t -let rebuild ?hashed_type ?min_buckets ?max_buckets ?n_way t = +let rebuild ?hashed_type ?min_buckets ?max_buckets t = let record = ref (Obj.magic ()) and length = ref 0 in let snapshot = snapshot ~length ~record t in let r = !record in @@ -462,8 +462,6 @@ let rebuild ?hashed_type ?min_buckets ?max_buckets ?n_way t = match max_buckets with | None -> Int.max min_buckets r.max_buckets | Some c -> Int.max min_buckets c |> Int.min hi_buckets |> Bits.ceil_pow_2 - and n_way = - match n_way with None -> Accumulator.n_way_of r.length | Some n -> n in let is_same_hashed_type = match hashed_type with @@ -474,14 +472,14 @@ let rebuild ?hashed_type ?min_buckets ?max_buckets ?n_way t = let t = Loc.make ~padded:true (Obj.magic ()) in let pending = Nothing and buckets = Array.map Loc.make snapshot - and length = Accumulator.make ~n_way length in + and length = Accumulator.make length in Loc.set t @@ Multicore_magic.copy_as_padded { r with pending; length; buckets; min_buckets; max_buckets }; t end else - let t = create ?hashed_type ~min_buckets ~max_buckets ~n_way () in + let t = create ?hashed_type ~min_buckets ~max_buckets () in snapshot |> Array.iter (Assoc.iter_rev (add t)); t diff --git a/src/kcas_data/hashtbl.mli b/src/kcas_data/hashtbl.mli index eea7754a..bfa1e0f0 100644 --- a/src/kcas_data/hashtbl.mli +++ b/src/kcas_data/hashtbl.mli @@ -43,7 +43,6 @@ val create : ?hashed_type:'k hashed_type -> ?min_buckets:int -> ?max_buckets:int -> - ?n_way:int -> unit -> ('k, 'v) t (** [create ()] returns a new empty hash table. @@ -55,8 +54,6 @@ val create : - The default [max_buckets] is the minimum of [1 lsl 30] and suitably adjusted [Sys.max_array_length] and a given [max_buckets] may be adjusted by the implementation. - - The [n_way] argument is passed to the internal {!Accumulator} used to keep - track of the number of bindings. Hash tables are automatically internally resized. *) @@ -78,14 +75,13 @@ val max_buckets_of : ('k, 'v) t -> int val n_way_of : ('k, 'v) t -> int (** [n_way_of t] returns the maximum number of non-interfering parallel updates - allowed by the internal {!Accumulator} used to keep track of the number of - bindings in the hash table [t]. *) + currently supported by the internal {!Accumulator} used to keep track of the + number of bindings in the hash table [t]. *) val of_seq : ?hashed_type:'k hashed_type -> ?min_buckets:int -> ?max_buckets:int -> - ?n_way:int -> ('k * 'v) Seq.t -> ('k, 'v) t (** [of_seq assoc] creates a new hash table from the given association sequence @@ -138,7 +134,6 @@ val rebuild : ?hashed_type:'k hashed_type -> ?min_buckets:int -> ?max_buckets:int -> - ?n_way:int -> ('k, 'v) t -> ('k, 'v) t (** [rebuild t] returns a copy of the given hash table [t] optionally rehashing diff --git a/test/kcas_data/accumulator_test.ml b/test/kcas_data/accumulator_test.ml new file mode 100644 index 00000000..0784833e --- /dev/null +++ b/test/kcas_data/accumulator_test.ml @@ -0,0 +1,54 @@ +open Kcas_data + +module Spec = struct + type cmd = Incr | Decr | Get | Set of int + + let show_cmd = function + | Incr -> "Incr" + | Decr -> "Decr" + | Get -> "Get" + | Set v -> "Set " ^ string_of_int v + + type state = int + type sut = Accumulator.t + + let arb_cmd _s = + QCheck.( + [ + Gen.return Incr; + Gen.return Decr; + Gen.return Get; + Gen.map (fun i -> Set i) Gen.nat; + ] + |> Gen.oneof |> make ~print:show_cmd) + + let init_state = 0 + let init_sut () = Accumulator.make 0 + let cleanup _ = () + + let next_state c s = + match c with Incr -> s + 1 | Decr -> s - 1 | Get -> s | Set v -> v + + let precond _ _ = true + + let run c d = + let open STM in + match c with + | Incr -> Res (unit, Accumulator.incr d) + | Decr -> Res (unit, Accumulator.decr d) + | Get -> Res (int, Accumulator.get d) + | Set v -> Res (unit, Accumulator.set d v) + + let postcond c (s : state) res = + let open STM in + match (c, res) with + | Incr, Res ((Unit, _), ()) -> true + | Decr, Res ((Unit, _), ()) -> true + | Set _, Res ((Unit, _), ()) -> true + | Get, Res ((Int, _), res) -> res = s + | _, _ -> false +end + +let () = + Stm_run.run ~count:1000 ~verbose:true ~name:"Accumulator" (module Spec) + |> exit diff --git a/test/kcas_data/dune b/test/kcas_data/dune index 7aac5585..b0052eed 100644 --- a/test/kcas_data/dune +++ b/test/kcas_data/dune @@ -1,5 +1,6 @@ (tests (names + accumulator_test dllist_test hashtbl_test lru_cache_example @@ -7,5 +8,12 @@ queue_test stack_test xt_test) - (libraries alcotest kcas kcas_data domain_shims) + (libraries + alcotest + kcas + kcas_data + domain_shims + stm_run + qcheck-core + qcheck-stm.stm) (package kcas_data)) diff --git a/test/kcas_data/stm_run/dune b/test/kcas_data/stm_run/dune new file mode 100644 index 00000000..72ea68e7 --- /dev/null +++ b/test/kcas_data/stm_run/dune @@ -0,0 +1,22 @@ +(library + (name stm_run) + (libraries + qcheck-stm.sequential + (select + stm_run.ml + from + (qcheck-core + qcheck-core.runner + qcheck-stm.stm + qcheck-stm.thread + qcheck-stm.domain + -> + stm_run.ocaml5.ml) + (qcheck-core + qcheck-core.runner + qcheck-stm.stm + qcheck-stm.thread + -> + stm_run.ocaml4.ml) + (-> stm_run.ignore.ml))) + (package kcas_data)) diff --git a/test/kcas_data/stm_run/stm_run.ignore.ml b/test/kcas_data/stm_run/stm_run.ignore.ml new file mode 100644 index 00000000..e69de29b diff --git a/test/kcas_data/stm_run/stm_run.ocaml4.ml b/test/kcas_data/stm_run/stm_run.ocaml4.ml new file mode 100644 index 00000000..0f82f68c --- /dev/null +++ b/test/kcas_data/stm_run/stm_run.ocaml4.ml @@ -0,0 +1,8 @@ +let run ~verbose ~count ~name (module Spec : STM.Spec) = + let module Seq = STM_sequential.Make (Spec) in + let module Con = STM_thread.Make (Spec) [@alert "-experimental"] in + QCheck_base_runner.run_tests ~verbose + [ + Seq.agree_test ~count ~name:(name ^ " sequential"); + Con.agree_test_conc ~count ~name:(name ^ " concurrent"); + ] diff --git a/test/kcas_data/stm_run/stm_run.ocaml5.ml b/test/kcas_data/stm_run/stm_run.ocaml5.ml new file mode 100644 index 00000000..9f12f283 --- /dev/null +++ b/test/kcas_data/stm_run/stm_run.ocaml5.ml @@ -0,0 +1,8 @@ +let run ~verbose ~count ~name (module Spec : STM.Spec) = + let module Seq = STM_sequential.Make (Spec) in + let module Dom = STM_domain.Make (Spec) in + QCheck_base_runner.run_tests ~verbose + [ + Seq.agree_test ~count ~name:(name ^ " sequential"); + Dom.agree_test_par ~count ~name:(name ^ " parallel"); + ]