diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index 799e12cc..5bded5f4 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -671,97 +671,135 @@ module Xt = struct validate_one which node_r.loc node_r.state; validate_all_rec which node_r.gt - let[@inline] maybe_validate_log (Xt xt_r as xt : _ t) = - let c0 = xt_r.validate_counter in - let c1 = c0 + 1 in - xt_r.validate_counter <- c1; - (* Validate whenever counter reaches next power of 2. *) - if c0 land c1 = 0 then begin - Timeout.check xt_r.timeout; - validate_all_rec xt !(tree_as_ref xt) - end - let[@inline] is_obstruction_free (Xt xt_r : _ t) loc = (* Fenceless is safe as we are accessing a private location. *) xt_r.mode == `Obstruction_free && 0 <= loc.id - let[@inline] update_new loc f xt lt gt = - (* Fenceless is safe inside transactions as each log update has a fence. *) + type (_, _) up = + | Compare_and_swap : ('a * 'a, 'a) up + | Fetch_and_add : (int, int) up + | Fn : ('a -> 'a, 'a) up + | Exchange : ('a, 'a) up + | Get : (unit, 'a) up + + let update_new : type c a. _ -> a loc -> c -> (c, a) up -> _ -> _ -> a = + fun xt loc c up lt gt -> let state = fenceless_get (as_atomic loc) in let before = eval state in - match f before with - | after -> - let state = - if before == after && is_obstruction_free xt loc then state - else { before; after; which = W xt; awaiters = [] } - in - tree_as_ref xt := T (Node { loc; state; lt; gt; awaiters = [] }); - before - | exception exn -> - tree_as_ref xt := T (Node { loc; state; lt; gt; awaiters = [] }); - raise exn + let after : a = + match up with + | Compare_and_swap -> if fst c == before then snd c else before + | Fetch_and_add -> before + c + | Fn -> begin + let rot = !(tree_as_ref xt) in + match c before with + | after -> + assert (rot == !(tree_as_ref xt)); + after + | exception exn -> + assert (rot == !(tree_as_ref xt)); + tree_as_ref xt := T (Node { loc; state; lt; gt; awaiters = [] }); + raise exn + end + | Exchange -> c + | Get -> before + in + let state = + if before == after && is_obstruction_free xt loc then state + else { before; after; which = W xt; awaiters = [] } + in + tree_as_ref xt := T (Node { loc; state; lt; gt; awaiters = [] }); + before + + let update_old : type c a. _ -> a loc -> c -> (c, a) up -> _ -> _ -> _ -> a = + fun (Xt xt_r as xt : _ t) loc c up lt gt state' -> + let c0 = xt_r.validate_counter in + let c1 = c0 + 1 in + xt_r.validate_counter <- c1; + (* Validate whenever counter reaches next power of 2. - let[@inline] update_top loc f xt state' lt gt = - let state = Obj.magic state' in + The assumption is that potentially infinite loops will repeatedly access + the same locations. *) + if c0 land c1 = 0 then begin + Timeout.check xt_r.timeout; + validate_all_rec xt !(tree_as_ref xt) + end; + let state : a state = Obj.magic state' in if is_cmp xt state then begin - let before = eval state in - let after = f before in + let current = eval state in + let after : a = + match up with + | Compare_and_swap -> if fst c == current then snd c else current + | Fetch_and_add -> current + c + | Fn -> + let rot = !(tree_as_ref xt) in + let after = c current in + assert (rot == !(tree_as_ref xt)); + after + | Exchange -> c + | Get -> current + in let state = - if before == after then state - else { before; after; which = W xt; awaiters = [] } + if current == after then state + else { before = current; after; which = W xt; awaiters = [] } in tree_as_ref xt := T (Node { loc; state; lt; gt; awaiters = [] }); - before + current end else let current = state.after in - let state = { state with after = f current } in + let after : a = + match up with + | Compare_and_swap -> if fst c == current then snd c else current + | Fetch_and_add -> current + c + | Fn -> + let rot = !(tree_as_ref xt) in + let after = c current in + assert (rot == !(tree_as_ref xt)); + after + | Exchange -> c + | Get -> current + in + let state = + if current == after then state + else { before = state.before; after; which = W xt; awaiters = [] } + in tree_as_ref xt := T (Node { loc; state; lt; gt; awaiters = [] }); current - let[@inline] unsafe_update ~xt loc f = + let update_as ~xt loc c up = let loc = Loc.to_loc loc in - maybe_validate_log xt; let x = loc.id in match !(tree_as_ref xt) with - | T Leaf -> update_new loc f xt (T Leaf) (T Leaf) + | T Leaf -> update_new xt loc c up (T Leaf) (T Leaf) | T (Node { loc = a; lt = T Leaf; _ }) as tree when x < a.id -> - update_new loc f xt (T Leaf) tree + update_new xt loc c up (T Leaf) tree | T (Node { loc = a; gt = T Leaf; _ }) as tree when a.id < x -> - update_new loc f xt tree (T Leaf) + update_new xt loc c up tree (T Leaf) | T (Node { loc = a; state; lt; gt; _ }) when Obj.magic a == loc -> - update_top loc f xt state lt gt + update_old xt loc c up lt gt state | tree -> begin match splay ~hit_parent:false x tree with - | l, T Leaf, r -> update_new loc f xt l r - | l, T (Node node_r), r -> update_top loc f xt node_r.state l r + | l, T Leaf, r -> update_new xt loc c up l r + | l, T (Node node_r), r -> update_old xt loc c up l r node_r.state end - let[@inline] protect xt f x = - let tree = !(tree_as_ref xt) in - let y = f x in - assert (!(tree_as_ref xt) == tree); - y - - let get ~xt loc = unsafe_update ~xt loc Fun.id - let set ~xt loc after = unsafe_update ~xt loc (fun _ -> after) |> ignore - let modify ~xt loc f = unsafe_update ~xt loc (protect xt f) |> ignore + let get ~xt loc = update_as ~xt loc () Get + let set ~xt loc after = update_as ~xt loc after Exchange |> ignore + let modify ~xt loc f = update_as ~xt loc f Fn |> ignore let compare_and_swap ~xt loc before after = - unsafe_update ~xt loc (fun actual -> - if actual == before then after else actual) + update_as ~xt loc (before, after) Compare_and_swap let compare_and_set ~xt loc before after = compare_and_swap ~xt loc before after == before - let exchange ~xt loc after = unsafe_update ~xt loc (fun _ -> after) - let fetch_and_add ~xt loc n = unsafe_update ~xt loc (( + ) n) - let incr ~xt loc = unsafe_update ~xt loc inc |> ignore - let decr ~xt loc = unsafe_update ~xt loc dec |> ignore - let update ~xt loc f = unsafe_update ~xt loc (protect xt f) + let exchange ~xt loc after = update_as ~xt loc after Exchange + let fetch_and_add ~xt loc n = update_as ~xt loc n Fetch_and_add + let incr ~xt loc = update_as ~xt loc 1 Fetch_and_add |> ignore + let decr ~xt loc = update_as ~xt loc (-1) Fetch_and_add |> ignore + let update ~xt loc f = update_as ~xt loc f Fn let swap ~xt l1 l2 = set ~xt l1 @@ exchange ~xt l2 @@ get ~xt l1 - let unsafe_modify ~xt loc f = unsafe_update ~xt loc f |> ignore - let unsafe_update ~xt loc f = unsafe_update ~xt loc f let[@inline] to_blocking ~xt tx = match tx ~xt with None -> Retry.later () | Some value -> value @@ -889,7 +927,7 @@ module Xt = struct else stop | T (Node _) as stop -> stop - let initial_validate_period = 16 + let initial_validate_period = 4 let success (Xt xt_r : _ t) result = Timeout.cancel xt_r.timeout; diff --git a/src/kcas/kcas.mli b/src/kcas/kcas.mli index 2061c9ee..185a2c23 100644 --- a/src/kcas/kcas.mli +++ b/src/kcas/kcas.mli @@ -415,10 +415,11 @@ module Xt : sig To mitigate potential issues due to this read skew anomaly and due to very long running transactions, all of the access recording operations in this - section periodically validate the entire transaction log. An important - guideline for writing transactions is that loops inside a transaction - should always include an access of some shared memory location through the - transaction log or should otherwise be guaranteed to be bounded. *) + section periodically validate the entire transaction log when a previously + accessed location is accessed again. An important guideline for writing + transactions is that loops inside a transaction should always include an + access of some shared memory location through the transaction log or + should otherwise be guaranteed to be bounded. *) val get : xt:'x t -> 'a Loc.t -> 'a (** [get ~xt r] returns the current value of the shared memory location [r] in @@ -558,14 +559,4 @@ module Xt : sig The default {{!Mode.t} [mode]} for [commit] is [`Obstruction_free]. However, after enough attempts have failed during the verification step, [commit] switches to [`Lock_free]. *) - - (**/**) - - val unsafe_modify : xt:'x t -> 'a Loc.t -> ('a -> 'a) -> unit - (** [unsafe_modify ~xt r f] is equivalent to [modify ~xt r f], but does not - assert against misuse. *) - - val unsafe_update : xt:'x t -> 'a Loc.t -> ('a -> 'a) -> 'a - (** [unsafe_update ~xt r f] is equivalent to [update ~xt r f], but does not - assert against misuse. *) end diff --git a/src/kcas_data/hashtbl.ml b/src/kcas_data/hashtbl.ml index 8cfa8f21..c094714a 100644 --- a/src/kcas_data/hashtbl.ml +++ b/src/kcas_data/hashtbl.ml @@ -219,7 +219,7 @@ module Xt = struct Array.unsafe_get old_buckets i |> Xt.get ~xt |> Assoc.iter_rev @@ fun k v -> - Xt.unsafe_modify ~xt + Xt.modify ~xt (Array.unsafe_get new_buckets (hash k land mask)) (Assoc.cons k v) done @@ -337,7 +337,7 @@ module Xt = struct let buckets = r.buckets in let mask = Array.length buckets - 1 in let bucket = Array.unsafe_get buckets (r.hash k land mask) in - match Xt.unsafe_modify ~xt bucket (Assoc.remove r.equal k) with + match Xt.modify ~xt bucket (Assoc.remove r.equal k) with | () -> Accumulator.Xt.decr ~xt r.length; if r.min_buckets <= mask && Random.bits () land mask = 0 then @@ -353,7 +353,7 @@ module Xt = struct let buckets = r.buckets in let mask = Array.length buckets - 1 in let bucket = Array.unsafe_get buckets (r.hash k land mask) in - Xt.unsafe_modify ~xt bucket (Assoc.cons k v); + Xt.modify ~xt bucket (Assoc.cons k v); Accumulator.Xt.incr ~xt r.length; if mask + 1 < r.max_buckets && Random.bits () land mask = 0 then let capacity = mask + 1 in @@ -367,7 +367,7 @@ module Xt = struct let mask = Array.length buckets - 1 in let bucket = Array.unsafe_get buckets (r.hash k land mask) in let change = ref Assoc.Nop in - Xt.unsafe_modify ~xt bucket (fun kvs -> + Xt.modify ~xt bucket (fun kvs -> let kvs' = Assoc.replace r.equal change k v kvs in if !change != Assoc.Nop then kvs' else kvs); if !change == Assoc.Added then begin diff --git a/src/kcas_data/mvar.ml b/src/kcas_data/mvar.ml index 7276d15c..ba31e192 100644 --- a/src/kcas_data/mvar.ml +++ b/src/kcas_data/mvar.ml @@ -11,14 +11,13 @@ module Xt = struct Magic_option.is_none (Xt.compare_and_swap ~xt mv Magic_option.none (Magic_option.some value)) - let put ~xt mv value = - Xt.unsafe_modify ~xt mv (Magic_option.put_or_retry value) + let put ~xt mv value = Xt.modify ~xt mv (Magic_option.put_or_retry value) let take_opt ~xt mv = Magic_option.to_option (Xt.exchange ~xt mv Magic_option.none) let take ~xt mv = - Magic_option.get_unsafe (Xt.unsafe_update ~xt mv Magic_option.take_or_retry) + Magic_option.get_unsafe (Xt.update ~xt mv Magic_option.take_or_retry) let peek ~xt mv = Magic_option.get_or_retry (Xt.get ~xt mv) let peek_opt ~xt mv = Magic_option.to_option (Xt.get ~xt mv) diff --git a/src/kcas_data/queue.ml b/src/kcas_data/queue.ml index ec0fc8c1..543bb0c7 100644 --- a/src/kcas_data/queue.ml +++ b/src/kcas_data/queue.ml @@ -34,7 +34,7 @@ module Xt = struct + Elems.length (Xt.get ~xt middle) + Elems.length (Xt.get ~xt back) - let add ~xt x q = Xt.unsafe_modify ~xt q.back @@ Elems.cons x + let add ~xt x q = Xt.modify ~xt q.back @@ Elems.cons x let push = add (** Cooperative helper to move elems from back to middle. *) @@ -53,7 +53,7 @@ module Xt = struct let take_opt ~xt t = let front = t.front in - let elems = Xt.unsafe_update ~xt front Elems.tl_safe in + let elems = Xt.update ~xt front Elems.tl_safe in if elems != Elems.empty then Elems.hd_opt elems else let middle = t.middle and back = t.back in diff --git a/src/kcas_data/stack.ml b/src/kcas_data/stack.ml index a42abe29..c4d0170a 100644 --- a/src/kcas_data/stack.ml +++ b/src/kcas_data/stack.ml @@ -9,13 +9,10 @@ let of_seq xs = Loc.make ~padded:true (Elems.of_seq_rev xs) module Xt = struct let length ~xt s = Xt.get ~xt s |> Elems.length let is_empty ~xt s = Xt.get ~xt s == Elems.empty - let push ~xt x s = Xt.unsafe_modify ~xt s @@ Elems.cons x - let pop_opt ~xt s = Xt.unsafe_update ~xt s Elems.tl_safe |> Elems.hd_opt + let push ~xt x s = Xt.modify ~xt s @@ Elems.cons x + let pop_opt ~xt s = Xt.update ~xt s Elems.tl_safe |> Elems.hd_opt let pop_all ~xt s = Elems.to_seq @@ Xt.exchange ~xt s Elems.empty - - let pop_blocking ~xt s = - Xt.unsafe_update ~xt s Elems.tl_safe |> Elems.hd_or_retry - + let pop_blocking ~xt s = Xt.update ~xt s Elems.tl_safe |> Elems.hd_or_retry let top_opt ~xt s = Xt.get ~xt s |> Elems.hd_opt let top_blocking ~xt s = Xt.get ~xt s |> Elems.hd_or_retry let clear ~xt s = Xt.set ~xt s Elems.empty