Skip to content

Commit

Permalink
Add skiplist with generalized size computation
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Nov 16, 2023
1 parent bc3ddce commit 6d90696
Show file tree
Hide file tree
Showing 16 changed files with 564 additions and 214 deletions.
4 changes: 3 additions & 1 deletion .ocamlformat
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
profile = default
version = 0.26.0
version = 0.26.1

exp-grouping=preserve
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ clean:
dune clean

bench:
@dune exec -- ./bench/main.exe
@dune exec --release -- ./bench/main.exe
4 changes: 2 additions & 2 deletions bench/bench_skiplist.ml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
open Saturn

let workload num_elems num_threads add remove =
let sl = Skiplist.create () in
let sl = Skiplist.create ~compare:Int.compare () in
let elems = Array.init num_elems (fun _ -> Random.int 10000) in
let push () =
Domain.spawn (fun () ->
let start_time = Unix.gettimeofday () in
for i = 0 to (num_elems - 1) / num_threads do
Domain.cpu_relax ();
let prob = Random.float 1.0 in
if prob < add then Skiplist.add sl (Random.int 10000) |> ignore
if prob < add then Skiplist.add sl (Random.int 10000) () |> ignore
else if prob >= add && prob < add +. remove then
Skiplist.remove sl (Random.int 10000) |> ignore
else Skiplist.mem sl elems.(i) |> ignore
Expand Down
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
(depends
(ocaml (>= 4.12))
(domain_shims (>= 0.1.0))
(multicore-magic (>= 2.0.0))
(qcheck (and (>= 0.18.1) :with-test))
(qcheck-stm (and (>= 0.2) :with-test))
(qcheck-alcotest (and (>= 0.18.1) :with-test))
Expand Down
1 change: 1 addition & 0 deletions saturn_lockfree.opam
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ depends: [
"dune" {>= "3.2"}
"ocaml" {>= "4.12"}
"domain_shims" {>= "0.1.0"}
"multicore-magic" {>= "2.0.0"}
"qcheck" {>= "0.18.1" & with-test}
"qcheck-stm" {>= "0.2" & with-test}
"qcheck-alcotest" {>= "0.18.1" & with-test}
Expand Down
14 changes: 14 additions & 0 deletions src_lockfree/bits.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
let ceil_pow_2_minus_1 n =
let n = n lor (n lsr 1) in
let n = n lor (n lsr 2) in
let n = n lor (n lsr 4) in
let n = n lor (n lsr 8) in
let n = n lor (n lsr 16) in
if Sys.int_size > 32 then n lor (n lsr 32) else n

let ceil_pow_2 n =
if n <= 1 then 1
else
let n = n - 1 in
let n = ceil_pow_2_minus_1 n in
n + 1
7 changes: 7 additions & 0 deletions src_lockfree/bits.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
(** *)

val ceil_pow_2_minus_1 : int -> int
(** *)

val ceil_pow_2 : int -> int
(** *)
2 changes: 1 addition & 1 deletion src_lockfree/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library
(name saturn_lockfree)
(public_name saturn_lockfree)
(libraries domain_shims))
(libraries domain_shims multicore-magic))
158 changes: 158 additions & 0 deletions src_lockfree/size.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
module Snapshot = struct
type t = int Atomic.t array

let zero = [| Atomic.make 0 |]
let collecting = Int.min_int
let computing = -1

let[@inline] is_collecting (s : t) =
Atomic.get (Array.unsafe_get s 0) = collecting

let create n = Array.init n @@ fun _ -> Atomic.make collecting

let add s i after =
let snap = Array.unsafe_get s i in
while
let before = Atomic.get snap in
before < after && not (Atomic.compare_and_set snap before after)
do
()
done

let rec compute s decrs incrs i =
if i < Array.length s then
compute s
(decrs + Atomic.get (Array.unsafe_get s i))
(incrs + Atomic.get (Array.unsafe_get s (i + 1)))
(i + 2)
else incrs - decrs

let get s =
let status = Array.unsafe_get s 0 in
if Atomic.get status = collecting then
Atomic.compare_and_set status collecting computing |> ignore;
if Atomic.get status = computing then begin
let computed = compute s 0 0 1 in
if Atomic.get status = computing then
Atomic.compare_and_set status computing computed |> ignore
end;
Atomic.get status
end

type t = Obj.t Atomic.t array

let[@inline] get_current_snapshot (t : t) : Snapshot.t =
Obj.magic (Atomic.get (Array.unsafe_get t 0))

let[@inline] cas_current_snapshot (t : t) (before : Snapshot.t)
(after : Snapshot.t) =
Atomic.compare_and_set (Array.unsafe_get t 0) (Obj.repr before)
(Obj.repr after)

(* *)

let n_way_max = Domain.recommended_domain_count () |> Bits.ceil_pow_2
let n_way_default = n_way_max |> Int.min 8

let create ?n_way () =
let n_way =
match n_way with
| None -> n_way_default
| Some n_way -> n_way |> Int.min n_way_max |> Bits.ceil_pow_2
in
Array.init ((n_way * 2) + 1) @@ fun i ->
Atomic.make (if i = 0 then Obj.repr Snapshot.zero else Obj.repr 0)
|> Multicore_magic.copy_as_padded

(* *)

type _ state =
| Open : { mutable index : int } -> [ `Open ] state
| Used : [ `Used ] state

type once = Once : _ state -> once [@@unboxed]

let used_index = 0
let get_index (Open r) = r.index
let use_index (Open r) = r.index <- used_index

(* *)

let used_once = Once Used

(* *)

let new_once t ~incr =
let mask = Array.length t - 2 in
(* TODO: Consider hashing the domain id. *)
let index = ((Domain.self () :> int) * 2 land mask) + 1 + Bool.to_int incr in
Once (Open { index })

(* *)

type tx = { counter : Obj.t Atomic.t; value : int; once : [ `Open ] state }

let finish (t : t) tx =
(* At this point the [tx.counter] already has the [tx]. Before updating the
[tx.counter] we need to make make sure the [tx.once] will not be used
again. *)
let index = get_index tx.once in
if index != used_index then use_index tx.once;
(* At this point the [add_once] is essentially done. To free memory, we do
one more [compare_and_set]. *)
Atomic.compare_and_set tx.counter (Obj.repr tx) (Obj.repr tx.value) |> ignore;
if index != used_index then
let snapshot = get_current_snapshot t in
if Snapshot.is_collecting snapshot then
(* A size computation is running concurrently. We need to forward the
update. *)
Snapshot.add snapshot index tx.value

let rec update_once (t : t) once =
let index = get_index once in
if index != used_index then
let counter = Array.unsafe_get t index in
let counter_state = Atomic.get counter in
if index = get_index once then
if Obj.is_int counter_state then begin
let value = Obj.magic counter_state + 1 in
let tx = { counter; value; once } in
if Atomic.compare_and_set counter counter_state (Obj.repr tx) then
finish t tx
else update_once t once
end
else begin
finish t (Obj.magic counter_state);
update_once t once
end

let update_once t once =
match once with
| Once Used -> ()
| Once (Open _ as once) -> update_once t once

(* *)

let rec get_collecting_snapshot t =
let before = get_current_snapshot t in
if Snapshot.is_collecting before then before
else
let after = Snapshot.create (Array.length t) in
if cas_current_snapshot t before after then after
else get_collecting_snapshot t

let rec collect t snapshot i =
if i < Array.length t then begin
let after =
let counter_state = Atomic.get (Array.unsafe_get t i) in
if Obj.is_int counter_state then Obj.magic counter_state
else (Obj.magic counter_state).value
in
Snapshot.add snapshot i after;
collect t snapshot (i + 1)
end

let get t =
let snapshot = get_collecting_snapshot t in
collect t snapshot 1;
Snapshot.get snapshot
22 changes: 22 additions & 0 deletions src_lockfree/size.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
(** *)

type t
(** *)

val create : ?n_way:int -> unit -> t
(** *)

type once
(** *)

val used_once : once
(** *)

val new_once : t -> incr:bool -> once
(** *)

val update_once : t -> once -> unit
(** *)

val get : t -> int
(** *)
Loading

0 comments on commit 6d90696

Please sign in to comment.