Skip to content

Commit

Permalink
move state query to ra_server
Browse files Browse the repository at this point in the history
potential new callback sig

readme.md

make compilable again

Add initial ra_machine:handle_aux/5 API

the appropriate handle_aux function to use is re-evaluated every
time the effective machine module changes and is stored in the
ra_server configuration state.
  • Loading branch information
kjnilsson committed Feb 13, 2024
1 parent 00d660b commit b84b55e
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 104 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ Ra attempts to follow [Semantic Versioning](https://semver.org/).

The modules that form part of the public API are:
* `ra`
* `ra_machine`
* `ra_machine` (behaviour callbacks only)
* `ra_system`
* `ra_counters`
* `ra_leaderboard`
Expand Down
20 changes: 20 additions & 0 deletions src/ra_aux.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
-module(ra_aux).

-export([
machine_state/1
]).

% -include("ra_server.hrl").

-opaque state() :: ra_server:state().

-export_type([state/0]).

-spec machine_state(ra_server:state()) -> term().
machine_state(State) ->
maps:get(?FUNCTION_NAME, State).
50 changes: 48 additions & 2 deletions src/ra_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@
query/3,
module/1,
init_aux/2,
handle_aux/6,
handle_aux/7,
snapshot_module/1,
version/1,
which_module/2,
which_aux_fun/1,
is_versioned/1
]).

Expand Down Expand Up @@ -202,6 +204,7 @@
-optional_callbacks([tick/2,
state_enter/2,
init_aux/1,
handle_aux/5,
handle_aux/6,
overview/1,
snapshot_module/0,
Expand Down Expand Up @@ -232,6 +235,20 @@

-callback init_aux(Name :: atom()) -> term().

-callback handle_aux(ra_server:ra_state(),
{call, From :: from()} | cast,
Command :: term(),
AuxState,
State) ->
{reply, Reply :: term(), AuxState, State} |
{reply, Reply :: term(), AuxState, State,
[{monitor, process, aux, pid()}]} |
{no_reply, AuxState, State} |
{no_reply, AuxState, State,
[{monitor, process, aux, pid()}]}
when AuxState :: term(),
State :: ra_aux:state().

-callback handle_aux(ra_server:ra_state(),
{call, From :: from()} | cast,
Command :: term(),
Expand Down Expand Up @@ -328,8 +345,37 @@ init_aux(Mod, Name) ->
when AuxState :: term(),
LogState :: ra_log:state().
handle_aux(Mod, RaftState, Type, Cmd, Aux, Log, MacState) ->
?OPT_CALL(Mod:handle_aux(RaftState, Type, Cmd, Aux, Log, MacState),
undefined).
Mod:handle_aux(RaftState, Type, Cmd, Aux, Log, MacState).


-spec handle_aux(module(),
ra_server:ra_state(),
{call, From :: from()} | cast,
Command :: term(),
AuxState,
State) ->
{reply, Reply :: term(), AuxState, State} |
{reply, Reply :: term(), AuxState, State,
[{monitor, process, aux, pid()}]} |
{no_reply, AuxState, State} |
{no_reply, AuxState, State,
[{monitor, process, aux, pid()}]}
when AuxState :: term(),
State :: ra_server:state().
handle_aux(Mod, RaftState, Type, Cmd, Aux, State) ->
Mod:handle_aux(RaftState, Type, Cmd, Aux, State).

-spec which_aux_fun(module()) ->
undefined | {atom(), arity()}.
which_aux_fun(Mod) when is_atom(Mod) ->
case lists:sort([E || {handle_aux, _Arity} = E
<- erlang:apply(Mod,module_info, [exports])]) of
[] ->
undefined;
[AuxFun | _] ->
%% favour {handle_aux, 5} as this is the newer api
AuxFun
end.

-spec query(module(), fun((state()) -> Result), state()) ->
Result when Result :: term().
Expand Down
129 changes: 121 additions & 8 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
log_fold/3,
log_read/2,
get_membership/1,
recover/1
recover/1,
state_query/2
]).

-type ra_await_condition_fun() ::
Expand Down Expand Up @@ -90,6 +91,8 @@
commit_latency => option(non_neg_integer())
}.

-type state() :: ra_server_state().

-type ra_state() :: leader | follower | candidate
| pre_vote | await_condition | delete_and_terminate
| terminating_leader | terminating_follower | recover
Expand Down Expand Up @@ -213,7 +216,8 @@

-type config() :: ra_server_config().

-export_type([config/0,
-export_type([state/0,
config/0,
ra_server_state/0,
ra_state/0,
ra_server_config/0,
Expand Down Expand Up @@ -325,6 +329,7 @@ init(#{id := Id,
machine_versions = [{SnapshotIdx, MacVer}],
effective_machine_version = MacVer,
effective_machine_module = MacMod,
effective_handle_aux_fun = ra_machine:which_aux_fun(MacMod),
max_pipeline_count = MaxPipelineCount,
max_append_entries_rpc_batch_size = MaxAERBatchSize,
counter = maps:get(counter, Config, undefined),
Expand Down Expand Up @@ -1299,7 +1304,9 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
Cfg0#cfg{effective_machine_version = SnapMacVer,
machine_versions = [{SnapIndex, SnapMacVer}
| MachineVersions],
effective_machine_module = EffMacMod};
effective_machine_module = EffMacMod,
effective_handle_aux_fun =
ra_machine:which_aux_fun(EffMacMod)};
false ->
Cfg0
end,
Expand Down Expand Up @@ -1475,11 +1482,37 @@ is_fully_replicated(#{commit_index := CI} = State) ->
MinMI >= CI andalso MinCI >= CI
end.

handle_aux(RaftState, Type, Cmd, #{cfg := #cfg{effective_machine_module = MacMod},
aux_state := Aux0, log := Log0,
machine_state := MacState0} = State0) ->
handle_aux(RaftState, _Type, _Cmd,
#{cfg := #cfg{effective_handle_aux_fun = undefined}} = State0) ->
%% todo reply with error if Type is a call?
{RaftState, State0, []};
handle_aux(RaftState, Type, Cmd,
#{cfg := #cfg{effective_machine_module = MacMod,
effective_handle_aux_fun = {handle_aux, 5}},
aux_state := Aux0} = State0) ->
%% NEW API
case ra_machine:handle_aux(MacMod, RaftState, Type, Cmd, Aux0,
State0) of
{reply, Reply, Aux, State} ->
{RaftState, State#{aux_state => Aux},
[{reply, Reply}]};
{reply, Reply, Aux, State, Effects} ->
{RaftState, State#{aux_state => Aux},
[{reply, Reply} | Effects]};
{no_reply, Aux, State} ->
{RaftState, State#{aux_state => Aux}, []};
{no_reply, Aux, State, Effects} ->
{RaftState, State#{aux_state => Aux}, Effects}
end;
handle_aux(RaftState, Type, Cmd,
#{cfg := #cfg{effective_machine_module = MacMod,
effective_handle_aux_fun = {handle_aux, 6}},
aux_state := Aux0,
machine_state := MacState,
log := Log0} = State0) ->
%% OLD API
case ra_machine:handle_aux(MacMod, RaftState, Type, Cmd, Aux0,
Log0, MacState0) of
Log0, MacState) of
{reply, Reply, Aux, Log} ->
{RaftState, State0#{log => Log, aux_state => Aux},
[{reply, Reply}]};
Expand Down Expand Up @@ -2205,6 +2238,83 @@ update_term(_, State) ->
last_idx_term(#{log := Log}) ->
ra_log:last_index_term(Log).


state_query(all, State) -> State;
state_query(overview, State) ->
overview(State);
state_query(machine, #{machine_state := MacState}) ->
MacState;
state_query(voters, #{cluster := Cluster}) ->
Vs = maps:fold(fun(K, V, Acc) ->
case maps:get(voter_status, V, undefined) of
undefined -> [K|Acc];
S -> case maps:get(membership, S, undefined) of
undefined -> [K|Acc];
voter -> [K|Acc];
_ -> Acc
end
end
end, [], Cluster),
Vs;
state_query(leader, State) ->
maps:get(leader_id, State, undefined);
state_query(members, #{cluster := Cluster}) ->
maps:keys(Cluster);
state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
leader_id := Self, query_index := QI, commit_index := CI,
membership := Membership}) ->
maps:map(fun(Id, Peer) ->
case {Id, Peer} of
{Self, Peer = #{voter_status := VoterStatus}} ->
%% For completeness sake, preserve `target`
%% of once promoted leader.
#{next_index => CI+1,
match_index => CI,
query_index => QI,
status => normal,
voter_status => VoterStatus#{membership => Membership}};
{Self, _} ->
#{next_index => CI+1,
match_index => CI,
query_index => QI,
status => normal,
voter_status => #{membership => Membership}};
{_, Peer = #{voter_status := _}} ->
Peer;
{_, Peer} ->
%% Initial cluster members have no voter_status.
Peer#{voter_status => #{membership => voter}}
end
end, Cluster);
state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
query_index := QI, commit_index := CI,
membership := Membership}) ->
%% Followers do not have sufficient information,
%% bail out and send whatever we have.
maps:map(fun(Id, Peer) ->
case {Id, Peer} of
{Self, #{voter_status := VS}} ->
#{match_index => CI,
query_index => QI,
voter_status => VS#{membership => Membership}};
{Self, _} ->
#{match_index => CI,
query_index => QI,
voter_status => #{membership => Membership}};
_ ->
#{}
end
end, Cluster);
state_query(initial_members, #{log := Log}) ->
case ra_log:read_config(Log) of
{ok, #{initial_members := InitialMembers}} ->
InitialMembers;
_ ->
error
end;
state_query(Query, _State) ->
{error, {unknown_query, Query}}.

%% § 5.4.1 Raft determines which of two logs is more up-to-date by comparing
%% the index and term of the last entries in the logs. If the logs have last
%% entries with different terms, then the log with the later term is more
Expand Down Expand Up @@ -2427,7 +2537,10 @@ apply_with({Idx, Term, {noop, CmdMeta, NextMacVer}},
Cfg = Cfg0#cfg{effective_machine_version = NextMacVer,
%% record this machine version "term"
machine_versions = [{Idx, NextMacVer} | MacVersions],
effective_machine_module = Module},
effective_machine_module = Module,
effective_handle_aux_fun =
ra_machine:which_aux_fun(Module)
},
State = State0#{cfg => Cfg,
cluster_change_permitted => ClusterChangePerm},
Meta = augment_command_meta(Idx, Term, MacVer, CmdMeta),
Expand Down
1 change: 1 addition & 0 deletions src/ra_server.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
machine_versions :: [{ra_index(), ra_machine:version()}, ...],
effective_machine_version :: ra_machine:version(),
effective_machine_module :: module(),
effective_handle_aux_fun :: undefined | {handle_aux, 5 | 6},
max_pipeline_count = ?DEFAULT_MAX_PIPELINE_COUNT :: non_neg_integer(),
max_append_entries_rpc_batch_size = ?AER_CHUNK_SIZE :: non_neg_integer(),
counter :: undefined | counters:counters_ref(),
Expand Down
Loading

0 comments on commit b84b55e

Please sign in to comment.