Skip to content

Commit

Permalink
Seshat 0.6.0 and ra:key_metrics/1
Browse files Browse the repository at this point in the history
ra:key_metrics/1 will return key metrics, previously inserted into
the ra_metrics table (which remains for now). The aim is to allow
this function to always return and use counters instead to be able
to see progress during recovery.

ensure term is updated for snapshot installations
  • Loading branch information
kjnilsson committed Sep 5, 2023
1 parent 7c53d8e commit 5ce1e82
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 37 deletions.
2 changes: 1 addition & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ erlang_package.git_package(

erlang_package.hex_package(
name = "seshat",
version = "0.4.0",
version = "0.6.0",
)

use_repo(
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli

dep_gen_batch_server = hex 0.8.8
dep_aten = hex 0.5.8
dep_seshat = hex 0.4.0
dep_seshat = hex 0.6.0
DEPS = aten gen_batch_server seshat

TEST_DEPS = proper meck eunit_formatters inet_tcp_proxy
Expand Down
40 changes: 39 additions & 1 deletion src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@
cast_aux_command/2,
register_external_log_reader/1,
member_overview/1,
member_overview/2
member_overview/2,
key_metrics/1
]).

%% xref should pick these up
Expand Down Expand Up @@ -1107,6 +1108,43 @@ member_overview(ServerId) ->
member_overview(ServerId, Timeout) ->
ra_server_proc:local_state_query(ServerId, overview, Timeout).

%% @doc Returns a map of key metrics about a Ra member
%%
%% The keys and values may vary depending on what state
%% the member is in. This function will never call into the
%% Ra process itself so is likely to return swiftly even
%% when the Ra process is busy (such as when it is recovering)
%%
%% @param ServerId the Ra server to obtain key metrics for
%% @end
key_metrics({Name, N} = ServerId) when N == node() ->
Fields = [last_applied,
commit_index,
snapshot_index,
last_written_index,
last_index,
commit_latency,
term],
Counters = case ra_counters:counters(ServerId, Fields) of
undefined ->
#{};
C -> C
end,
case whereis(Name) of
undefined ->
Counters#{state => noproc};
_ ->
case ets:lookup(ra_state, Name) of
[] ->
Counters#{state => unknown};
[{_, State}] ->
Counters#{state => State}
end
end;
key_metrics({_, N} = ServerId) ->
erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId]).


%% internal

-spec usr(UserCommand, ReplyMode) -> Command when
Expand Down
36 changes: 34 additions & 2 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@
-define(C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, ?C_RA_LOG_RESERVED + 18).
-define(C_RA_SRV_LOCAL_QUERIES, ?C_RA_LOG_RESERVED + 19).
-define(C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, ?C_RA_LOG_RESERVED + 20).
-define(C_RA_SRV_RESERVED, ?C_RA_LOG_RESERVED + 21).


-define(RA_SRV_COUNTER_FIELDS,
Expand Down Expand Up @@ -312,7 +313,38 @@
{local_queries, ?C_RA_SRV_LOCAL_QUERIES, counter,
"Total number of local queries"},
{invalid_reply_mode_commands, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, counter,
"Total number of commands received with an invalid reply-mode"}
"Total number of commands received with an invalid reply-mode"},
{reserved_2, ?C_RA_SRV_RESERVED, counter, "Reserved counter"}
]).

-define(RA_COUNTER_FIELDS, ?RA_LOG_COUNTER_FIELDS ++ ?RA_SRV_COUNTER_FIELDS).
-define(C_RA_SVR_METRIC_LAST_APPLIED, ?C_RA_SRV_RESERVED + 1).
-define(C_RA_SVR_METRIC_COMMIT_INDEX, ?C_RA_SRV_RESERVED + 2).
-define(C_RA_SVR_METRIC_SNAPSHOT_INDEX, ?C_RA_SRV_RESERVED + 3).
-define(C_RA_SVR_METRIC_LAST_INDEX, ?C_RA_SRV_RESERVED + 4).
-define(C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, ?C_RA_SRV_RESERVED + 5).
-define(C_RA_SVR_METRIC_COMMIT_LATENCY, ?C_RA_SRV_RESERVED + 6).
-define(C_RA_SVR_METRIC_TERM, ?C_RA_SRV_RESERVED + 7).

-define(RA_SRV_METRICS_COUNTER_FIELDS,
[
{last_applied, ?C_RA_SVR_METRIC_LAST_APPLIED, gauge,
"The last applied index. Can go backwards if a ra server is restarted."},
{commit_index, ?C_RA_SVR_METRIC_COMMIT_INDEX, counter,
"The current commit index."},
{snapshot_index, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, counter,
"The current snapshot index."},
{last_index, ?C_RA_SVR_METRIC_LAST_INDEX, counter,
"The last index of the log."},
{last_written_index, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, counter,
"The last fully written and fsynced index of the log."},
{commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, gauge,
"Approximate time taken from an entry being written to the log until it is committed."},
{term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."}
]).

-define(RA_COUNTER_FIELDS,
?RA_LOG_COUNTER_FIELDS ++
?RA_SRV_COUNTER_FIELDS ++
?RA_SRV_METRICS_COUNTER_FIELDS).

-define(FIELDSPEC_KEY, ra_seshat_fields_spec).
19 changes: 12 additions & 7 deletions src/ra_counters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,32 @@
%% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(ra_counters).
-include("ra.hrl").

-export([
init/0,
new/2,
fetch/1,
overview/0,
overview/1,
counters/2,
delete/1
]).

-type name() :: term().
-type seshat_field_spec() ::
{Name :: atom(), Position :: pos_integer(),
Type :: counter | gauge, Description :: string()}.


-spec init() -> ok.
init() ->
_ = application:ensure_all_started(seshat),
_ = seshat:new_group(ra),
persistent_term:put(?FIELDSPEC_KEY, ?RA_COUNTER_FIELDS),
ok.

-spec new(name(), [seshat_field_spec()]) ->
-spec new(name(), seshat:fields_spec()) ->
counters:counters_ref().
new(Name, Fields)
when is_list(Fields) ->
seshat:new(ra, Name, Fields).
new(Name, FieldsSpec) ->
seshat:new(ra, Name, FieldsSpec).

-spec fetch(name()) -> undefined | counters:counters_ref().
fetch(Name) ->
Expand All @@ -47,3 +47,8 @@ overview() ->
-spec overview(name()) -> #{atom() => non_neg_integer()}.
overview(Name) ->
seshat:overview(ra, Name).

-spec counters(name(), [atom()]) ->
#{atom() => non_neg_integer()} | undefined.
counters(Name, Fields) ->
seshat:counters(ra, Name, Fields).
32 changes: 27 additions & 5 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,10 @@ init(#{uid := UId,
reader = Reader,
snapshot_state = SnapshotState
},

put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
LastIdx = State000#?MODULE.last_index,
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastIdx),
% recover the last term
{LastTerm0, State00} = case LastIdx of
SnapIdx ->
Expand Down Expand Up @@ -374,7 +376,8 @@ last_written(#?MODULE{last_written_index_term = LWTI}) ->
%% forces the last index and last written index back to a prior index
-spec set_last_index(ra_index(), state()) ->
{ok, state()} | {not_found, state()}.
set_last_index(Idx, #?MODULE{cache = Cache0,
set_last_index(Idx, #?MODULE{cfg = Cfg,
cache = Cache0,
last_written_index_term = {LWIdx0, _}} = State0) ->
case fetch_term(Idx, State0) of
{undefined, State} ->
Expand All @@ -385,6 +388,8 @@ set_last_index(Idx, #?MODULE{cache = Cache0,
%% this should always be found but still assert just in case
true = LWTerm =/= undefined,
Cache = ra_log_cache:set_last(Idx, Cache0),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx),
{ok, State2#?MODULE{last_index = Idx,
last_term = Term,
cache = Cache,
Expand All @@ -401,7 +406,8 @@ handle_event({written, {FromIdx, _ToIdx, _Term}},
%% Just drop the event in this case as it is stale
{State, []};
handle_event({written, {FromIdx, ToIdx0, Term}},
#?MODULE{last_written_index_term = {LastWrittenIdx0,
#?MODULE{cfg = Cfg,
last_written_index_term = {LastWrittenIdx0,
LastWrittenTerm0},
last_index = LastIdx,
snapshot_state = SnapState} = State0)
Expand All @@ -416,6 +422,7 @@ handle_event({written, {FromIdx, ToIdx0, Term}},
ToIdx = min(ToIdx0, LastIdx),
case fetch_term(ToIdx, State0) of
{Term, State} when is_integer(Term) ->
ok = put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, ToIdx),
{State#?MODULE{last_written_index_term = {ToIdx, Term}},
%% delaying truncate_cache until the next event allows any entries
%% that became committed to be read from cache rather than ETS
Expand All @@ -426,8 +433,10 @@ handle_event({written, {FromIdx, ToIdx0, Term}},
% followers returning appending the entry and the leader committing
% and processing a snapshot before the written event comes in.
% ensure last_written_index_term does not go backwards
LastWrittenIdxTerm = {max(LastWrittenIdx0, ToIdx),
LastWrittenIdx = max(LastWrittenIdx0, ToIdx),
LastWrittenIdxTerm = {LastWrittenIdx,
max(LastWrittenTerm0, Term)},
ok = put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastWrittenIdx),
{State#?MODULE{last_written_index_term = LastWrittenIdxTerm},
[{next_event, {ra_log_event, {truncate_cache, FromIdx, ToIdx}}}]};
{OtherTerm, State} ->
Expand Down Expand Up @@ -491,13 +500,15 @@ handle_event({segments, Tid, NewSegs},
{State, log_update_effects(Readers, Pid, State)}
end;
handle_event({snapshot_written, {SnapIdx, _} = Snap},
#?MODULE{first_index = FstIdx,
#?MODULE{cfg = Cfg,
first_index = FstIdx,
snapshot_state = SnapState0} = State0)
%% only update snapshot if it is newer than the last snapshot
when SnapIdx >= FstIdx ->
% delete any segments outside of first_index
{State, Effects0} = delete_segments(SnapIdx, State0),
SnapState = ra_snapshot:complete_snapshot(Snap, SnapState0),
put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
%% delete old snapshot files
%% This is done as an effect
%% so that if an old snapshot is still being replicated
Expand Down Expand Up @@ -573,6 +584,9 @@ install_snapshot({SnapIdx, _} = IdxTerm, SnapState,
#?MODULE{cfg = Cfg,
cache = Cache} = State0) ->
ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_INSTALLED, 1),
ok = put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, SnapIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, SnapIdx),
{State, Effs} = delete_segments(SnapIdx, State0),
{State#?MODULE{snapshot_state = SnapState,
first_index = SnapIdx + 1,
Expand Down Expand Up @@ -837,6 +851,7 @@ wal_truncate_write(#?MODULE{cfg = #cfg{uid = UId,
% and that prior entries should be considered stale
ok = ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd),
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
ok = put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx, last_term = Term,
cache = ra_log_cache:add(Entry, Cache)}.

Expand All @@ -847,6 +862,7 @@ wal_write(#?MODULE{cfg = #cfg{uid = UId,
case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of
ok ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx, last_term = Term,
cache = ra_log_cache:add(Entry, Cache)};
{error, wal_down} ->
Expand All @@ -869,6 +885,7 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of
ok ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
State#?MODULE{last_index = LastIdx,
last_term = LastTerm,
cache = Cache};
Expand Down Expand Up @@ -1030,6 +1047,11 @@ incr_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
incr_counter(#cfg{counter = undefined}, _Ix, _N) ->
ok.

put_counter(#cfg{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
counters:put(Cnt, Ix, N);
put_counter(#cfg{counter = undefined}, _Ix, _N) ->
ok.

server_data_dir(Dir, UId) ->
Me = ra_lib:to_list(UId),
filename:join(Dir, Me).
Expand Down
Loading

0 comments on commit 5ce1e82

Please sign in to comment.