Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Feb 16, 2024
1 parent 2a36e58 commit 2320ccc
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 45 deletions.
12 changes: 6 additions & 6 deletions docs/internals/STATE_MACHINE_TUTORIAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,13 @@ Apart from the mandatory callbacks Ra supports some optional callbacks
{call, From :: from()} | cast,
Command :: term(),
AuxState,
State) ->
{reply, Reply :: term(), AuxState, State} |
{reply, Reply :: term(), AuxState, State, effects()} |
{no_reply, AuxState, State} |
{no_reply, AuxState, State, effects()}
IntState) ->
{reply, Reply :: term(), AuxState, IntState} |
{reply, Reply :: term(), AuxState, IntState, effects()} |
{no_reply, AuxState, IntState} |
{no_reply, AuxState, IntState, effects()}
when AuxState :: term(),
State :: ra_aux:state().
IntState :: ra_aux:internal_state().
```

These two callbacks allow each server to maintain a local state machine
Expand Down
38 changes: 19 additions & 19 deletions src/ra_aux.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,46 @@

-include("ra.hrl").

-opaque state() :: ra_server:state().
-opaque internal_state() :: ra_server:state().

-export_type([state/0]).
-export_type([internal_state/0]).

-spec machine_state(ra_aux:state()) -> term().
-spec machine_state(ra_aux:internal_state()) -> term().
machine_state(State) ->
maps:get(?FUNCTION_NAME, State).

-spec leader_id(ra_aux:state()) -> undefined | ra_server_id().
-spec leader_id(ra_aux:internal_state()) -> undefined | ra_server_id().
leader_id(State) ->
maps:get(?FUNCTION_NAME, State).

-spec members_info(ra_aux:state()) -> ra_cluster().
-spec members_info(ra_aux:internal_state()) -> ra_cluster().
members_info(State) ->
ra_server:state_query(?FUNCTION_NAME, State).

-spec overview(ra_aux:state()) -> map().
-spec overview(ra_aux:internal_state()) -> map().
overview(State) ->
ra_server:state_query(?FUNCTION_NAME, State).

-spec log_last_index_term(ra_aux:state()) -> ra_idxterm().
-spec log_last_index_term(ra_aux:internal_state()) -> ra_idxterm().
log_last_index_term(#{log := Log}) ->
ra_log:last_index_term(Log).
ra_log:last_index_term(Log).

-spec log_fetch(ra_index(), ra_aux:state()) ->
-spec log_fetch(ra_index(), ra_aux:internal_state()) ->
{undefined |
{ra_term(),
CmdMetadata :: ra_server:command_meta(),
Command :: term()}, ra_aux:state()}.
Command :: term()}, ra_aux:internal_state()}.
log_fetch(Idx, #{log := Log0} = State)
when is_integer(Idx) ->
case ra_log:fetch(Idx, Log0) of
{{Idx, Term, {'$usr', Meta, Cmd, _ReplyMode}}, Log} ->
{{Term, Meta, Cmd}, State#{log => Log}};
{_, Log} ->
%% we only allow user commands to be read
{undefined, State#{log => Log}}
end.

-spec log_stats(ra_aux:state()) -> ra_log:overview().
case ra_log:fetch(Idx, Log0) of
{{Idx, Term, {'$usr', Meta, Cmd, _ReplyMode}}, Log} ->
{{Term, Meta, Cmd}, State#{log => Log}};
{_, Log} ->
%% we only allow user commands to be read
{undefined, State#{log => Log}}
end.

-spec log_stats(ra_aux:internal_state()) -> ra_log:overview().
log_stats(#{log := Log}) ->
ra_log:overview(Log).

1 change: 1 addition & 0 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
open_segments => non_neg_integer(),
snapshot_index => undefined | ra_index(),
cache_size => non_neg_integer(),
latest_checkpoint_index => undefined | ra_index(),
atom() => term()}.

-export_type([state/0,
Expand Down
14 changes: 7 additions & 7 deletions src/ra_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,19 @@

-callback tick(TimeMs :: milliseconds(), state()) -> effects().

-callback init_aux(Name :: atom()) -> term().
-callback init_aux(Name :: atom()) -> AuxState :: term().

-callback handle_aux(ra_server:ra_state(),
{call, From :: from()} | cast,
Command :: term(),
AuxState,
State) ->
{reply, Reply :: term(), AuxState, State} |
{reply, Reply :: term(), AuxState, State, effects()} |
{no_reply, AuxState, State} |
{no_reply, AuxState, State, effects()}
IntState) ->
{reply, Reply :: term(), AuxState, IntState} |
{reply, Reply :: term(), AuxState, IntState, effects()} |
{no_reply, AuxState, IntState} |
{no_reply, AuxState, IntState, effects()}
when AuxState :: term(),
State :: ra_aux:state().
IntState :: ra_aux:internal_state().

-callback handle_aux(ra_server:ra_state(),
{call, From :: from()} | cast,
Expand Down
31 changes: 18 additions & 13 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1484,10 +1484,16 @@ is_fully_replicated(#{commit_index := CI} = State) ->
MinMI >= CI andalso MinCI >= CI
end.

handle_aux(RaftState, _Type, _Cmd,
handle_aux(RaftState, Type, _Cmd,
#{cfg := #cfg{effective_handle_aux_fun = undefined}} = State0) ->
%% todo reply with error if Type is a call?
{RaftState, State0, []};
Effects = case Type of
cast ->
[];
_From ->
[{reply, {error, aux_handler_not_implemented}}]
end,
{RaftState, State0, Effects};
handle_aux(RaftState, Type, Cmd,
#{cfg := #cfg{effective_machine_module = MacMod,
effective_handle_aux_fun = {handle_aux, 5}},
Expand Down Expand Up @@ -2266,17 +2272,16 @@ state_query(overview, State) ->
state_query(machine, #{machine_state := MacState}) ->
MacState;
state_query(voters, #{cluster := Cluster}) ->
Vs = maps:fold(fun(K, V, Acc) ->
case maps:get(voter_status, V, undefined) of
undefined -> [K|Acc];
S -> case maps:get(membership, S, undefined) of
undefined -> [K|Acc];
voter -> [K|Acc];
_ -> Acc
end
end
end, [], Cluster),
Vs;
maps:fold(fun(K, V, Acc) ->
case maps:get(voter_status, V, undefined) of
undefined -> [K|Acc];
S -> case maps:get(membership, S, undefined) of
undefined -> [K|Acc];
voter -> [K|Acc];
_ -> Acc
end
end
end, [], Cluster);
state_query(leader, State) ->
maps:get(leader_id, State, undefined);
state_query(members, #{cluster := Cluster}) ->
Expand Down
24 changes: 24 additions & 0 deletions test/ra_machine_int_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ all_tests() ->
log_effect,
aux_eval,
aux_tick,
aux_handler_not_impl,
aux_command,
aux_command_v2,
aux_command_v1_and_v2,
Expand Down Expand Up @@ -591,6 +592,29 @@ log_effect(Config) ->
end,
ok.

aux_handler_not_impl(Config) ->
ClusterName = ?config(cluster_name, Config),
ServerId1 = ?config(server_id, Config),
Cluster = [ServerId1,
?config(server_id2, Config),
?config(server_id3, Config)],
Mod = ?config(modname, Config),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun (_) -> [] end),
meck:expect(Mod, init_aux, fun (_) -> undefined end),
meck:expect(Mod, apply,
fun (_, {monitor_me, Pid}, State) ->
{[Pid | State], ok, [{monitor, process, Pid}]};
(_, Cmd, State) ->
ct:pal("handling ~p", [Cmd]),
%% handle all
{State, ok}
end),
ok = start_cluster(ClusterName, {module, Mod, #{}}, Cluster),
{ok, _, Leader} = ra:members(ServerId1),
{error, aux_handler_not_implemented} = ra:aux_command(Leader, emit),
ok.

aux_command(Config) ->
ClusterName = ?config(cluster_name, Config),
ServerId1 = ?config(server_id, Config),
Expand Down

0 comments on commit 2320ccc

Please sign in to comment.