Skip to content

Commit

Permalink
Merge pull request #396 from rabbitmq/gh_393
Browse files Browse the repository at this point in the history
Commit down commands as low priority.
  • Loading branch information
kjnilsson authored Sep 12, 2023
2 parents cb5bc3c + 50b3c33 commit 4dec8d3
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 8 deletions.
7 changes: 3 additions & 4 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1874,10 +1874,9 @@ peer_snapshot_process_exited(SnapshotPid, #{cluster := Peers} = State) ->
{ra_state(), ra_server_state(), effects()}.
handle_down(leader, machine, Pid, Info, State)
when is_pid(Pid) ->
%% commit command to be processed by state machine
handle_leader({command, {'$usr', #{ts => erlang:system_time(millisecond)},
{down, Pid, Info}, noreply}},
State);
% %% commit command to be processed by state machine
Eff = {next_event, {command, low, {'$usr', {down, Pid, Info}, noreply}}},
{leader, State, [Eff]};
handle_down(RaftState, snapshot_sender, Pid, Info,
#{cfg := #cfg{log_id = LogId}} = State)
when (RaftState == leader orelse
Expand Down
54 changes: 54 additions & 0 deletions test/ra_machine_int_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ all_tests() ->
send_msg_with_ra_event_and_cast_options,
machine_replies,
leader_monitors,
down_follows_all_low_priority_commands,
follower_takes_over_monitor,
deleted_cluster_emits_eol_effect,
machine_state_enter_effects,
Expand Down Expand Up @@ -222,6 +223,59 @@ leader_monitors(Config) ->
ra:stop_server(?SYS, ServerId),
ok.

down_follows_all_low_priority_commands(Config) ->
ClusterName = ?config(cluster_name, Config),
{_Name1, _} = ServerId1 = ?config(server_id, Config),
{_Name2, _} = ServerId2 = ?config(server_id2, Config),
{_Name3, _} = ServerId3 = ?config(server_id3, Config),
Cluster = [ServerId1, ServerId2, ServerId3],
Mod = ?config(modname, Config),
Self = self(),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun (_) -> [] end),
meck:expect(Mod, apply,
fun (_, {monitor_me, Pid}, State) ->
ct:pal("monitoring ~p", [Pid]),
{[Pid | State], ok, [{monitor, process, Pid}]};
(_, {down, Pid, _}, State) ->
{lists:delete(Pid, State), ok, []};
(_, {cmd, Pid}, State) ->
% ct:pal("handling ~p", [Cmd]),
case lists:member(Pid, State) of
true ->
{State, ok};
false ->
{State, ok, [{send_msg, Self, {unexpected_cmd, Pid}}]}
end
end),
ok = start_cluster(ClusterName, {module, Mod, #{}}, Cluster),
%% send some commands then exit swiftly
spawn(
fun () ->
{ok, ok, L} = ra:process_command(ServerId1, {monitor_me, self()}),
[ra:pipeline_command(L, {cmd, self()}) || _ <- lists:seq(1, 200)],
Self ! done,
ok
end),

receive
done ->
receive
{unexpected_cmd, _} ->
ct:fail("Unexpexted command after down")
after 2000 ->
ok
end
after 5000 ->
exit(done_Timeout)
end,


ra:stop_server(?SYS, ServerId1),
ra:stop_server(?SYS, ServerId2),
ra:stop_server(?SYS, ServerId3),
ok.

follower_takes_over_monitor(Config) ->
ClusterName = ?config(cluster_name, Config),
{_Name1, _} = ServerId1 = ?config(server_id, Config),
Expand Down
8 changes: 4 additions & 4 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2440,15 +2440,15 @@ receive_snapshot_heartbeat_reply_dropped(_config) ->

handle_down(_config) ->
State0 = base_state(3, ?FUNCTION_NAME),
%% this should commit a command
{leader, #{log := Log} = State, _} =
%% this should return a next_event effect to commit a command
Pid = self(),
{leader, State,
[{next_event, {command, low, {'$usr', {down, Pid, noproc}, noreply}}}]} =
ra_server:handle_down(leader, machine, self(), noproc, State0),
?assertEqual({4, 5}, ra_log:last_index_term(Log)),
%% this should be ignored as may happen if state machine doesn't demonitor
%% on state changes
{follower, State, []} =
ra_server:handle_down(follower, machine, self(), noproc, State),

ok.

set_peer_query_index(State, PeerId, QueryIndex) ->
Expand Down

0 comments on commit 4dec8d3

Please sign in to comment.