Skip to content

Commit

Permalink
Use Multicore_magic.instantaneous_domain_index in Accumulator
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Dec 8, 2023
1 parent 854a41d commit 88bf537
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 62 deletions.
4 changes: 2 additions & 2 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
(domain-local-timeout
(>= 1.0.0))
(multicore-magic
(>= 2.0.0))
(>= 2.1.0))
(domain_shims
(and
(>= 0.1.0)
Expand Down Expand Up @@ -65,7 +65,7 @@
(kcas
(= :version))
(multicore-magic
(>= 2.0.0))
(>= 2.1.0))
(domain-local-await
(and
(>= 1.0.0)
Expand Down
5 changes: 4 additions & 1 deletion kcas.opam
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ depends: [
"backoff" {>= "0.1.0"}
"domain-local-await" {>= "1.0.0"}
"domain-local-timeout" {>= "1.0.0"}
"multicore-magic" {>= "2.0.0"}
"multicore-magic" {>= "2.1.0"}
"domain_shims" {>= "0.1.0" & with-test}
"alcotest" {>= "1.7.0" & with-test}
"mdx" {>= "2.3.0" & with-test}
Expand All @@ -40,3 +40,6 @@ build: [
]
dev-repo: "git+https://github.com/ocaml-multicore/kcas.git"
doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/"
pin-depends: [
[ "multicore-magic.dev" "git+https://github.com/ocaml-multicore/multicore-magic#dd9d8928fba7ab5dedef612a96bf01bd17cbd275" ]
]
3 changes: 3 additions & 0 deletions kcas.opam.template
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/"
pin-depends: [
[ "multicore-magic.dev" "git+https://github.com/ocaml-multicore/multicore-magic#dd9d8928fba7ab5dedef612a96bf01bd17cbd275" ]
]
2 changes: 1 addition & 1 deletion kcas_data.opam
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ bug-reports: "https://github.com/ocaml-multicore/kcas/issues"
depends: [
"dune" {>= "3.8"}
"kcas" {= version}
"multicore-magic" {>= "2.0.0"}
"multicore-magic" {>= "2.1.0"}
"domain-local-await" {>= "1.0.0" & with-test}
"domain_shims" {>= "0.1.0" & with-test}
"mtime" {>= "2.0.0" & with-test}
Expand Down
58 changes: 33 additions & 25 deletions src/kcas_data/accumulator.ml
Original file line number Diff line number Diff line change
@@ -1,40 +1,48 @@
open Kcas

let n_way_max = Domain.recommended_domain_count () |> Bits.ceil_pow_2
let n_way_default = n_way_max |> Int.min 8

type t = int Loc.t array

let make ?n_way n =
let n_way =
match n_way with
| None -> n_way_default
| Some n_way -> n_way |> Int.min n_way_max |> Bits.ceil_pow_2
(** TODO:
- Limit array length to [Domain.recommended_domain_count ()].
- CPUs do not necessarily have power of two number of cores. *)

type t = { mutable cache : int Loc.t array; truth : int Loc.t array Loc.t }

let make n =
let cs = Loc.make_array ~padded:true ~mode:Mode.lock_free 1 0 in
Loc.set (Array.unsafe_get cs 0) n;
let truth = Loc.make ~padded:true cs in
Multicore_magic.copy_as_padded { cache = cs; truth }

let[@inline never] get_self a i cs n =
let add_cs = Loc.make_array ~padded:true ~mode:Mode.lock_free n 0 in
let new_cs =
Array.init (n * 2) @@ fun i ->
if i < n then Array.unsafe_get add_cs i else Array.unsafe_get cs (i - n)
in
let a = Loc.make_array ~padded:true ~mode:Mode.lock_free n_way 0 in
Loc.set (Array.unsafe_get a 0) n;
a

let n_way_of = Array.length
if Loc.compare_and_set a.truth cs new_cs then a.cache <- new_cs;
Array.unsafe_get cs (i land (n - 1))

let get_self a =
let h = (Domain.self () :> int) in
(* TODO: Consider mixing the bits of [h] to get better distribution *)
Array.unsafe_get a (h land (Array.length a - 1))
let[@inline] get_self a =
let i = Multicore_magic.instantaneous_domain_index () in
let cs = a.cache in
let n = Array.length cs in
if i < n then Array.unsafe_get cs i else get_self a i cs n

module Xt = struct
let add ~xt a n = if n <> 0 then Xt.fetch_and_add ~xt (get_self a) n |> ignore
let incr ~xt a = Xt.incr ~xt (get_self a)
let decr ~xt a = Xt.decr ~xt (get_self a)

let rec get ~xt a s i =
let s = s + Xt.get ~xt (Array.unsafe_get a i) in
if i = 0 then s else get ~xt a s (i - 1)
let rec get_rec ~xt cs s i =
let s = s + Xt.get ~xt (Array.unsafe_get cs i) in
if i = 0 then s else get_rec ~xt cs s (i - 1)

let get ~xt a =
let i = Array.length a - 1 in
let s = Xt.get ~xt (Array.unsafe_get a i) in
if i = 0 then s else get ~xt a s (i - 1)
let cs = Xt.get ~xt a.truth in
let cs_old = a.cache in
if cs != cs_old then a.cache <- cs;
let i = Array.length cs - 1 in
let s = Xt.get ~xt (Array.unsafe_get cs i) in
if i = 0 then s else get_rec ~xt cs s (i - 1)

let set ~xt a n = add ~xt a (n - get ~xt a)
end
Expand Down
15 changes: 2 additions & 13 deletions src/kcas_data/accumulator.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,8 @@ open Kcas
type t
(** The type of a scalable accumulator. *)

val make : ?n_way:int -> int -> t
(** [make n] returns a new accumulator whose initial value is [n].
The optional [n_way] argument can be used to specify a desired level of
parallelism, i.e. maximum number of non-interfering parallel updates. The
default value is chosen to strike a balance between scalability and memory
use and a given value may be adjusted by the implementation. *)

val n_way_of : t -> int
(** [n_way_of a] returns the maximum number of non-interfering parallel updates
supported by the accumulator [a].
{b NOTE}: The returned value may not be the same as given to {!make}. *)
val make : int -> t
(** [make n] returns a new accumulator whose initial value is [n]. *)

(** {1 Compositional interface} *)

Expand Down
17 changes: 7 additions & 10 deletions src/kcas_data/hashtbl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ module HashedType = struct
hash == HashedType.hash && equal == HashedType.equal
end

let create ?hashed_type ?min_buckets ?max_buckets ?n_way () =
let create ?hashed_type ?min_buckets ?max_buckets () =
let min_buckets =
match min_buckets with
| None -> min_buckets_default
Expand All @@ -144,13 +144,12 @@ let create ?hashed_type ?min_buckets ?max_buckets ?n_way () =
| Some hashed_type -> HashedType.unpack hashed_type
and pending = Nothing
and buckets = Loc.make_array min_buckets Assoc.Nil
and length = Accumulator.make ?n_way 0 in
and length = Accumulator.make 0 in
Loc.set t
(Multicore_magic.copy_as_padded
{ pending; length; buckets; hash; equal; min_buckets; max_buckets });
t

let n_way_of t = Accumulator.n_way_of (Loc.get t).length
let min_buckets_of t = (Loc.get t).min_buckets
let max_buckets_of t = (Loc.get t).max_buckets

Expand Down Expand Up @@ -444,12 +443,12 @@ let to_seq t =
let to_seq_keys t = to_seq t |> Seq.map fst
let to_seq_values t = to_seq t |> Seq.map snd

let of_seq ?hashed_type ?min_buckets ?max_buckets ?n_way xs =
let t = create ?hashed_type ?min_buckets ?max_buckets ?n_way () in
let of_seq ?hashed_type ?min_buckets ?max_buckets xs =
let t = create ?hashed_type ?min_buckets ?max_buckets () in
Seq.iter (fun (k, v) -> replace t k v) xs;
t

let rebuild ?hashed_type ?min_buckets ?max_buckets ?n_way t =
let rebuild ?hashed_type ?min_buckets ?max_buckets t =
let record = ref (Obj.magic ()) and length = ref 0 in
let snapshot = snapshot ~length ~record t in
let r = !record in
Expand All @@ -462,8 +461,6 @@ let rebuild ?hashed_type ?min_buckets ?max_buckets ?n_way t =
match max_buckets with
| None -> Int.max min_buckets r.max_buckets
| Some c -> Int.max min_buckets c |> Int.min hi_buckets |> Bits.ceil_pow_2
and n_way =
match n_way with None -> Accumulator.n_way_of r.length | Some n -> n
in
let is_same_hashed_type =
match hashed_type with
Expand All @@ -474,14 +471,14 @@ let rebuild ?hashed_type ?min_buckets ?max_buckets ?n_way t =
let t = Loc.make ~padded:true (Obj.magic ()) in
let pending = Nothing
and buckets = Array.map Loc.make snapshot
and length = Accumulator.make ~n_way length in
and length = Accumulator.make length in
Loc.set t
@@ Multicore_magic.copy_as_padded
{ r with pending; length; buckets; min_buckets; max_buckets };
t
end
else
let t = create ?hashed_type ~min_buckets ~max_buckets ~n_way () in
let t = create ?hashed_type ~min_buckets ~max_buckets () in
snapshot |> Array.iter (Assoc.iter_rev (add t));
t

Expand Down
10 changes: 0 additions & 10 deletions src/kcas_data/hashtbl.mli
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ val create :
?hashed_type:'k hashed_type ->
?min_buckets:int ->
?max_buckets:int ->
?n_way:int ->
unit ->
('k, 'v) t
(** [create ()] returns a new empty hash table.
Expand All @@ -55,8 +54,6 @@ val create :
- The default [max_buckets] is the minimum of [1 lsl 30] and suitably
adjusted [Sys.max_array_length] and a given [max_buckets] may be adjusted
by the implementation.
- The [n_way] argument is passed to the internal {!Accumulator} used to keep
track of the number of bindings.
Hash tables are automatically internally resized. *)

Expand All @@ -76,16 +73,10 @@ val max_buckets_of : ('k, 'v) t -> int
{b NOTE}: The returned value may not be the same as given to {!create}. *)

val n_way_of : ('k, 'v) t -> int
(** [n_way_of t] returns the maximum number of non-interfering parallel updates
allowed by the internal {!Accumulator} used to keep track of the number of
bindings in the hash table [t]. *)

val of_seq :
?hashed_type:'k hashed_type ->
?min_buckets:int ->
?max_buckets:int ->
?n_way:int ->
('k * 'v) Seq.t ->
('k, 'v) t
(** [of_seq assoc] creates a new hash table from the given association sequence
Expand Down Expand Up @@ -138,7 +129,6 @@ val rebuild :
?hashed_type:'k hashed_type ->
?min_buckets:int ->
?max_buckets:int ->
?n_way:int ->
('k, 'v) t ->
('k, 'v) t
(** [rebuild t] returns a copy of the given hash table [t] optionally rehashing
Expand Down

0 comments on commit 88bf537

Please sign in to comment.