Skip to content

Commit

Permalink
counters
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Oct 16, 2024
1 parent a4daa6d commit 856cec8
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 246 deletions.
10 changes: 5 additions & 5 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,11 @@
{read_ops, ?C_RA_LOG_READ_OPS, counter,
"Total number of read ops"},
{read_cache, ?C_RA_LOG_READ_CACHE, counter,
"Total number of cache reads"},
{read_open_mem_tbl, ?C_RA_LOG_READ_OPEN_MEM_TBL, counter,
"Total number of opened memory tables"},
"Unused. Total number of cache reads"},
{read_mem_table, ?C_RA_LOG_READ_MEM_TBL, counter,
"Total number of reads from mem tables"},
{read_closed_mem_tbl, ?C_RA_LOG_READ_CLOSED_MEM_TBL, counter,
"Total number of closed memory tables"},
"Unused. Total number of closed memory tables"},
{read_segment, ?C_RA_LOG_READ_SEGMENT, counter,
"Total number of read segments"},
{fetch_term, ?C_RA_LOG_FETCH_TERM, counter,
Expand All @@ -269,7 +269,7 @@
-define(C_RA_LOG_WRITE_RESENDS, 2).
-define(C_RA_LOG_READ_OPS, 3).
-define(C_RA_LOG_READ_CACHE, 4).
-define(C_RA_LOG_READ_OPEN_MEM_TBL, 5).
-define(C_RA_LOG_READ_MEM_TBL, 5).
-define(C_RA_LOG_READ_CLOSED_MEM_TBL, 6).
-define(C_RA_LOG_READ_SEGMENT, 7).
-define(C_RA_LOG_FETCH_TERM, 8).
Expand Down
51 changes: 20 additions & 31 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -351,17 +351,23 @@ append({Idx, _, _}, #?MODULE{last_index = LastIdx}) ->
-spec write(Entries :: [log_entry()], State :: state()) ->
{ok, state()} |
{error, {integrity_error, term()} | wal_down}.
write([{FstIdx, _, _} = _First | _Rest] = Entries,
#?MODULE{last_index = LastIdx,
snapshot_state = _SnapState} = State00)
write([{FstIdx, _, _} | _Rest] = Entries,
#?MODULE{cfg = Cfg,
last_index = LastIdx,
mem_table = Mt0} = State0)
when FstIdx =< LastIdx + 1 andalso
FstIdx >= 0 ->
write_entries(Entries, State00);
case stage_entries(Cfg, Entries, Mt0) of
{ok, Mt} ->
wal_write_batch(State0#?MODULE{mem_table = Mt}, Entries);
Error ->
Error
end;
write([], State) ->
{ok, State};
write([{Idx, _, _} | _], #?MODULE{cfg = #cfg{uid = UId},
last_index = LastIdx}) ->
Msg = lists:flatten(io_lib:format("~p: ra_log:write/2 "
Msg = lists:flatten(io_lib:format("~s: ra_log:write/2 "
"tried writing ~b - expected ~b",
[UId, Idx, LastIdx+1])),
{error, {integrity_error, Msg}}.
Expand Down Expand Up @@ -396,7 +402,7 @@ fold(From0, To0, Fun, Acc0,
{Reader, Acc1} = ra_log_reader:fold(F, T, Fun, Acc0, Reader0),
Acc = ra_log_memtbl:fold(CF, CT, Fun, Acc1, Mt),
NumRead = CT - CF + 1,
ok = incr_counter(Cfg, ?C_RA_LOG_READ_CACHE, NumRead),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, NumRead),
{Acc, State#?MODULE{reader = Reader}}
end;
fold(_From, _To, _Fun, Acc, State) ->
Expand Down Expand Up @@ -426,7 +432,7 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg,
%% drop any indexes that are larger than the last index available
Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1),
{Entries0, CacheNumRead, Indexes} = ra_log_memtbl:get_items(Indexes2, Mt),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_CACHE, CacheNumRead),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, CacheNumRead),
{Entries1, Reader} = ra_log_reader:sparse_read(Reader0, Indexes, Entries0),
%% here we recover the original order of indexes
Entries = case Sort of
Expand Down Expand Up @@ -1122,6 +1128,8 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
mem_table = Mt0} = State,
Entries) ->
WriterId = {UId, self()},
%% TODO: this isn't quite right, entries could theoretically be written to
%% different tids, although the way this is called currently prevents that
Tid = ra_log_memtbl:tid(Mt0),
{WalCommands, Num} =
lists:foldl(fun ({Idx, Term, Cmd0}, {WC, N}) ->
Expand All @@ -1132,18 +1140,18 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,

[{_, _, _, LastIdx, LastTerm, _} | _] = WalCommands,
{_, Mt} = ra_log_memtbl:commit(Mt0),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num),
case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
{ok, State#?MODULE{last_index = LastIdx,
last_term = LastTerm,
last_wal_write = {Pid, now_ms()},
mem_table = Mt}};
{error, wal_down} = Err ->
%% TODO: mt: if we get there the entry has already been inserted
%% if we get there the entry has already been inserted
%% into the mem table but never reached the wal
%% consider ra_log_memtbl:abort(Entries, Mt)
%% the resend logic will take care of that
Err
end.

Expand Down Expand Up @@ -1198,25 +1206,6 @@ resend_from0(Idx, #?MODULE{last_resend_time = {LastResend, WalPid},
State
end.

% verify_entries(_, []) ->
% ok;
% verify_entries(Idx, [{NextIdx, _, _} | Tail]) when Idx + 1 == NextIdx ->
% verify_entries(NextIdx, Tail);
% verify_entries(Idx, Tail) ->
% Msg = io_lib:format("ra_log:verify_entries/2 "
% "tried writing ~p - expected ~b",
% [Tail, Idx+1]),
% %% TODO mt: ra_log_memtbl:abort/1
% {error, {integrity_error, lists:flatten(Msg)}}.

write_entries(Entries, #?MODULE{cfg = Cfg, mem_table = Mt0} = State0) ->
case stage_entries(Cfg, Entries, Mt0) of
{ok, Mt} ->
wal_write_batch(State0#?MODULE{mem_table = Mt}, Entries);
Error ->
Error
end.

stage_entries(Cfg, [Entry | Rem] = Entries, Mt0) ->
case ra_log_memtbl:stage(Entry, Mt0) of
{ok, Mt} ->
Expand Down Expand Up @@ -1288,7 +1277,7 @@ recover_range(UId, MtRange, SegWriter) ->
end
end, [], SegFiles),
SegRanges = [{F, L} || {F, L, _} <- SegRefs],
Ranges = [MtRange | SegRanges], % OpenRanges ++ ClosedRanges ++ SegRanges,
Ranges = [MtRange | SegRanges],
{pick_range(Ranges, undefined), SegRefs}.

% picks the current range from a sorted (newest to oldest) list of ranges
Expand Down
192 changes: 5 additions & 187 deletions src/ra_log_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
-record(cfg, {uid :: ra_uid(),
counter :: undefined | counters:counters_ref(),
directory :: file:filename(),
% open_mem_tbls :: ets:tid(),
% closed_mem_tbls :: ets:tid(),
access_pattern = random :: access_pattern()
}).

Expand Down Expand Up @@ -156,33 +154,14 @@ segment_refs(#?STATE{segment_refs = SegmentRefs}) ->
num_open_segments(#?STATE{open_segments = Open}) ->
ra_flru:size(Open).

% mem_tbl_fold(_Tid, From, To, _Fun, Acc)
% when From > To ->
% Acc;
% mem_tbl_fold(Tid, From, To, Fun, Acc0) ->
% [Entry] = ets:lookup(Tid, From),
% Acc = Fun(Entry, Acc0),
% mem_tbl_fold(Tid, From+1, To, Fun, Acc).


-spec fold(ra_index(), ra_index(), fun(), term(), state()) ->
{state(), term()}.
fold(FromIdx, ToIdx, Fun, Acc,
#?STATE{cfg = #cfg{} = Cfg} = State)
#?STATE{cfg = #cfg{} = Cfg} = State0)
when ToIdx >= FromIdx ->
% Plan = read_plan(Cfg, FromIdx, ToIdx),
Plan = [{segments, FromIdx, ToIdx}],
lists:foldl(
fun
% {ets, Tid, CIx, From, To}, {S, Ac}) ->
% ok = incr_counter(Cfg, CIx, To - From + 1),
% {S, mem_tbl_fold(Tid, From, To, Fun, Ac)};
({segments, From, To}, {S, Ac}) ->
ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, To - From + 1),
segment_fold(S, From, To, Fun, Ac)
end, {State, Acc}, Plan);
fold(_FromIdx, _ToIdx, _Fun, Acc,
#?STATE{} = State) ->
ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, ToIdx - FromIdx + 1),
segment_fold(State0, FromIdx, ToIdx, Fun, Acc);
fold(_FromIdx, _ToIdx, _Fun, Acc, #?STATE{} = State) ->
{State, Acc}.

-spec sparse_read(state(), [ra_index()], [log_entry()]) ->
Expand All @@ -191,63 +170,11 @@ sparse_read(#?STATE{cfg = #cfg{} = Cfg} = State, Indexes, Entries0) ->
{Open, _, SegC, Entries} = (catch segment_sparse_read(State, Indexes, Entries0)),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, SegC),
{Entries, State#?MODULE{open_segments = Open}}.
% try open_mem_tbl_sparse_read(Cfg, Indexes0, Entries0) of
% {Entries1, OpenC, []} ->
% ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPEN_MEM_TBL, OpenC),
% {Entries1, State};
% {Entries1, OpenC, Rem1} ->
% ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPEN_MEM_TBL, OpenC),
% try closed_mem_tbl_sparse_read(Cfg, Rem1, Entries1) of
% {Entries2, ClosedC, []} ->
% ok = incr_counter(Cfg, ?C_RA_LOG_READ_CLOSED_MEM_TBL, ClosedC),
% {Entries2, State};
% {Entries2, ClosedC, Rem2} ->
% ok = incr_counter(Cfg, ?C_RA_LOG_READ_CLOSED_MEM_TBL, ClosedC),
% {Open, _, SegC, Entries} = (catch segment_sparse_read(State, Rem2, Entries2)),
% ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, SegC),
% {Entries, State#?MODULE{open_segments = Open}}
% catch _:_ ->
% sparse_read(State, Indexes0, Entries0)
% end
% catch _:_ ->
% %% table was most likely concurrently deleted
% %% try again
% %% TODO: avoid infinite loop
% sparse_read(State, Indexes0, Entries0)
% end.

-spec fetch_term(ra_index(), state()) -> {option(ra_index()), state()}.
fetch_term(Idx, #?STATE{cfg = #cfg{} = Cfg} = State0) ->
incr_counter(Cfg, {?C_RA_LOG_FETCH_TERM, 1}),
incr_counter(Cfg, ?C_RA_LOG_FETCH_TERM, 1),
segment_term_query(Idx, State0).
% case ets:lookup(OpenTbl, UId) of
% [{_, From, To, Tid}] when Idx >= From andalso Idx =< To ->
% Term = ets:lookup_element(Tid, Idx, 2),
% {Term, State0};
% _ ->
% case closed_mem_table_term_query(ClosedTbl, Idx, UId) of
% undefined ->
% segment_term_query(Idx, State0);
% Term ->
% {Term, State0}
% end
% end.

% -spec delete_closed_mem_table_object(state(), term()) -> true.
% delete_closed_mem_table_object(#?STATE{cfg =
% #cfg{closed_mem_tbls = Tbl}}, Id) ->
% true = ets:delete_object(Tbl, Id).

% -spec closed_mem_tables(state()) -> list().
% closed_mem_tables(#?STATE{cfg = #cfg{uid = UId,
% closed_mem_tbls = Tbl}}) ->
% closed_mem_tables(Tbl, UId).

% -spec open_mem_table_lookup(state()) -> list().
% open_mem_table_lookup(#?STATE{cfg = #cfg{uid = UId,
% open_mem_tbls = Tbl}}) ->
% ets:lookup(Tbl, UId).


%% LOCAL

Expand Down Expand Up @@ -289,76 +216,6 @@ range_overlap(F, L, S, E)
range_overlap(F, L, _, _) ->
{undefined, F, L}.

% read_plan(#cfg{uid = UId,
% open_mem_tbls = OpenTbl,
% closed_mem_tbls = ClosedTbl
% },
% FromIdx, ToIdx) ->
% Acc0 = case ets:lookup(OpenTbl, UId) of
% [{_, TStart, TEnd, Tid}] ->
% case range_overlap(FromIdx, ToIdx, TStart, TEnd) of
% {undefined, _, _} ->
% {FromIdx, ToIdx, []};
% {S, E, F, T} ->
% {F, T,
% [{ets, Tid, ?C_RA_LOG_READ_OPEN_MEM_TBL, S, E}]}
% end;
% _ ->
% {FromIdx, ToIdx, []}
% end,

% {RemF, RemL, Plan} =
% case closed_mem_tables(ClosedTbl, UId) of
% [] ->
% Acc0;
% Tables ->
% lists:foldl(
% fun({_, _, S, E, Tid}, {F, T, Plan} = Acc) ->
% case range_overlap(F, T, S, E) of
% {undefined, _, _} ->
% Acc;
% {S1, E1, F1, T1} ->
% {F1, T1,
% [{ets, Tid, ?C_RA_LOG_READ_CLOSED_MEM_TBL, S1, E1}
% | Plan]}
% end
% end, Acc0, Tables)
% end,
% case RemF =< RemL of
% true ->
% [{segments, RemF, RemL} | Plan];
% false ->
% Plan
% end.

% open_mem_tbl_sparse_read(#cfg{uid = UId,
% open_mem_tbls = OpenTbl},
% Indexes, Acc0) ->
% case ets:lookup(OpenTbl, UId) of
% [{_, TStart, TEnd, Tid}] ->
% mem_tbl_sparse_read(Indexes, TStart, TEnd, Tid, 0, Acc0);
% [] ->
% {Acc0, 0, Indexes}
% end.

% closed_mem_tbl_sparse_read(#cfg{uid = UId,
% closed_mem_tbls = ClosedTbl}, Indexes, Acc0) ->
% case closed_mem_tables(ClosedTbl, UId) of
% [] ->
% {Acc0, 0, Indexes};
% Tables ->
% lists:foldl(fun({_, _, TblSt, TblEnd, Tid}, {Ac, Num, Idxs}) ->
% mem_tbl_sparse_read(Idxs, TblSt, TblEnd, Tid, Num, Ac)
% end, {Acc0, 0, Indexes}, Tables)
% end.

% mem_tbl_sparse_read([I | Rem], TblStart, TblEnd, Tid, C, Entries0)
% when I >= TblStart andalso I =< TblEnd ->
% [Entry] = ets:lookup(Tid, I),
% mem_tbl_sparse_read(Rem, TblStart, TblEnd, Tid, C + 1, [Entry | Entries0]);
% mem_tbl_sparse_read(Rem, _TblStart, _TblEnd, _Tid, C, Entries0) ->
% {Entries0, C, Rem}.

segrefs_to_read(From0, To0, _SegRefs, Acc)
when To0 < From0 ->
Acc;
Expand Down Expand Up @@ -466,11 +323,6 @@ incr_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
incr_counter(#cfg{counter = undefined}, _, _) ->
ok.

incr_counter(#cfg{counter = Cnt}, {Ix, N}) when Cnt =/= undefined ->
counters:add(Cnt, Ix, N);
incr_counter(#cfg{counter = undefined}, _) ->
ok.

decr_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
counters:sub(Cnt, Ix, N);
decr_counter(#cfg{counter = undefined}, _, _) ->
Expand Down Expand Up @@ -498,40 +350,6 @@ range_overlap_test() ->
% {7, 10} = range_overlap(7, 10, 5, 30),
ok.

% read_plan_test() ->
% UId = <<"this_uid">>,
% OTbl = ra_log_open_mem_tables,
% OpnTbl = ets:new(OTbl, []),
% CTbl = ra_log_closed_mem_tables,
% ClsdTbl = ets:new(CTbl, [bag]),
% M1 = erlang:unique_integer([monotonic, positive]),
% M2 = erlang:unique_integer([monotonic, positive]),

% true = ets:insert(OpnTbl, {UId, 75, 111, OTbl}),
% true = ets:insert(ClsdTbl, {UId, M2, 50, 176, CTbl}),
% true = ets:insert(ClsdTbl, {UId, M1, 25, 49, CTbl}),
% %% segments 0 - 24
% Cfg = #cfg{uid = UId,
% open_mem_tbls = OpnTbl,
% closed_mem_tbls = ClsdTbl},
% ?debugFmt("Read Plan: ~p~n", [read_plan(Cfg, 0, 100)]),
% ?assertMatch([{segments, 0, 24},
% {ets, _, _, 25, 49},
% {ets, _, _, 50, 74},
% {ets, _, _, 75, 100}],
% read_plan(Cfg, 0, 100)),

% ?debugFmt("Read Plan: ~p~n", [read_plan(Cfg, 10, 55)]),
% ?assertMatch([{segments, 10, 24},
% {ets, _, _, 25, 49},
% {ets, _, _, 50, 55}],
% read_plan(Cfg, 10, 55)),
% ?assertMatch([
% {ets, _, _, 79, 99}
% ],
% read_plan(Cfg, 79, 99)),
% ok.

segrefs_to_read_test() ->
SegRefs = [{412,499,"00000005.segment"},
{284,411,"00000004.segment"},
Expand Down
Loading

0 comments on commit 856cec8

Please sign in to comment.