diff --git a/MODULE.bazel b/MODULE.bazel index dc7e2f2e..dabf401e 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -72,7 +72,7 @@ erlang_package.git_package( erlang_package.hex_package( name = "seshat", - version = "0.4.0", + version = "0.6.0", ) use_repo( diff --git a/Makefile b/Makefile index 1170f63f..444281e6 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli dep_gen_batch_server = hex 0.8.8 dep_aten = hex 0.5.8 -dep_seshat = hex 0.4.0 +dep_seshat = hex 0.6.0 DEPS = aten gen_batch_server seshat TEST_DEPS = proper meck eunit_formatters inet_tcp_proxy diff --git a/src/ra.erl b/src/ra.erl index 14c9a84c..286bb46b 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -78,7 +78,8 @@ cast_aux_command/2, register_external_log_reader/1, member_overview/1, - member_overview/2 + member_overview/2, + key_metrics/1 ]). %% xref should pick these up @@ -1107,6 +1108,43 @@ member_overview(ServerId) -> member_overview(ServerId, Timeout) -> ra_server_proc:local_state_query(ServerId, overview, Timeout). +%% @doc Returns a map of key metrics about a Ra member +%% +%% The keys and values may vary depending on what state +%% the member is in. This function will never call into the +%% Ra process itself so is likely to return swiftly even +%% when the Ra process is busy (such as when it is recovering) +%% +%% @param ServerId the Ra server to obtain key metrics for +%% @end +key_metrics({Name, N} = ServerId) when N == node() -> + Fields = [last_applied, + commit_index, + snapshot_index, + last_written_index, + last_index, + commit_latency, + term], + Counters = case ra_counters:counters(ServerId, Fields) of + undefined -> + #{}; + C -> C + end, + case whereis(Name) of + undefined -> + Counters#{state => noproc}; + _ -> + case ets:lookup(ra_state, Name) of + [] -> + Counters#{state => unknown}; + [{_, State}] -> + Counters#{state => State} + end + end; +key_metrics({_, N} = ServerId) -> + erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId]). + + %% internal -spec usr(UserCommand, ReplyMode) -> Command when diff --git a/src/ra.hrl b/src/ra.hrl index e706fc5a..08d5c5b3 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -269,6 +269,7 @@ -define(C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, ?C_RA_LOG_RESERVED + 18). -define(C_RA_SRV_LOCAL_QUERIES, ?C_RA_LOG_RESERVED + 19). -define(C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, ?C_RA_LOG_RESERVED + 20). +-define(C_RA_SRV_RESERVED, ?C_RA_LOG_RESERVED + 21). -define(RA_SRV_COUNTER_FIELDS, @@ -312,7 +313,38 @@ {local_queries, ?C_RA_SRV_LOCAL_QUERIES, counter, "Total number of local queries"}, {invalid_reply_mode_commands, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, counter, - "Total number of commands received with an invalid reply-mode"} + "Total number of commands received with an invalid reply-mode"}, + {reserved_2, ?C_RA_SRV_RESERVED, counter, "Reserved counter"} ]). --define(RA_COUNTER_FIELDS, ?RA_LOG_COUNTER_FIELDS ++ ?RA_SRV_COUNTER_FIELDS). +-define(C_RA_SVR_METRIC_LAST_APPLIED, ?C_RA_SRV_RESERVED + 1). +-define(C_RA_SVR_METRIC_COMMIT_INDEX, ?C_RA_SRV_RESERVED + 2). +-define(C_RA_SVR_METRIC_SNAPSHOT_INDEX, ?C_RA_SRV_RESERVED + 3). +-define(C_RA_SVR_METRIC_LAST_INDEX, ?C_RA_SRV_RESERVED + 4). +-define(C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, ?C_RA_SRV_RESERVED + 5). +-define(C_RA_SVR_METRIC_COMMIT_LATENCY, ?C_RA_SRV_RESERVED + 6). +-define(C_RA_SVR_METRIC_TERM, ?C_RA_SRV_RESERVED + 7). + +-define(RA_SRV_METRICS_COUNTER_FIELDS, + [ + {last_applied, ?C_RA_SVR_METRIC_LAST_APPLIED, gauge, + "The last applied index. Can go backwards if a ra server is restarted."}, + {commit_index, ?C_RA_SVR_METRIC_COMMIT_INDEX, counter, + "The current commit index."}, + {snapshot_index, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, counter, + "The current snapshot index."}, + {last_index, ?C_RA_SVR_METRIC_LAST_INDEX, counter, + "The last index of the log."}, + {last_written_index, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, counter, + "The last fully written and fsynced index of the log."}, + {commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, gauge, + "Approximate time taken from an entry being written to the log until it is committed."}, + {term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."} + ]). + +-define(RA_COUNTER_FIELDS, + ?RA_LOG_COUNTER_FIELDS ++ + ?RA_SRV_COUNTER_FIELDS ++ + ?RA_SRV_METRICS_COUNTER_FIELDS). + +-define(FIELDSPEC_KEY, ra_seshat_fields_spec). diff --git a/src/ra_counters.erl b/src/ra_counters.erl index 42252b9a..ea73c7d2 100644 --- a/src/ra_counters.erl +++ b/src/ra_counters.erl @@ -5,6 +5,7 @@ %% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. %% -module(ra_counters). +-include("ra.hrl"). -export([ init/0, @@ -12,25 +13,24 @@ fetch/1, overview/0, overview/1, + counters/2, delete/1 ]). -type name() :: term(). --type seshat_field_spec() :: - {Name :: atom(), Position :: pos_integer(), - Type :: counter | gauge, Description :: string()}. + -spec init() -> ok. init() -> _ = application:ensure_all_started(seshat), _ = seshat:new_group(ra), + persistent_term:put(?FIELDSPEC_KEY, ?RA_COUNTER_FIELDS), ok. --spec new(name(), [seshat_field_spec()]) -> +-spec new(name(), seshat:fields_spec()) -> counters:counters_ref(). -new(Name, Fields) - when is_list(Fields) -> - seshat:new(ra, Name, Fields). +new(Name, FieldsSpec) -> + seshat:new(ra, Name, FieldsSpec). -spec fetch(name()) -> undefined | counters:counters_ref(). fetch(Name) -> @@ -47,3 +47,8 @@ overview() -> -spec overview(name()) -> #{atom() => non_neg_integer()}. overview(Name) -> seshat:overview(ra, Name). + +-spec counters(name(), [atom()]) -> + #{atom() => non_neg_integer()} | undefined. +counters(Name, Fields) -> + seshat:counters(ra, Name, Fields). diff --git a/src/ra_log.erl b/src/ra_log.erl index f7df25f8..d8371396 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -200,8 +200,10 @@ init(#{uid := UId, reader = Reader, snapshot_state = SnapshotState }, - + put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx), LastIdx = State000#?MODULE.last_index, + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastIdx), % recover the last term {LastTerm0, State00} = case LastIdx of SnapIdx -> @@ -374,7 +376,8 @@ last_written(#?MODULE{last_written_index_term = LWTI}) -> %% forces the last index and last written index back to a prior index -spec set_last_index(ra_index(), state()) -> {ok, state()} | {not_found, state()}. -set_last_index(Idx, #?MODULE{cache = Cache0, +set_last_index(Idx, #?MODULE{cfg = Cfg, + cache = Cache0, last_written_index_term = {LWIdx0, _}} = State0) -> case fetch_term(Idx, State0) of {undefined, State} -> @@ -385,6 +388,8 @@ set_last_index(Idx, #?MODULE{cache = Cache0, %% this should always be found but still assert just in case true = LWTerm =/= undefined, Cache = ra_log_cache:set_last(Idx, Cache0), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx), {ok, State2#?MODULE{last_index = Idx, last_term = Term, cache = Cache, @@ -401,7 +406,8 @@ handle_event({written, {FromIdx, _ToIdx, _Term}}, %% Just drop the event in this case as it is stale {State, []}; handle_event({written, {FromIdx, ToIdx0, Term}}, - #?MODULE{last_written_index_term = {LastWrittenIdx0, + #?MODULE{cfg = Cfg, + last_written_index_term = {LastWrittenIdx0, LastWrittenTerm0}, last_index = LastIdx, snapshot_state = SnapState} = State0) @@ -416,6 +422,7 @@ handle_event({written, {FromIdx, ToIdx0, Term}}, ToIdx = min(ToIdx0, LastIdx), case fetch_term(ToIdx, State0) of {Term, State} when is_integer(Term) -> + ok = put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, ToIdx), {State#?MODULE{last_written_index_term = {ToIdx, Term}}, %% delaying truncate_cache until the next event allows any entries %% that became committed to be read from cache rather than ETS @@ -426,8 +433,10 @@ handle_event({written, {FromIdx, ToIdx0, Term}}, % followers returning appending the entry and the leader committing % and processing a snapshot before the written event comes in. % ensure last_written_index_term does not go backwards - LastWrittenIdxTerm = {max(LastWrittenIdx0, ToIdx), + LastWrittenIdx = max(LastWrittenIdx0, ToIdx), + LastWrittenIdxTerm = {LastWrittenIdx, max(LastWrittenTerm0, Term)}, + ok = put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastWrittenIdx), {State#?MODULE{last_written_index_term = LastWrittenIdxTerm}, [{next_event, {ra_log_event, {truncate_cache, FromIdx, ToIdx}}}]}; {OtherTerm, State} -> @@ -491,13 +500,15 @@ handle_event({segments, Tid, NewSegs}, {State, log_update_effects(Readers, Pid, State)} end; handle_event({snapshot_written, {SnapIdx, _} = Snap}, - #?MODULE{first_index = FstIdx, + #?MODULE{cfg = Cfg, + first_index = FstIdx, snapshot_state = SnapState0} = State0) %% only update snapshot if it is newer than the last snapshot when SnapIdx >= FstIdx -> % delete any segments outside of first_index {State, Effects0} = delete_segments(SnapIdx, State0), SnapState = ra_snapshot:complete_snapshot(Snap, SnapState0), + put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx), %% delete old snapshot files %% This is done as an effect %% so that if an old snapshot is still being replicated @@ -573,6 +584,9 @@ install_snapshot({SnapIdx, _} = IdxTerm, SnapState, #?MODULE{cfg = Cfg, cache = Cache} = State0) -> ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_INSTALLED, 1), + ok = put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, SnapIdx), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, SnapIdx), {State, Effs} = delete_segments(SnapIdx, State0), {State#?MODULE{snapshot_state = SnapState, first_index = SnapIdx + 1, @@ -837,6 +851,7 @@ wal_truncate_write(#?MODULE{cfg = #cfg{uid = UId, % and that prior entries should be considered stale ok = ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd), ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), + ok = put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), State#?MODULE{last_index = Idx, last_term = Term, cache = ra_log_cache:add(Entry, Cache)}. @@ -847,6 +862,7 @@ wal_write(#?MODULE{cfg = #cfg{uid = UId, case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of ok -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), State#?MODULE{last_index = Idx, last_term = Term, cache = ra_log_cache:add(Entry, Cache)}; {error, wal_down} -> @@ -869,6 +885,7 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId, case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of ok -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx), State#?MODULE{last_index = LastIdx, last_term = LastTerm, cache = Cache}; @@ -1030,6 +1047,11 @@ incr_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> incr_counter(#cfg{counter = undefined}, _Ix, _N) -> ok. +put_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> + counters:put(Cnt, Ix, N); +put_counter(#cfg{counter = undefined}, _Ix, _N) -> + ok. + server_data_dir(Dir, UId) -> Me = ra_lib:to_list(UId), filename:join(Dir, Me). diff --git a/src/ra_server.erl b/src/ra_server.erl index 1ac231d3..e69bb923 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -324,6 +324,9 @@ init(#{id := Id, max_append_entries_rpc_batch_size = MaxAERBatchSize, counter = maps:get(counter, Config, undefined), system_config = SystemConfig}, + put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, CommitIndex), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx), + put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm), #{cfg => Cfg, current_term => CurrentTerm, @@ -350,19 +353,21 @@ init(#{id := Id, recover(#{cfg := #cfg{log_id = LogId, machine_version = MacVer, - effective_machine_version = EffMacVer}, + effective_machine_version = EffMacVer} = Cfg, commit_index := CommitIndex, last_applied := LastApplied} = State0) -> + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, LastApplied), ?DEBUG("~ts: recovering state machine version ~b:~b from index ~b to ~b", [LogId, EffMacVer, MacVer, LastApplied, CommitIndex]), Before = erlang:system_time(millisecond), {#{log := Log0, cfg := #cfg{effective_machine_version = EffMacVerAfter}} = State, _} = apply_to(CommitIndex, - fun(E, S0) -> + fun({Idx, _, _} = E, S0) -> %% Clear out the effects and notifies map %% to avoid memory explosion {Mod, LastAppl, S, MacSt, _E, _N, LastTs} = apply_with(E, S0), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, Idx), {Mod, LastAppl, S, MacSt, [], #{}, LastTs} end, State0, []), @@ -372,6 +377,7 @@ recover(#{cfg := #cfg{log_id = LogId, [LogId, EffMacVerAfter, MacVer, LastApplied, CommitIndex, After - Before]), %% disable segment read cache by setting random access pattern Log = ra_log:release_resources(1, random, Log0), + put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, 0), State#{log => Log, %% reset commit latency as recovery may calculate a very old value commit_latency => 0}. @@ -969,6 +975,7 @@ handle_follower(#append_entries_rpc{term = Term, current_term := CurTerm}) when Term >= CurTerm -> ok = incr_counter(Cfg, ?C_RA_SRV_AER_RECEIVED_FOLLOWER, 1), + ok = put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, LeaderCommit), %% this is a valid leader, append entries message Effects0 = [{record_leader_msg, LeaderId}], State0 = update_term(Term, State00#{leader_id => LeaderId, @@ -1129,15 +1136,14 @@ handle_follower(#request_vote_rpc{term = Term, candidate_id = Cand, [LogId, Cand, {LLIdx, LLTerm}, Term, CurTerm]), Reply = #request_vote_result{term = Term, vote_granted = true}, State = update_term_and_voted_for(Term, Cand, State1), - {follower, State#{voted_for => Cand, current_term => Term}, - [{reply, Reply}]}; + {follower, State, [{reply, Reply}]}; false -> ?INFO("~ts: declining vote for ~w for term ~b," " candidate last log index term was: ~w~n" " last log entry idxterm seen was: ~w", [LogId, Cand, Term, {LLIdx, LLTerm}, {LastIdxTerm}]), Reply = #request_vote_result{term = Term, vote_granted = false}, - {follower, State1#{current_term => Term}, [{reply, Reply}]} + {follower, update_term(Term, State1), [{reply, Reply}]} end; handle_follower(#request_vote_rpc{term = Term, candidate_id = Candidate}, State = #{current_term := CurTerm, @@ -1189,8 +1195,8 @@ handle_follower(#install_snapshot_rpc{term = Term, SnapState0 = ra_log:snapshot_state(Log0), {ok, SS} = ra_snapshot:begin_accept(Meta, SnapState0), Log = ra_log:set_snapshot_state(SS, Log0), - {receive_snapshot, State0#{log => Log, - leader_id => LeaderId}, + {receive_snapshot, update_term(Term, State0#{log => Log, + leader_id => LeaderId}), [{next_event, Rpc}, {record_leader_msg, LeaderId}]}; handle_follower(#request_vote_result{}, State) -> %% handle to avoid logging as unhandled @@ -1262,19 +1268,19 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term, end, {#{cluster := ClusterIds}, MacState} = ra_log:recover_snapshot(Log), - State = State0#{cfg => Cfg, - log => Log, - current_term => Term, - commit_index => SnapIndex, - last_applied => SnapIndex, - cluster => make_cluster(Id, ClusterIds), - machine_state => MacState}, + State = update_term(Term, + State0#{cfg => Cfg, + log => Log, + commit_index => SnapIndex, + last_applied => SnapIndex, + cluster => make_cluster(Id, ClusterIds), + machine_state => MacState}), %% it was the last snapshot chunk so we can revert back to %% follower status {follower, persist_last_applied(State), [{reply, Reply} | Effs]}; next -> Log = ra_log:set_snapshot_state(SnapState, Log0), - State = State0#{log => Log}, + State = update_term(Term, State0#{log => Log}), {receive_snapshot, State, [{reply, Reply}]} end; handle_receive_snapshot({ra_log_event, Evt}, @@ -2144,6 +2150,7 @@ update_term_and_voted_for(Term, VotedFor, #{cfg := #cfg{uid = UId} = Cfg, ok = ra_log_meta:store(MetaName, UId, current_term, Term), ok = ra_log_meta:store_sync(MetaName, UId, voted_for, VotedFor), incr_counter(Cfg, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, 1), + put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, Term), reset_query_index(State#{current_term => Term, voted_for => VotedFor}) end. @@ -2237,7 +2244,7 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0, #{last_applied := LastApplied, cfg := #cfg{machine_version = MacVer, effective_machine_module = MacMod, - effective_machine_version = EffMacVer}, + effective_machine_version = EffMacVer} = Cfg, machine_state := MacState0, log := Log0} = State0) when ApplyTo > LastApplied andalso MacVer >= EffMacVer -> @@ -2257,6 +2264,8 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0, %% due to machine versioning all entries may not have been applied %% FinalEffs = make_notify_effects(Notifys, lists:reverse(Effects)), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, AppliedTo), + put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, CommitLatency), {State#{last_applied => AppliedTo, log => Log, commit_latency => CommitLatency, @@ -2549,12 +2558,14 @@ append_entries_reply(Term, Success, State = #{log := Log}) -> last_index = LWIdx, last_term = LWTerm}. -evaluate_quorum(#{commit_index := CI0} = State0, Effects0) -> +evaluate_quorum(#{cfg := Cfg, + commit_index := CI0} = State0, Effects0) -> % TODO: shortcut function if commit index was not incremented State = #{commit_index := CI} = increment_commit_index(State0), Effects = case CI > CI0 of true -> + put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, CI), [{aux, eval} | Effects0]; false -> Effects0 @@ -2799,6 +2810,11 @@ incr_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> incr_counter(#cfg{counter = undefined}, _Ix, _N) -> ok. +put_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined -> + counters:put(Cnt, Ix, N); +put_counter(#cfg{counter = undefined}, _Ix, _N) -> + ok. + meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) -> Name; meta_name(#{names := #{log_meta := Name}}) -> diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index e13ec1b8..49e8e4d9 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -270,6 +270,8 @@ init(Config) -> do_init(#{id := Id, cluster_name := ClusterName} = Config0) -> + Key = ra_lib:ra_server_id_to_local_name(Id), + true = ets:insert(ra_state, {Key, init}), process_flag(trap_exit, true), Config = #{counter := Counter, system_config := SysConf} = maps:merge(config_defaults(Id), @@ -283,7 +285,6 @@ do_init(#{id := Id, UId = ra_server:uid(ServerState), % ensure ra_directory has the new pid #{names := Names} = SysConf, - Key = ra_lib:ra_server_id_to_local_name(Id), ok = ra_directory:register_name(Names, UId, self(), maps:get(parent, Config, undefined), Key, ClusterName), @@ -1527,7 +1528,7 @@ config_defaults(ServerId) -> install_snap_rpc_timeout => ?INSTALL_SNAP_RPC_TIMEOUT, await_condition_timeout => ?DEFAULT_AWAIT_CONDITION_TIMEOUT, initial_members => [], - counter => ra_counters:new(ServerId, ?RA_COUNTER_FIELDS), + counter => ra_counters:new(ServerId, {persistent_term, ?FIELDSPEC_KEY}), system_config => ra_system:default_config() }. diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 8a96c012..0ecf9e42 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -42,7 +42,8 @@ all_tests() -> local_log_effect, leaderboard, bench, - disconnected_node_catches_up + disconnected_node_catches_up, + key_metrics ]. groups() -> @@ -381,6 +382,76 @@ disconnected_node_catches_up(Config) -> [ok = slave:stop(S) || {_, S} <- ServerIds], ok. +key_metrics(Config) -> + PrivDir = ?config(data_dir, Config), + ClusterName = ?config(cluster_name, Config), + ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + Machine = {module, ?MODULE, #{}}, + {ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds), + {ok, _, Leader} = ra:members(hd(Started)), + + Data = crypto:strong_rand_bytes(1024), + [begin + ok = ra:pipeline_command(Leader, {data, Data}) + end || _ <- lists:seq(1, 10000)], + {ok, _, _} = ra:process_command(Leader, {data, Data}), + + timer:sleep(100), + TestId = lists:last(Started), + ok = ra:stop_server(?SYS, TestId), + StoppedMetrics = ra:key_metrics(TestId), + ct:pal("StoppedMetrics ~p", [StoppedMetrics]), + ?assertMatch(#{state := noproc, + last_applied := LA, + last_written_index := LW, + commit_index := CI} + when LA > 0 andalso + LW > 0 andalso + CI > 0, + StoppedMetrics), + ok = ra:restart_server(?SYS, TestId), + await_condition( + fun () -> + Metrics = ra:key_metrics(TestId), + ct:pal("RecoverMetrics ~p", [Metrics]), + recover == maps:get(state, Metrics) + end, 200), + {ok, _, _} = ra:process_command(Leader, {data, Data}), + await_condition( + fun () -> + Metrics = ra:key_metrics(TestId), + ct:pal("FollowerMetrics ~p", [Metrics]), + follower == maps:get(state, Metrics) + end, 200), + [begin + M = ra:key_metrics(S), + ct:pal("Metrics ~p", [M]), + ?assertMatch(#{state := _, + last_applied := LA, + last_written_index := LW, + commit_index := CI} + when LA > 0 andalso + LW > 0 andalso + CI > 0, M) + end + || S <- Started], + ok = ra:transfer_leadership(Leader, TestId), + timer:sleep(1000), + [begin + M = ra:key_metrics(S), + ct:pal("Metrics ~p", [M]), + ?assertMatch(#{state := _, + last_applied := LA, + last_written_index := LW, + commit_index := CI} + when LA > 0 andalso + LW > 0 andalso + CI > 0, M) + end || S <- Started], + + [ok = slave:stop(S) || {_, S} <- ServerIds], + ok. + leaderboard(Config) -> PrivDir = ?config(data_dir, Config), @@ -537,6 +608,8 @@ apply(#{index := Idx}, {do_local_log, SenderPid, Opts}, State) -> end, {local, node(SenderPid)}}, {State, ok, [Eff]}; +apply(#{index := _Idx}, {data, _}, State) -> + {State, ok, []}; apply(#{index := Idx}, _Cmd, State) -> {State, ok, [{release_cursor, Idx, State}]}.