diff --git a/src/ra_server.erl b/src/ra_server.erl index 02e1a004..62e16d0a 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -503,19 +503,19 @@ handle_leader({PeerId, #append_entries_reply{success = false, {leader, State, Effects}; handle_leader({command, Cmd}, #{cfg := #cfg{log_id = LogId} = Cfg} = State00) -> ok = incr_counter(Cfg, ?C_RA_SRV_COMMANDS, 1), - case append_log_leader(Cmd, State00) of - {not_appended, Reason, State} -> + case append_log_leader(Cmd, State00, []) of + {not_appended, Reason, State, Effects0} -> ?WARN("~ts command ~W NOT appended to log. Reason ~w", [LogId, Cmd, 10, Reason]), Effects = case Cmd of {_, #{from := From}, _, _} -> - [{reply, From, {error, Reason}}]; + [{reply, From, {error, Reason}} | Effects0]; _ -> - [] + Effects0 end, {leader, State, Effects}; - {ok, Idx, Term, State0} -> - {State, _, Effects0} = make_pipelined_rpc_effects(State0, []), + {ok, Idx, Term, State0, Effects00} -> + {State, _, Effects0} = make_pipelined_rpc_effects(State0, Effects00), % check if a reply is required. % TODO: refactor - can this be made a bit nicer/more explicit? Effects = case Cmd of @@ -531,8 +531,8 @@ handle_leader({commands, Cmds}, #{cfg := Cfg} = State00) -> %% TODO: refactor to use wal batch API? Num = length(Cmds), {State0, Effects0} = - lists:foldl(fun(C, {S0, E}) -> - {ok, I, T, S} = append_log_leader(C, S0), + lists:foldl(fun(C, {S0, E0}) -> + {ok, I, T, S, E} = append_log_leader(C, S0, E0), case C of {_, #{from := From}, _, after_log_append} -> {S, [{reply, From, @@ -1023,18 +1023,27 @@ handle_follower(#append_entries_rpc{term = Term, _ -> State1 = lists:foldl(fun pre_append_log_follower/2, State0, Entries), + %% if the cluster has changed we need to update + %% the leaderboard + Effects1 = case maps:get(cluster, State0) =/= + maps:get(cluster, State1) of + true -> + [update_leaderboard | Effects0]; + false -> + Effects0 + end, case ra_log:write(Entries, Log1) of {ok, Log2} -> {NextState, State, Effects} = evaluate_commit_index_follower(State1#{log => Log2}, - Effects0), + Effects1), {NextState, State, [{next_event, {ra_log_event, flush_cache}} | Effects]}; {error, wal_down} -> {await_condition, State1#{log => Log1, condition => fun wal_down_condition/2}, - Effects0}; + Effects1}; {error, _} = Err -> exit(Err) end @@ -1248,8 +1257,9 @@ handle_follower(force_member_change, Cluster = #{Id => new_peer()}, ?WARN("~ts: Forcing cluster change. New cluster ~w", [LogId, Cluster]), - {ok, _, _, State} = append_cluster_change(Cluster, undefined, no_reply, State0), - call_for_election(pre_vote, State, [{reply, ok}]); + {ok, _, _, State, Effects} = + append_cluster_change(Cluster, undefined, no_reply, State0, []), + call_for_election(pre_vote, State, [{reply, ok} | Effects]); handle_follower(Msg, State) -> log_unhandled_msg(follower, Msg, State), {follower, State, []}. @@ -1639,6 +1649,8 @@ filter_follower_effects(Effects) -> [C | Acc]; (garbage_collection = C, Acc) -> [C | Acc]; + (update_leaderboard = C, Acc) -> + [C | Acc]; ({delete_snapshot, _} = C, Acc) -> [C | Acc]; ({send_msg, _, _, _Opts} = C, Acc) -> @@ -2495,65 +2507,67 @@ add_reply(_, _, _, % From, Reply, Mode {Effects, Notifys}. append_log_leader({CmdTag, _, _, _}, - State = #{cluster_change_permitted := false}) + #{cluster_change_permitted := false} = State, + Effects) when CmdTag == '$ra_join' orelse CmdTag == '$ra_leave' -> - {not_appended, cluster_change_not_permitted, State}; -append_log_leader({'$ra_join', From, - #{id := JoiningNode, voter_status := Voter0}, - ReplyMode}, - State = #{cluster := OldCluster}) -> + {not_appended, cluster_change_not_permitted, State, Effects}; +append_log_leader({'$ra_join', From, #{id := JoiningNode, + voter_status := Voter0}, ReplyMode}, + #{cluster := OldCluster} = State, Effects) -> case ensure_promotion_target(Voter0, State) of {error, Reason} -> {not_appended, Reason, State}; {ok, Voter} -> case OldCluster of #{JoiningNode := #{voter_status := Voter}} -> - already_member(State); + already_member(State, Effects); #{JoiningNode := Peer} -> % Update member status. Cluster = OldCluster#{JoiningNode => Peer#{voter_status => Voter}}, - append_cluster_change(Cluster, From, ReplyMode, State); + append_cluster_change(Cluster, From, ReplyMode, State, Effects); _ -> % Insert new member. Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter_status => Voter})}, - append_cluster_change(Cluster, From, ReplyMode, State) + append_cluster_change(Cluster, From, ReplyMode, State, Effects) end end; append_log_leader({'$ra_join', From, #{id := JoiningNode} = Config, ReplyMode}, - State) -> + State, Effects) -> append_log_leader({'$ra_join', From, #{id => JoiningNode, voter_status => maps:with([membership, uid, target], Config)}, - ReplyMode}, State); + ReplyMode}, State, Effects); append_log_leader({'$ra_join', From, JoiningNode, ReplyMode}, - State = #{cluster := OldCluster}) -> + #{cluster := OldCluster} = State, + Effects) -> % Legacy $ra_join, join as voter if no such member in the cluster. case OldCluster of #{JoiningNode := _} -> - already_member(State); + already_member(State, Effects); _ -> - append_log_leader({'$ra_join', From, #{id => JoiningNode}, ReplyMode}, State) + append_log_leader({'$ra_join', From, #{id => JoiningNode}, ReplyMode}, + State, Effects) end; append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode}, - State = #{cfg := #cfg{log_id = LogId}, - cluster := OldCluster}) -> + #{cfg := #cfg{log_id = LogId}, + cluster := OldCluster} = State, Effects) -> case OldCluster of #{LeavingServer := _} -> Cluster = maps:remove(LeavingServer, OldCluster), - append_cluster_change(Cluster, From, ReplyMode, State); + append_cluster_change(Cluster, From, ReplyMode, State, Effects); _ -> ?DEBUG("~ts: member ~w requested to leave but was not a member. " "Members: ~w", [LogId, LeavingServer, maps:keys(OldCluster)]), % not a member - do nothing - {not_appended, not_member, State} + {not_appended, not_member, State, Effects} end; -append_log_leader(Cmd, State = #{log := Log0, current_term := Term}) -> +append_log_leader(Cmd, State = #{log := Log0, current_term := Term}, Effects) -> NextIdx = ra_log:next_index(Log0), Log = ra_log:append({NextIdx, Term, Cmd}, Log0), - {ok, NextIdx, Term, State#{log => Log}}. + {ok, NextIdx, Term, State#{log => Log}, Effects}. pre_append_log_follower({Idx, Term, Cmd} = Entry, State = #{cluster_index_term := {Idx, CITTerm}}) @@ -2582,10 +2596,11 @@ pre_append_log_follower(_, State) -> State. append_cluster_change(Cluster, From, ReplyMode, - State = #{log := Log0, - cluster := PrevCluster, - cluster_index_term := {PrevCITIdx, PrevCITTerm}, - current_term := Term}) -> + #{log := Log0, + cluster := PrevCluster, + cluster_index_term := {PrevCITIdx, PrevCITTerm}, + current_term := Term} = State, + Effects0) -> % turn join command into a generic cluster change command % that include the new cluster configuration Command = {'$ra_cluster_change', From, Cluster, ReplyMode}, @@ -2594,12 +2609,14 @@ append_cluster_change(Cluster, From, ReplyMode, % TODO: is it safe to do change the cluster config with an async write? % what happens if the write fails? Log = ra_log:append({NextIdx, Term, Command}, Log0), + Effects = [update_leaderboard | Effects0], {ok, NextIdx, Term, State#{log => Log, cluster => Cluster, cluster_change_permitted => false, cluster_index_term => IdxTerm, - previous_cluster => {PrevCITIdx, PrevCITTerm, PrevCluster}}}. + previous_cluster => {PrevCITIdx, PrevCITTerm, PrevCluster}}, + Effects}. mismatch_append_entries_reply(Term, CommitIndex, State0) -> {CITerm, State} = fetch_term(CommitIndex, State0), @@ -2891,10 +2908,10 @@ meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) -> meta_name(#{names := #{log_meta := Name}}) -> Name. -already_member(State) -> +already_member(State, Effects) -> % already a member do nothing % TODO: reply? If we don't reply the caller may block until timeout - {not_appended, already_member, State}. + {not_appended, already_member, State, Effects}. %%% ==================== %%% Voter status helpers @@ -2909,7 +2926,7 @@ ensure_promotion_target(#{membership := promotable, uid := _} = Status, #{log := Log}) -> %% The next index in the log is used by for a cluster change command: %% the caller of `ensure_promotion_target/2' also calls - %% `append_cluster_change/4'. So even if a peer joins a cluster which isn't + %% `append_cluster_change/5'. So even if a peer joins a cluster which isn't %% handling any other commands, this promotion target will be reachable. Target = ra_log:next_index(Log), {ok, Status#{target => Target}}; diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 7b20b154..cbaa97b3 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -980,10 +980,8 @@ terminate(Reason, StateName, Parent = ra_directory:where_is_parent(Names, UId), case Reason of {shutdown, delete} -> - catch ra_leaderboard:clear(ClusterName), catch ra_directory:unregister_name(Names, UId), catch ra_log_meta:delete_sync(MetaName, UId), - catch ets:delete(ra_state, UId), catch ra_counters:delete(Id), Self = self(), %% we have to terminate the child spec from the supervisor as it @@ -1004,6 +1002,7 @@ terminate(Reason, StateName, _ -> ok end, + catch ra_leaderboard:clear(ClusterName), _ = ets:delete(ra_metrics, MetricsKey), _ = ets:delete(ra_state, Key), ok; @@ -1340,6 +1339,9 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) -> true = erlang:garbage_collect(), incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1), {State, Actions}; +handle_effect(_, update_leaderboard, _EvtType, State, Actions) -> + ok = record_leader_change(leader_id(State), State), + {State, Actions}; handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _, #state{monitors = Monitors} = State, Actions0) -> {State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)}, diff --git a/src/ra_server_sup_sup.erl b/src/ra_server_sup_sup.erl index 1b759b9d..c96f152e 100644 --- a/src/ra_server_sup_sup.erl +++ b/src/ra_server_sup_sup.erl @@ -130,12 +130,12 @@ prepare_server_stop_rpc(System, RaName) -> {ok, Parent, SrvSup} end. --spec delete_server(atom(), NodeId :: ra_server_id()) -> +-spec delete_server(atom(), ServerId :: ra_server_id()) -> ok | {error, term()} | {badrpc, term()}. -delete_server(System, NodeId) when is_atom(System) -> - Node = ra_lib:ra_server_id_node(NodeId), - Name = ra_lib:ra_server_id_to_local_name(NodeId), - case stop_server(System, NodeId) of +delete_server(System, ServerId) when is_atom(System) -> + Node = ra_lib:ra_server_id_node(ServerId), + Name = ra_lib:ra_server_id_to_local_name(ServerId), + case stop_server(System, ServerId) of ok -> rpc:call(Node, ?MODULE, delete_server_rpc, [System, Name]); {error, _} = Err -> Err @@ -151,6 +151,7 @@ delete_server_rpc(System, RaName) -> ?INFO("Deleting server ~w and its data directory.~n", [RaName]), %% TODO: better handle and report errors + %% UId could be `undefined' here UId = ra_directory:uid_of(Names, RaName), Pid = ra_directory:where_is(Names, RaName), ra_log_meta:delete(Meta, UId), diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index fa1a7445..7756460b 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -41,6 +41,7 @@ all_tests() -> delete_three_server_cluster_parallel, start_cluster_majority, start_cluster_minority, + grow_cluster, send_local_msg, local_log_effect, leaderboard, @@ -295,6 +296,68 @@ start_cluster_minority(Config) -> [ok = slave:stop(S) || {_, S} <- NodeIds0], ok. +grow_cluster(Config) -> + PrivDir = ?config(data_dir, Config), + ClusterName = ?config(cluster_name, Config), + [{_, ANode} = A, + {_, BNode} = B, + {_, CNode} = C] = + ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + Machine = {module, ?MODULE, #{}}, + {ok, [A], []} = ra:start_cluster(?SYS, ClusterName, Machine, [A]), + + ok = ra:start_server(?SYS, ClusterName, B, Machine, [A]), + {ok, _, _} = ra:add_member(A, B), + {ok, _, _} = ra:process_command(A, banana), + [A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), + [A, B] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]), + + ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]), + {ok, _, _} = ra:add_member(A, C), + {ok, _, _} = ra:process_command(A, banana), + {ok, _, L1} = ra:members(A), + [A, B, C] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), + L1 = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]), + %% TODO: handle race conditions + await_condition( + fun () -> + [A, B, C] == rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]) andalso + L1 == rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]) + end, 20), + await_condition( + fun () -> + [A, B, C] == rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]) andalso + L1 == rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]) + end, 20), + + ok = ra:leave_and_delete_server(?SYS, A, A), + {ok, _, _} = ra:process_command(B, banana), + {ok, _, L2} = ra:members(B), + + %% check members + [B, C] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]), + [B, C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]), + undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), + %% check leader + L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]), + L2 = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]), + undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]), + + + ok = ra:leave_and_delete_server(?SYS, B, B), + {ok, _, _} = ra:process_command(C, banana), + %% check members + [C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]), + undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), + undefined = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]), + %% check leader + C = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]), + undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]), + undefined = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]), + + [ok = slave:stop(S) || {_, S} <- ServerIds], + ok. + send_local_msg(Config) -> PrivDir = ?config(data_dir, Config), ClusterName = ?config(cluster_name, Config), diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index ae080eb1..ccb2e7c3 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -876,8 +876,8 @@ append_entries_reply_success_promotes_nonvoter(_Config) -> entries = [{4, 5, {'$ra_cluster_change', _, #{N2 := #{voter_status := #{membership := voter, uid := <<"uid">>}}}, - _}}]}} - ]} = ra_server:handle_leader(RaJoin, State2), + _}}]}} | + _]} = ra_server:handle_leader(RaJoin, State2), Ack2 = #append_entries_reply{term = 5, success = true, next_index = 5,