Skip to content

Commit

Permalink
WIP: Make Accumulator grow automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Dec 2, 2023
1 parent 47bc888 commit 5fc0665
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 45 deletions.
8 changes: 8 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@
(and
(>= 1.7.0)
:with-test))
(qcheck-core
(and
(>= 0.21.2)
:with-test))
(qcheck-stm
(and
(>= 0.3)
:with-test))
(mdx
(and
(>= 2.3.0)
Expand Down
2 changes: 2 additions & 0 deletions kcas_data.opam
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ depends: [
"domain_shims" {>= "0.1.0" & with-test}
"mtime" {>= "2.0.0" & with-test}
"alcotest" {>= "1.7.0" & with-test}
"qcheck-core" {>= "0.21.2" & with-test}
"qcheck-stm" {>= "0.3" & with-test}
"mdx" {>= "2.3.0" & with-test}
"yojson" {>= "2.1.0" & with-test}
"odoc" {>= "2.2.0" & with-doc}
Expand Down
47 changes: 29 additions & 18 deletions src/kcas_data/accumulator.ml
Original file line number Diff line number Diff line change
@@ -1,39 +1,50 @@
open Kcas

let n_way_max = Domain.recommended_domain_count () |> Bits.ceil_pow_2
let n_way_default = n_way_max |> Int.min 8
(** TODO:
- Ensure [get] and [set] always work correctly.
- Limit array length to [Domain.recommended_domain_count ()].
- CPUs do not necessarily have power of two number of cores. *)

type t = int Loc.t array
type t = int Loc.t array Atomic.t

let make ?n_way n =
let n_way =
match n_way with
| None -> n_way_default
| Some n_way -> n_way |> Int.min n_way_max |> Bits.ceil_pow_2
in
let a = Loc.make_array ~padded:true ~mode:Mode.lock_free n_way 0 in
let make n =
let a = Loc.make_array ~padded:true ~mode:Mode.lock_free 1 0 in
Loc.set (Array.unsafe_get a 0) n;
a
Atomic.make a

let n_way_of a =
let a = Multicore_magic.fenceless_get a in
Array.length a

let n_way_of = Array.length
let[@inline never] get_self a i cs n =
let add_cs = Loc.make_array ~padded:true ~mode:Mode.lock_free n 0 in
let new_cs =
Array.init (n * 2) @@ fun i ->
if i < n then Array.unsafe_get cs i else Array.unsafe_get add_cs (i - n)
in
Atomic.compare_and_set a cs new_cs |> ignore;
Array.unsafe_get cs (i land (n - 1))

let get_self a =
let h = Multicore_magic.instantaneous_domain_index () in
Array.unsafe_get a (h land (Array.length a - 1))
let[@inline] get_self a =
let i = Multicore_magic.instantaneous_domain_index () in
let cs = Multicore_magic.fenceless_get a in
let n = Array.length cs in
if i < n then Array.unsafe_get cs i else get_self a i cs n

module Xt = struct
let add ~xt a n = if n <> 0 then Xt.fetch_and_add ~xt (get_self a) n |> ignore
let incr ~xt a = Xt.incr ~xt (get_self a)
let decr ~xt a = Xt.decr ~xt (get_self a)

let rec get ~xt a s i =
let rec get_rec ~xt a s i =
let s = s + Xt.get ~xt (Array.unsafe_get a i) in
if i = 0 then s else get ~xt a s (i - 1)
if i = 0 then s else get_rec ~xt a s (i - 1)

let get ~xt a =
let a = Multicore_magic.fenceless_get a in
let i = Array.length a - 1 in
let s = Xt.get ~xt (Array.unsafe_get a i) in
if i = 0 then s else get ~xt a s (i - 1)
if i = 0 then s else get_rec ~xt a s (i - 1)

let set ~xt a n = add ~xt a (n - get ~xt a)
end
Expand Down
14 changes: 4 additions & 10 deletions src/kcas_data/accumulator.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,13 @@ open Kcas
type t
(** The type of a scalable accumulator. *)

val make : ?n_way:int -> int -> t
(** [make n] returns a new accumulator whose initial value is [n].
The optional [n_way] argument can be used to specify a desired level of
parallelism, i.e. maximum number of non-interfering parallel updates. The
default value is chosen to strike a balance between scalability and memory
use and a given value may be adjusted by the implementation. *)
val make : int -> t
(** [make n] returns a new accumulator whose initial value is [n]. *)

val n_way_of : t -> int
(** [n_way_of a] returns the maximum number of non-interfering parallel updates
supported by the accumulator [a].
{b NOTE}: The returned value may not be the same as given to {!make}. *)
currently supported by the accumulator [a]. An accumulator grows
automatically to maximize throughput. *)

(** {1 Compositional interface} *)

Expand Down
16 changes: 7 additions & 9 deletions src/kcas_data/hashtbl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ module HashedType = struct
hash == HashedType.hash && equal == HashedType.equal
end

let create ?hashed_type ?min_buckets ?max_buckets ?n_way () =
let create ?hashed_type ?min_buckets ?max_buckets () =
let min_buckets =
match min_buckets with
| None -> min_buckets_default
Expand All @@ -144,7 +144,7 @@ let create ?hashed_type ?min_buckets ?max_buckets ?n_way () =
| Some hashed_type -> HashedType.unpack hashed_type
and pending = Nothing
and buckets = Loc.make_array min_buckets Assoc.Nil
and length = Accumulator.make ?n_way 0 in
and length = Accumulator.make 0 in
Loc.set t
(Multicore_magic.copy_as_padded
{ pending; length; buckets; hash; equal; min_buckets; max_buckets });
Expand Down Expand Up @@ -444,12 +444,12 @@ let to_seq t =
let to_seq_keys t = to_seq t |> Seq.map fst
let to_seq_values t = to_seq t |> Seq.map snd

let of_seq ?hashed_type ?min_buckets ?max_buckets ?n_way xs =
let t = create ?hashed_type ?min_buckets ?max_buckets ?n_way () in
let of_seq ?hashed_type ?min_buckets ?max_buckets xs =
let t = create ?hashed_type ?min_buckets ?max_buckets () in
Seq.iter (fun (k, v) -> replace t k v) xs;
t

let rebuild ?hashed_type ?min_buckets ?max_buckets ?n_way t =
let rebuild ?hashed_type ?min_buckets ?max_buckets t =
let record = ref (Obj.magic ()) and length = ref 0 in
let snapshot = snapshot ~length ~record t in
let r = !record in
Expand All @@ -462,8 +462,6 @@ let rebuild ?hashed_type ?min_buckets ?max_buckets ?n_way t =
match max_buckets with
| None -> Int.max min_buckets r.max_buckets
| Some c -> Int.max min_buckets c |> Int.min hi_buckets |> Bits.ceil_pow_2
and n_way =
match n_way with None -> Accumulator.n_way_of r.length | Some n -> n
in
let is_same_hashed_type =
match hashed_type with
Expand All @@ -474,14 +472,14 @@ let rebuild ?hashed_type ?min_buckets ?max_buckets ?n_way t =
let t = Loc.make ~padded:true (Obj.magic ()) in
let pending = Nothing
and buckets = Array.map Loc.make snapshot
and length = Accumulator.make ~n_way length in
and length = Accumulator.make length in
Loc.set t
@@ Multicore_magic.copy_as_padded
{ r with pending; length; buckets; min_buckets; max_buckets };
t
end
else
let t = create ?hashed_type ~min_buckets ~max_buckets ~n_way () in
let t = create ?hashed_type ~min_buckets ~max_buckets () in
snapshot |> Array.iter (Assoc.iter_rev (add t));
t

Expand Down
9 changes: 2 additions & 7 deletions src/kcas_data/hashtbl.mli
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ val create :
?hashed_type:'k hashed_type ->
?min_buckets:int ->
?max_buckets:int ->
?n_way:int ->
unit ->
('k, 'v) t
(** [create ()] returns a new empty hash table.
Expand All @@ -55,8 +54,6 @@ val create :
- The default [max_buckets] is the minimum of [1 lsl 30] and suitably
adjusted [Sys.max_array_length] and a given [max_buckets] may be adjusted
by the implementation.
- The [n_way] argument is passed to the internal {!Accumulator} used to keep
track of the number of bindings.
Hash tables are automatically internally resized. *)

Expand All @@ -78,14 +75,13 @@ val max_buckets_of : ('k, 'v) t -> int

val n_way_of : ('k, 'v) t -> int
(** [n_way_of t] returns the maximum number of non-interfering parallel updates
allowed by the internal {!Accumulator} used to keep track of the number of
bindings in the hash table [t]. *)
currently supported by the internal {!Accumulator} used to keep track of the
number of bindings in the hash table [t]. *)

val of_seq :
?hashed_type:'k hashed_type ->
?min_buckets:int ->
?max_buckets:int ->
?n_way:int ->
('k * 'v) Seq.t ->
('k, 'v) t
(** [of_seq assoc] creates a new hash table from the given association sequence
Expand Down Expand Up @@ -138,7 +134,6 @@ val rebuild :
?hashed_type:'k hashed_type ->
?min_buckets:int ->
?max_buckets:int ->
?n_way:int ->
('k, 'v) t ->
('k, 'v) t
(** [rebuild t] returns a copy of the given hash table [t] optionally rehashing
Expand Down
63 changes: 63 additions & 0 deletions test/kcas_data/accumulator_test.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
open Kcas_data

module Spec = struct
type cmd = Incr | Decr | Get | Set of int

let show_cmd = function
| Incr -> "Incr"
| Decr -> "Decr"
| Get -> "Get"
| Set v -> "Set " ^ string_of_int v

type state = int
type sut = Accumulator.t

let arb_cmd _s =
QCheck.(
make ~print:show_cmd
(Gen.oneof
[
Gen.return Incr;
Gen.return Decr;
Gen.return Get;
Gen.map (fun i -> Set i) Gen.nat;
]))

let init_state = 0
let init_sut () = Accumulator.make 0
let cleanup _ = ()

let next_state c s =
match c with Incr -> s + 1 | Decr -> s - 1 | Get -> s | Set v -> v

let precond _ _ = true

let run c d =
let open STM in
match c with
| Incr -> Res (unit, Accumulator.incr d)
| Decr -> Res (unit, Accumulator.decr d)
| Get -> Res (int, Accumulator.get d)
| Set v -> Res (unit, Accumulator.set d v)

let postcond c (s : state) res =
let open STM in
match (c, res) with
| Incr, Res ((Unit, _), ()) -> true
| Decr, Res ((Unit, _), ()) -> true
| Set _, Res ((Unit, _), ()) -> true
| Get, Res ((Int, _), res) -> res = s
| _, _ -> false
end

module Seq = STM_sequential.Make (Spec)
module Dom = STM_domain.Make (Spec)

let () =
let count = 1000 in
QCheck_base_runner.run_tests ~verbose:true
[
Seq.agree_test ~count ~name:"STM Accumulator_test test sequential";
Dom.agree_test_par ~count ~name:"STM Accumulator_test test parallel";
]
|> exit
12 changes: 11 additions & 1 deletion test/kcas_data/dune
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
(tests
(names
accumulator_test
dllist_test
hashtbl_test
lru_cache_example
mvar_test
queue_test
stack_test
xt_test)
(libraries alcotest kcas kcas_data domain_shims)
(libraries
alcotest
kcas
kcas_data
domain_shims
qcheck-core
qcheck-core.runner
qcheck-stm.stm
qcheck-stm.sequential
qcheck-stm.domain)
(package kcas_data))

0 comments on commit 5fc0665

Please sign in to comment.