From b18ab085315958bf9895d3d5be7dfec8edd13485 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 5 Jan 2024 16:48:53 +0000 Subject: [PATCH] Various fixes to ra_leaderboard Ensure that the members are updated when the membership is modifiied. Always delete leaderboard record in ra server terminate callback to favour accuracy of leadership information over availability. The idea is that when a record is not found in the leaderboard the caller falls back to some other, potentially slower, means of leader discovery. --- src/ra_server.erl | 97 ++++++++++++++++++++++--------------- src/ra_server_proc.erl | 6 ++- src/ra_server_sup_sup.erl | 11 +++-- test/coordination_SUITE.erl | 63 ++++++++++++++++++++++++ test/ra_server_SUITE.erl | 4 +- 5 files changed, 132 insertions(+), 49 deletions(-) diff --git a/src/ra_server.erl b/src/ra_server.erl index 02e1a004..62e16d0a 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -503,19 +503,19 @@ handle_leader({PeerId, #append_entries_reply{success = false, {leader, State, Effects}; handle_leader({command, Cmd}, #{cfg := #cfg{log_id = LogId} = Cfg} = State00) -> ok = incr_counter(Cfg, ?C_RA_SRV_COMMANDS, 1), - case append_log_leader(Cmd, State00) of - {not_appended, Reason, State} -> + case append_log_leader(Cmd, State00, []) of + {not_appended, Reason, State, Effects0} -> ?WARN("~ts command ~W NOT appended to log. Reason ~w", [LogId, Cmd, 10, Reason]), Effects = case Cmd of {_, #{from := From}, _, _} -> - [{reply, From, {error, Reason}}]; + [{reply, From, {error, Reason}} | Effects0]; _ -> - [] + Effects0 end, {leader, State, Effects}; - {ok, Idx, Term, State0} -> - {State, _, Effects0} = make_pipelined_rpc_effects(State0, []), + {ok, Idx, Term, State0, Effects00} -> + {State, _, Effects0} = make_pipelined_rpc_effects(State0, Effects00), % check if a reply is required. % TODO: refactor - can this be made a bit nicer/more explicit? Effects = case Cmd of @@ -531,8 +531,8 @@ handle_leader({commands, Cmds}, #{cfg := Cfg} = State00) -> %% TODO: refactor to use wal batch API? Num = length(Cmds), {State0, Effects0} = - lists:foldl(fun(C, {S0, E}) -> - {ok, I, T, S} = append_log_leader(C, S0), + lists:foldl(fun(C, {S0, E0}) -> + {ok, I, T, S, E} = append_log_leader(C, S0, E0), case C of {_, #{from := From}, _, after_log_append} -> {S, [{reply, From, @@ -1023,18 +1023,27 @@ handle_follower(#append_entries_rpc{term = Term, _ -> State1 = lists:foldl(fun pre_append_log_follower/2, State0, Entries), + %% if the cluster has changed we need to update + %% the leaderboard + Effects1 = case maps:get(cluster, State0) =/= + maps:get(cluster, State1) of + true -> + [update_leaderboard | Effects0]; + false -> + Effects0 + end, case ra_log:write(Entries, Log1) of {ok, Log2} -> {NextState, State, Effects} = evaluate_commit_index_follower(State1#{log => Log2}, - Effects0), + Effects1), {NextState, State, [{next_event, {ra_log_event, flush_cache}} | Effects]}; {error, wal_down} -> {await_condition, State1#{log => Log1, condition => fun wal_down_condition/2}, - Effects0}; + Effects1}; {error, _} = Err -> exit(Err) end @@ -1248,8 +1257,9 @@ handle_follower(force_member_change, Cluster = #{Id => new_peer()}, ?WARN("~ts: Forcing cluster change. New cluster ~w", [LogId, Cluster]), - {ok, _, _, State} = append_cluster_change(Cluster, undefined, no_reply, State0), - call_for_election(pre_vote, State, [{reply, ok}]); + {ok, _, _, State, Effects} = + append_cluster_change(Cluster, undefined, no_reply, State0, []), + call_for_election(pre_vote, State, [{reply, ok} | Effects]); handle_follower(Msg, State) -> log_unhandled_msg(follower, Msg, State), {follower, State, []}. @@ -1639,6 +1649,8 @@ filter_follower_effects(Effects) -> [C | Acc]; (garbage_collection = C, Acc) -> [C | Acc]; + (update_leaderboard = C, Acc) -> + [C | Acc]; ({delete_snapshot, _} = C, Acc) -> [C | Acc]; ({send_msg, _, _, _Opts} = C, Acc) -> @@ -2495,65 +2507,67 @@ add_reply(_, _, _, % From, Reply, Mode {Effects, Notifys}. append_log_leader({CmdTag, _, _, _}, - State = #{cluster_change_permitted := false}) + #{cluster_change_permitted := false} = State, + Effects) when CmdTag == '$ra_join' orelse CmdTag == '$ra_leave' -> - {not_appended, cluster_change_not_permitted, State}; -append_log_leader({'$ra_join', From, - #{id := JoiningNode, voter_status := Voter0}, - ReplyMode}, - State = #{cluster := OldCluster}) -> + {not_appended, cluster_change_not_permitted, State, Effects}; +append_log_leader({'$ra_join', From, #{id := JoiningNode, + voter_status := Voter0}, ReplyMode}, + #{cluster := OldCluster} = State, Effects) -> case ensure_promotion_target(Voter0, State) of {error, Reason} -> {not_appended, Reason, State}; {ok, Voter} -> case OldCluster of #{JoiningNode := #{voter_status := Voter}} -> - already_member(State); + already_member(State, Effects); #{JoiningNode := Peer} -> % Update member status. Cluster = OldCluster#{JoiningNode => Peer#{voter_status => Voter}}, - append_cluster_change(Cluster, From, ReplyMode, State); + append_cluster_change(Cluster, From, ReplyMode, State, Effects); _ -> % Insert new member. Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter_status => Voter})}, - append_cluster_change(Cluster, From, ReplyMode, State) + append_cluster_change(Cluster, From, ReplyMode, State, Effects) end end; append_log_leader({'$ra_join', From, #{id := JoiningNode} = Config, ReplyMode}, - State) -> + State, Effects) -> append_log_leader({'$ra_join', From, #{id => JoiningNode, voter_status => maps:with([membership, uid, target], Config)}, - ReplyMode}, State); + ReplyMode}, State, Effects); append_log_leader({'$ra_join', From, JoiningNode, ReplyMode}, - State = #{cluster := OldCluster}) -> + #{cluster := OldCluster} = State, + Effects) -> % Legacy $ra_join, join as voter if no such member in the cluster. case OldCluster of #{JoiningNode := _} -> - already_member(State); + already_member(State, Effects); _ -> - append_log_leader({'$ra_join', From, #{id => JoiningNode}, ReplyMode}, State) + append_log_leader({'$ra_join', From, #{id => JoiningNode}, ReplyMode}, + State, Effects) end; append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode}, - State = #{cfg := #cfg{log_id = LogId}, - cluster := OldCluster}) -> + #{cfg := #cfg{log_id = LogId}, + cluster := OldCluster} = State, Effects) -> case OldCluster of #{LeavingServer := _} -> Cluster = maps:remove(LeavingServer, OldCluster), - append_cluster_change(Cluster, From, ReplyMode, State); + append_cluster_change(Cluster, From, ReplyMode, State, Effects); _ -> ?DEBUG("~ts: member ~w requested to leave but was not a member. " "Members: ~w", [LogId, LeavingServer, maps:keys(OldCluster)]), % not a member - do nothing - {not_appended, not_member, State} + {not_appended, not_member, State, Effects} end; -append_log_leader(Cmd, State = #{log := Log0, current_term := Term}) -> +append_log_leader(Cmd, State = #{log := Log0, current_term := Term}, Effects) -> NextIdx = ra_log:next_index(Log0), Log = ra_log:append({NextIdx, Term, Cmd}, Log0), - {ok, NextIdx, Term, State#{log => Log}}. + {ok, NextIdx, Term, State#{log => Log}, Effects}. pre_append_log_follower({Idx, Term, Cmd} = Entry, State = #{cluster_index_term := {Idx, CITTerm}}) @@ -2582,10 +2596,11 @@ pre_append_log_follower(_, State) -> State. append_cluster_change(Cluster, From, ReplyMode, - State = #{log := Log0, - cluster := PrevCluster, - cluster_index_term := {PrevCITIdx, PrevCITTerm}, - current_term := Term}) -> + #{log := Log0, + cluster := PrevCluster, + cluster_index_term := {PrevCITIdx, PrevCITTerm}, + current_term := Term} = State, + Effects0) -> % turn join command into a generic cluster change command % that include the new cluster configuration Command = {'$ra_cluster_change', From, Cluster, ReplyMode}, @@ -2594,12 +2609,14 @@ append_cluster_change(Cluster, From, ReplyMode, % TODO: is it safe to do change the cluster config with an async write? % what happens if the write fails? Log = ra_log:append({NextIdx, Term, Command}, Log0), + Effects = [update_leaderboard | Effects0], {ok, NextIdx, Term, State#{log => Log, cluster => Cluster, cluster_change_permitted => false, cluster_index_term => IdxTerm, - previous_cluster => {PrevCITIdx, PrevCITTerm, PrevCluster}}}. + previous_cluster => {PrevCITIdx, PrevCITTerm, PrevCluster}}, + Effects}. mismatch_append_entries_reply(Term, CommitIndex, State0) -> {CITerm, State} = fetch_term(CommitIndex, State0), @@ -2891,10 +2908,10 @@ meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) -> meta_name(#{names := #{log_meta := Name}}) -> Name. -already_member(State) -> +already_member(State, Effects) -> % already a member do nothing % TODO: reply? If we don't reply the caller may block until timeout - {not_appended, already_member, State}. + {not_appended, already_member, State, Effects}. %%% ==================== %%% Voter status helpers @@ -2909,7 +2926,7 @@ ensure_promotion_target(#{membership := promotable, uid := _} = Status, #{log := Log}) -> %% The next index in the log is used by for a cluster change command: %% the caller of `ensure_promotion_target/2' also calls - %% `append_cluster_change/4'. So even if a peer joins a cluster which isn't + %% `append_cluster_change/5'. So even if a peer joins a cluster which isn't %% handling any other commands, this promotion target will be reachable. Target = ra_log:next_index(Log), {ok, Status#{target => Target}}; diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 7b20b154..cbaa97b3 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -980,10 +980,8 @@ terminate(Reason, StateName, Parent = ra_directory:where_is_parent(Names, UId), case Reason of {shutdown, delete} -> - catch ra_leaderboard:clear(ClusterName), catch ra_directory:unregister_name(Names, UId), catch ra_log_meta:delete_sync(MetaName, UId), - catch ets:delete(ra_state, UId), catch ra_counters:delete(Id), Self = self(), %% we have to terminate the child spec from the supervisor as it @@ -1004,6 +1002,7 @@ terminate(Reason, StateName, _ -> ok end, + catch ra_leaderboard:clear(ClusterName), _ = ets:delete(ra_metrics, MetricsKey), _ = ets:delete(ra_state, Key), ok; @@ -1340,6 +1339,9 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) -> true = erlang:garbage_collect(), incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1), {State, Actions}; +handle_effect(_, update_leaderboard, _EvtType, State, Actions) -> + ok = record_leader_change(leader_id(State), State), + {State, Actions}; handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _, #state{monitors = Monitors} = State, Actions0) -> {State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)}, diff --git a/src/ra_server_sup_sup.erl b/src/ra_server_sup_sup.erl index 1b759b9d..c96f152e 100644 --- a/src/ra_server_sup_sup.erl +++ b/src/ra_server_sup_sup.erl @@ -130,12 +130,12 @@ prepare_server_stop_rpc(System, RaName) -> {ok, Parent, SrvSup} end. --spec delete_server(atom(), NodeId :: ra_server_id()) -> +-spec delete_server(atom(), ServerId :: ra_server_id()) -> ok | {error, term()} | {badrpc, term()}. -delete_server(System, NodeId) when is_atom(System) -> - Node = ra_lib:ra_server_id_node(NodeId), - Name = ra_lib:ra_server_id_to_local_name(NodeId), - case stop_server(System, NodeId) of +delete_server(System, ServerId) when is_atom(System) -> + Node = ra_lib:ra_server_id_node(ServerId), + Name = ra_lib:ra_server_id_to_local_name(ServerId), + case stop_server(System, ServerId) of ok -> rpc:call(Node, ?MODULE, delete_server_rpc, [System, Name]); {error, _} = Err -> Err @@ -151,6 +151,7 @@ delete_server_rpc(System, RaName) -> ?INFO("Deleting server ~w and its data directory.~n", [RaName]), %% TODO: better handle and report errors + %% UId could be `undefined' here UId = ra_directory:uid_of(Names, RaName), Pid = ra_directory:where_is(Names, RaName), ra_log_meta:delete(Meta, UId), diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index fa1a7445..7756460b 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -41,6 +41,7 @@ all_tests() -> delete_three_server_cluster_parallel, start_cluster_majority, start_cluster_minority, + grow_cluster, send_local_msg, local_log_effect, leaderboard, @@ -295,6 +296,68 @@ start_cluster_minority(Config) -> [ok = slave:stop(S) || {_, S} <- NodeIds0], ok. +grow_cluster(Config) -> + PrivDir = ?config(data_dir, Config), + ClusterName = ?config(cluster_name, Config), + [{_, ANode} = A, + {_, BNode} = B, + {_, CNode} = C] = + ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + Machine = {module, ?MODULE, #{}}, + {ok, [A], []} = ra:start_cluster(?SYS, ClusterName, Machine, [A]), + + ok = ra:start_server(?SYS, ClusterName, B, Machine, [A]), + {ok, _, _} = ra:add_member(A, B), + {ok, _, _} = ra:process_command(A, banana), + [A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), + [A, B] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]), + + ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]), + {ok, _, _} = ra:add_member(A, C), + {ok, _, _} = ra:process_command(A, banana), + {ok, _, L1} = ra:members(A), + [A, B, C] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), + L1 = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]), + %% TODO: handle race conditions + await_condition( + fun () -> + [A, B, C] == rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]) andalso + L1 == rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]) + end, 20), + await_condition( + fun () -> + [A, B, C] == rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]) andalso + L1 == rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]) + end, 20), + + ok = ra:leave_and_delete_server(?SYS, A, A), + {ok, _, _} = ra:process_command(B, banana), + {ok, _, L2} = ra:members(B), + + %% check members + [B, C] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]), + [B, C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]), + undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), + %% check leader + L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]), + L2 = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]), + undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]), + + + ok = ra:leave_and_delete_server(?SYS, B, B), + {ok, _, _} = ra:process_command(C, banana), + %% check members + [C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]), + undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), + undefined = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]), + %% check leader + C = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]), + undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]), + undefined = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]), + + [ok = slave:stop(S) || {_, S} <- ServerIds], + ok. + send_local_msg(Config) -> PrivDir = ?config(data_dir, Config), ClusterName = ?config(cluster_name, Config), diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index ae080eb1..ccb2e7c3 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -876,8 +876,8 @@ append_entries_reply_success_promotes_nonvoter(_Config) -> entries = [{4, 5, {'$ra_cluster_change', _, #{N2 := #{voter_status := #{membership := voter, uid := <<"uid">>}}}, - _}}]}} - ]} = ra_server:handle_leader(RaJoin, State2), + _}}]}} | + _]} = ra_server:handle_leader(RaJoin, State2), Ack2 = #append_entries_reply{term = 5, success = true, next_index = 5,