From e37a41bf290a98de5721904107f5b9674ca2656e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 19 Jun 2024 11:31:44 +0200 Subject: [PATCH] Return an error if a `call` is unsupported [Why] Currently, if a call to a Ra server is unsupported, the Ra server will ignore the event and the call will eventually time out. This could happen when Ra servers sport different versions of Ra for instance. It would be nicer if the Ra server would reply immediatly with an error to let the caller know about the actual problem. [How] The Ra server uses a `reply` effect in the catch-all clause. The returned error is: {error, {unsupported_call, Call}} --- src/ra_server.erl | 10 ++++---- src/ra_server_proc.erl | 9 ++++++-- test/ra_SUITE.erl | 49 +++++++++++++++++++++++++++++++++++++++- test/ra_server_SUITE.erl | 20 ++++++++-------- 4 files changed, 70 insertions(+), 18 deletions(-) diff --git a/src/ra_server.erl b/src/ra_server.erl index b5eaee2e..e06b3c03 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -830,7 +830,7 @@ handle_leader(force_member_change, State0) -> {follower, State0#{votes => 0}, [{next_event, force_member_change}]}; handle_leader(Msg, State) -> log_unhandled_msg(leader, Msg, State), - {leader, State, []}. + {leader, State, [{reply, {error, {unsupported_call, Msg}}}]}. -spec handle_candidate(ra_msg() | election_timeout, ra_server_state()) -> @@ -943,7 +943,7 @@ handle_candidate(force_member_change, State0) -> {follower, State0#{votes => 0}, [{next_event, force_member_change}]}; handle_candidate(Msg, State) -> log_unhandled_msg(candidate, Msg, State), - {candidate, State, []}. + {candidate, State, [{reply, {error, {unsupported_call, Msg}}}]}. -spec handle_pre_vote(ra_msg(), ra_server_state()) -> {ra_state(), ra_server_state(), effects()}. @@ -1023,7 +1023,7 @@ handle_pre_vote(force_member_change, State0) -> {follower, State0#{votes => 0}, [{next_event, force_member_change}]}; handle_pre_vote(Msg, State) -> log_unhandled_msg(pre_vote, Msg, State), - {pre_vote, State, []}. + {pre_vote, State, [{reply, {error, {unsupported_call, Msg}}}]}. -spec handle_follower(ra_msg(), ra_server_state()) -> @@ -1327,7 +1327,7 @@ handle_follower(force_member_change, call_for_election(pre_vote, State, [{reply, ok} | Effects]); handle_follower(Msg, State) -> log_unhandled_msg(follower, Msg, State), - {follower, State, []}. + {follower, State, [{reply, {error, {unsupported_call, Msg}}}]}. handle_receive_snapshot(#install_snapshot_rpc{term = Term, meta = #{index := SnapIndex, @@ -1416,7 +1416,7 @@ handle_receive_snapshot(Msg, State) -> log_unhandled_msg(receive_snapshot, Msg, State), %% drop all other events?? %% TODO: work out what else to handle - {receive_snapshot, State, []}. + {receive_snapshot, State, [{reply, {error, {unsupported_call, Msg}}}]}. -spec handle_await_condition(ra_msg(), ra_server_state()) -> {ra_state(), ra_server_state(), effects()}. diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 948158cb..77ce1a4c 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -57,6 +57,11 @@ -export([send_rpc/3]). +-ifdef(TEST). +-export([leader_call/3, + local_call/3]). +-endif. + -define(DEFAULT_BROADCAST_TIME, 100). -define(DEFAULT_ELECTION_MULT, 5). -define(TICK_INTERVAL_MS, 1000). @@ -1300,8 +1305,8 @@ handle_effect(_, {reply, Reply}, {call, From}, State, Actions) -> % reply directly ok = gen_statem:reply(From, Reply), {State, Actions}; -handle_effect(_, {reply, Reply}, EvtType, _, _) -> - exit({undefined_reply, Reply, EvtType}); +handle_effect(_, {reply, _Reply}, _EvtType, State, Actions) -> + {State, Actions}; handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, Id, Term}}, _, #state{server_state = SS0, monitors = Monitors, diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 306d6976..346ff6bc 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -20,6 +20,11 @@ %% `ra_server:command_reply_mode()' type: -dialyzer({nowarn_function, [process_command_with_unknown_reply_mode/1]}). +%% The following testcases simulate an erroneous or unsupported call that is +%% outside of the spec. +-dialyzer({nowarn_function, [unknown_leader_call/1, + unknown_local_call/1]}). + all() -> [ {group, tests} @@ -68,7 +73,9 @@ all_tests() -> transfer_leadership_two_node, new_nonvoter_knows_its_status, voter_gets_promoted_consistent_leader, - voter_gets_promoted_new_leader + voter_gets_promoted_new_leader, + unknown_leader_call, + unknown_local_call ]. groups() -> @@ -1160,6 +1167,46 @@ voter_gets_promoted_new_leader(Config) -> lists:map(fun({Name, _}) -> #{Name := #{membership := voter}} = Servers end, All), ok. +unknown_leader_call(Config) -> + [A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config), + {simple, fun erlang:'+'/2, 9}), + try + %% Query the leader and deduce a follower. + {ok, _, Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT), + [Follower | _] = Cluster -- [Leader], + ct:pal("Leader: ~0p~nFollower: ~0p", [Leader, Follower]), + + Call = unknown_call, + ?assertEqual( + {error, {unsupported_call, Call}}, + ra_server_proc:leader_call(Leader, Call, ?DEFAULT_TIMEOUT)), + ?assertEqual( + {error, {unsupported_call, Call}}, + ra_server_proc:leader_call(Follower, Call, ?DEFAULT_TIMEOUT)) + after + terminate_cluster(Cluster) + end. + +unknown_local_call(Config) -> + [A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config), + {simple, fun erlang:'+'/2, 9}), + try + %% Query the leader and deduce a follower. + {ok, _, Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT), + [Follower | _] = Cluster -- [Leader], + ct:pal("Leader: ~0p~nFollower: ~0p", [Leader, Follower]), + + Call = unknown_call, + ?assertEqual( + {error, {unsupported_call, Call}}, + ra_server_proc:local_call(Leader, Call, ?DEFAULT_TIMEOUT)), + ?assertEqual( + {error, {unsupported_call, Call}}, + ra_server_proc:local_call(Follower, Call, ?DEFAULT_TIMEOUT)) + after + terminate_cluster(Cluster) + end. + get_gen_statem_status(Ref) -> {_, _, _, Items} = sys:get_status(Ref), proplists:get_value(raft_state, lists:last(Items)). diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 85c4fc9a..89113683 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -2384,11 +2384,11 @@ candidate_heartbeat_reply(_Config) -> HeartbeatReply = #heartbeat_reply{term = Term, query_index = 2}, %% Same term is ignored - {candidate, State, []} + {candidate, State, [{reply, {error, {unsupported_call, _}}}]} = ra_server:handle_candidate({{no_peer, node()}, HeartbeatReply}, State), %% Lower term is ignored - {candidate, State, []} + {candidate, State, [{reply, {error, {unsupported_call, _}}}]} = ra_server:handle_candidate({{no_peer, node()}, HeartbeatReply#heartbeat_reply{term = Term - 1}}, State), %% Higher term updates term and changes to follower @@ -2439,11 +2439,11 @@ pre_vote_heartbeat_reply(_Config) -> query_index = 2}, %% Heartbeat reply with same term is ignored - {pre_vote, State, []} + {pre_vote, State, [{reply, {error, {unsupported_call, _}}}]} = ra_server:handle_pre_vote({{no_peer, node()}, HeartbeatReply}, State), %% Heartbeat reply with lower term is ignored - {pre_vote, State, []} + {pre_vote, State, [{reply, {error, {unsupported_call, _}}}]} = ra_server:handle_pre_vote( {{no_peer, node()}, HeartbeatReply#heartbeat_reply{term = Term - 1}}, State), @@ -2748,13 +2748,13 @@ receive_snapshot_heartbeat_dropped(_Config) -> Heartbeat = #heartbeat_rpc{term = Term, query_index = QueryIndex, leader_id = Id}, - {receive_snapshot, State, []} = + {receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} = ra_server:handle_receive_snapshot(Heartbeat, State), %% Term does not matter - {receive_snapshot, State, []} = + {receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} = ra_server:handle_receive_snapshot(Heartbeat#heartbeat_rpc{term = Term + 1}, State), - {receive_snapshot, State, []} = + {receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} = ra_server:handle_receive_snapshot(Heartbeat#heartbeat_rpc{term = Term - 1}, State). @@ -2765,13 +2765,13 @@ receive_snapshot_heartbeat_reply_dropped(_config) -> HeartbeatReply = #heartbeat_reply{term = Term, query_index = QueryIndex}, - {receive_snapshot, State, []} = + {receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} = ra_server:handle_receive_snapshot(HeartbeatReply, State), %% Term does not matter - {receive_snapshot, State, []} = + {receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} = ra_server:handle_receive_snapshot(HeartbeatReply#heartbeat_reply{term = Term + 1}, State), - {receive_snapshot, State, []} = + {receive_snapshot, State, [{reply, {error, {unsupported_call, _}}}]} = ra_server:handle_receive_snapshot(HeartbeatReply#heartbeat_reply{term = Term - 1}, State).