Skip to content

Commit

Permalink
Merge pull request #449 from rabbitmq/handle-unknown-calls
Browse files Browse the repository at this point in the history
Return an error if a `call` is unsupported
  • Loading branch information
kjnilsson authored Jun 20, 2024
2 parents 7175f0f + e37a41b commit 6de4074
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 18 deletions.
10 changes: 5 additions & 5 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ handle_leader(force_member_change, State0) ->
{follower, State0#{votes => 0}, [{next_event, force_member_change}]};
handle_leader(Msg, State) ->
log_unhandled_msg(leader, Msg, State),
{leader, State, []}.
{leader, State, [{reply, {error, {unsupported_call, Msg}}}]}.


-spec handle_candidate(ra_msg() | election_timeout, ra_server_state()) ->
Expand Down Expand Up @@ -943,7 +943,7 @@ handle_candidate(force_member_change, State0) ->
{follower, State0#{votes => 0}, [{next_event, force_member_change}]};
handle_candidate(Msg, State) ->
log_unhandled_msg(candidate, Msg, State),
{candidate, State, []}.
{candidate, State, [{reply, {error, {unsupported_call, Msg}}}]}.

-spec handle_pre_vote(ra_msg(), ra_server_state()) ->
{ra_state(), ra_server_state(), effects()}.
Expand Down Expand Up @@ -1023,7 +1023,7 @@ handle_pre_vote(force_member_change, State0) ->
{follower, State0#{votes => 0}, [{next_event, force_member_change}]};
handle_pre_vote(Msg, State) ->
log_unhandled_msg(pre_vote, Msg, State),
{pre_vote, State, []}.
{pre_vote, State, [{reply, {error, {unsupported_call, Msg}}}]}.


-spec handle_follower(ra_msg(), ra_server_state()) ->
Expand Down Expand Up @@ -1327,7 +1327,7 @@ handle_follower(force_member_change,
call_for_election(pre_vote, State, [{reply, ok} | Effects]);
handle_follower(Msg, State) ->
log_unhandled_msg(follower, Msg, State),
{follower, State, []}.
{follower, State, [{reply, {error, {unsupported_call, Msg}}}]}.

handle_receive_snapshot(#install_snapshot_rpc{term = Term,
meta = #{index := SnapIndex,
Expand Down Expand Up @@ -1416,7 +1416,7 @@ handle_receive_snapshot(Msg, State) ->
log_unhandled_msg(receive_snapshot, Msg, State),
%% drop all other events??
%% TODO: work out what else to handle
{receive_snapshot, State, []}.
{receive_snapshot, State, [{reply, {error, {unsupported_call, Msg}}}]}.

-spec handle_await_condition(ra_msg(), ra_server_state()) ->
{ra_state(), ra_server_state(), effects()}.
Expand Down
9 changes: 7 additions & 2 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@

-export([send_rpc/3]).

-ifdef(TEST).
-export([leader_call/3,
local_call/3]).
-endif.

-define(DEFAULT_BROADCAST_TIME, 100).
-define(DEFAULT_ELECTION_MULT, 5).
-define(TICK_INTERVAL_MS, 1000).
Expand Down Expand Up @@ -1300,8 +1305,8 @@ handle_effect(_, {reply, Reply}, {call, From}, State, Actions) ->
% reply directly
ok = gen_statem:reply(From, Reply),
{State, Actions};
handle_effect(_, {reply, Reply}, EvtType, _, _) ->
exit({undefined_reply, Reply, EvtType});
handle_effect(_, {reply, _Reply}, _EvtType, State, Actions) ->
{State, Actions};
handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, Id, Term}}, _,
#state{server_state = SS0,
monitors = Monitors,
Expand Down
49 changes: 48 additions & 1 deletion test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
%% `ra_server:command_reply_mode()' type:
-dialyzer({nowarn_function, [process_command_with_unknown_reply_mode/1]}).

%% The following testcases simulate an erroneous or unsupported call that is
%% outside of the spec.
-dialyzer({nowarn_function, [unknown_leader_call/1,
unknown_local_call/1]}).

all() ->
[
{group, tests}
Expand Down Expand Up @@ -68,7 +73,9 @@ all_tests() ->
transfer_leadership_two_node,
new_nonvoter_knows_its_status,
voter_gets_promoted_consistent_leader,
voter_gets_promoted_new_leader
voter_gets_promoted_new_leader,
unknown_leader_call,
unknown_local_call
].

groups() ->
Expand Down Expand Up @@ -1160,6 +1167,46 @@ voter_gets_promoted_new_leader(Config) ->
lists:map(fun({Name, _}) -> #{Name := #{membership := voter}} = Servers end, All),
ok.

unknown_leader_call(Config) ->
[A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config),
{simple, fun erlang:'+'/2, 9}),
try
%% Query the leader and deduce a follower.
{ok, _, Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT),
[Follower | _] = Cluster -- [Leader],
ct:pal("Leader: ~0p~nFollower: ~0p", [Leader, Follower]),

Call = unknown_call,
?assertEqual(
{error, {unsupported_call, Call}},
ra_server_proc:leader_call(Leader, Call, ?DEFAULT_TIMEOUT)),
?assertEqual(
{error, {unsupported_call, Call}},
ra_server_proc:leader_call(Follower, Call, ?DEFAULT_TIMEOUT))
after
terminate_cluster(Cluster)
end.

unknown_local_call(Config) ->
[A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config),
{simple, fun erlang:'+'/2, 9}),
try
%% Query the leader and deduce a follower.
{ok, _, Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT),
[Follower | _] = Cluster -- [Leader],
ct:pal("Leader: ~0p~nFollower: ~0p", [Leader, Follower]),

Call = unknown_call,
?assertEqual(
{error, {unsupported_call, Call}},
ra_server_proc:local_call(Leader, Call, ?DEFAULT_TIMEOUT)),
?assertEqual(
{error, {unsupported_call, Call}},
ra_server_proc:local_call(Follower, Call, ?DEFAULT_TIMEOUT))
after
terminate_cluster(Cluster)
end.

get_gen_statem_status(Ref) ->
{_, _, _, Items} = sys:get_status(Ref),
proplists:get_value(raft_state, lists:last(Items)).
Expand Down
20 changes: 10 additions & 10 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2384,11 +2384,11 @@ candidate_heartbeat_reply(_Config) ->

HeartbeatReply = #heartbeat_reply{term = Term, query_index = 2},
%% Same term is ignored
{candidate, State, []}
{candidate, State, [{reply, {error, {unsupported_call, _}}}]}
= ra_server:handle_candidate({{no_peer, node()}, HeartbeatReply}, State),

%% Lower term is ignored
{candidate, State, []}
{candidate, State, [{reply, {error, {unsupported_call, _}}}]}
= ra_server:handle_candidate({{no_peer, node()}, HeartbeatReply#heartbeat_reply{term = Term - 1}}, State),

%% Higher term updates term and changes to follower
Expand Down Expand Up @@ -2439,11 +2439,11 @@ pre_vote_heartbeat_reply(_Config) ->
query_index = 2},

%% Heartbeat reply with same term is ignored
{pre_vote, State, []}
{pre_vote, State, [{reply, {error, {unsupported_call, _}}}]}
= ra_server:handle_pre_vote({{no_peer, node()}, HeartbeatReply}, State),

%% Heartbeat reply with lower term is ignored
{pre_vote, State, []}
{pre_vote, State, [{reply, {error, {unsupported_call, _}}}]}
= ra_server:handle_pre_vote(
{{no_peer, node()}, HeartbeatReply#heartbeat_reply{term = Term - 1}},
State),
Expand Down Expand Up @@ -2748,13 +2748,13 @@ receive_snapshot_heartbeat_dropped(_Config) ->
Heartbeat = #heartbeat_rpc{term = Term,
query_index = QueryIndex,
leader_id = Id},
{receive_snapshot, State, []} =
{receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(Heartbeat, State),
%% Term does not matter
{receive_snapshot, State, []} =
{receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(Heartbeat#heartbeat_rpc{term = Term + 1},
State),
{receive_snapshot, State, []} =
{receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(Heartbeat#heartbeat_rpc{term = Term - 1},
State).

Expand All @@ -2765,13 +2765,13 @@ receive_snapshot_heartbeat_reply_dropped(_config) ->

HeartbeatReply = #heartbeat_reply{term = Term,
query_index = QueryIndex},
{receive_snapshot, State, []} =
{receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(HeartbeatReply, State),
%% Term does not matter
{receive_snapshot, State, []} =
{receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(HeartbeatReply#heartbeat_reply{term = Term + 1},
State),
{receive_snapshot, State, []} =
{receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} =
ra_server:handle_receive_snapshot(HeartbeatReply#heartbeat_reply{term = Term - 1},
State).

Expand Down

0 comments on commit 6de4074

Please sign in to comment.