diff --git a/src/wa_raft_server.erl b/src/wa_raft_server.erl index d83025a..efdf7d0 100644 --- a/src/wa_raft_server.erl +++ b/src/wa_raft_server.erl @@ -78,6 +78,27 @@ parse_rpc/2 ]). +%%------------------------------------------------------------------------------ +%% RAFT Server - Internal APIs - Commands +%%------------------------------------------------------------------------------ + +-export([ + commit/2, + read/2, + snapshot_available/3, + adjust_membership/3, + adjust_membership/4, + promote/2, + promote/3, + promote/4, + resign/1, + handover/1, + handover/2, + handover_candidates/1, + disable/2, + enable/1 +]). + %%------------------------------------------------------------------------------ %% RAFT Server - State Machine Implementation %%------------------------------------------------------------------------------ @@ -99,30 +120,6 @@ witness/3 ]). -%%------------------------------------------------------------------------------ -%% RAFT Server - Internal APIs - Commands -%%------------------------------------------------------------------------------ - --export([ - commit/2, - read/2 -]). - --export([ - snapshot_available/3, - promote/2, - promote/3, - promote/4, - resign/1, - adjust_membership/3, - adjust_membership/4, - handover/1, - handover/2, - handover_candidates/1, - disable/2, - enable/1 -]). - %%------------------------------------------------------------------------------ %% RAFT Server - Test Exports %%------------------------------------------------------------------------------ @@ -342,20 +339,6 @@ maybe_upgrade_config(_Config) -> % All valid configurations will contain at least their own version. error(badarg). -%%------------------------------------------------------------------------------ -%% RAFT Server - Internal APIs - Commands -%%------------------------------------------------------------------------------ - -%% Commit an op to the consensus group. --spec commit(Pid :: gen_statem:server_ref(), Op :: wa_raft_acceptor:op()) -> ok | wa_raft:error(). -commit(Pid, Op) -> - gen_statem:cast(Pid, ?COMMIT_COMMAND(Op)). - -%% Strongly-consistent read --spec read(Pid :: gen_statem:server_ref(), Op :: wa_raft_acceptor:op()) -> ok | wa_raft:error(). -read(Pid, Op) -> - gen_statem:cast(Pid, ?READ_COMMAND(Op)). - %%------------------------------------------------------------------------------ %% RAFT Server - Public APIs %%------------------------------------------------------------------------------ @@ -391,73 +374,8 @@ membership(Service) -> Config -> get_config_members(Config) end. -% An API that uses storage timeout since it interacts with storage layer directly --spec snapshot_available(Pid :: gen_statem:server_ref(), Root :: string(), Pos :: wa_raft_log:log_pos()) -> ok | wa_raft:error(). -snapshot_available(Pid, Root, Pos) -> - gen_statem:call(Pid, ?SNAPSHOT_AVAILABLE_COMMAND(Root, Pos), ?RAFT_STORAGE_CALL_TIMEOUT()). - -%% TODO(hsun324): Update promote to enable setting a RAFT cluster membership -%% in order to be able to bootstrap new RAFT clusters. --spec promote(Pid :: gen_statem:server_ref(), Term :: pos_integer()) -> ok | wa_raft:error(). -promote(Pid, Term) -> - promote(Pid, Term, false). - --spec promote(Pid :: gen_statem:server_ref(), Term :: pos_integer(), Force :: boolean()) -> ok | wa_raft:error(). -promote(Pid, Term, Force) -> - promote(Pid, Term, Force, undefined). - --spec promote(Pid :: gen_statem:server_ref(), Term :: pos_integer(), Force :: boolean(), Config :: undefined | config()) -> ok | wa_raft:error(). -promote(Pid, Term, Force, Config) -> - gen_statem:call(Pid, ?PROMOTE_COMMAND(Term, Force, Config), ?RAFT_RPC_CALL_TIMEOUT()). - --spec resign(Pid :: gen_statem:server_ref()) -> ok | wa_raft:error(). -resign(Pid) -> - gen_statem:call(Pid, ?RESIGN_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()). - --spec adjust_membership( - Name :: gen_statem:server_ref(), - Action :: add | remove | add_witness | remove_witness, - Peer :: peer() -) -> - {ok, Pos :: wa_raft_log:log_pos()} | wa_raft:error(). -adjust_membership(Name, Action, Peer) -> - adjust_membership(Name, Action, Peer, undefined). - --spec adjust_membership( - Name :: gen_statem:server_ref(), - Action :: add | remove | add_witness | remove_witness, - Peer :: peer(), - ConfigIndex :: wa_raft_log:log_index() | undefined -) -> {ok, Pos :: wa_raft_log:log_pos()} | wa_raft:error(). -adjust_membership(Name, Action, Peer, ConfigIndex) -> - gen_statem:call(Name, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer, ConfigIndex), ?RAFT_RPC_CALL_TIMEOUT()). - --spec handover_candidates(Name :: gen_statem:server_ref()) -> {ok, Candidates :: [node()]} | wa_raft:error(). -handover_candidates(Name) -> - gen_statem:call(Name, ?HANDOVER_CANDIDATES_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()). - -%% Instruct a RAFT leader to attempt a handover to a random handover candidate. --spec handover(Name :: gen_statem:server_ref()) -> ok. -handover(Name) -> - gen_statem:cast(Name, ?HANDOVER_COMMAND(undefined)). - -%% Instruct a RAFT leader to attempt a handover to the specified peer node. -%% If an `undefined` peer node is specified, then handover to a random handover candidate. -%% Returns which peer node the handover was sent to or otherwise an error. --spec handover(Name :: gen_statem:server_ref(), Peer :: node() | undefined) -> {ok, Peer :: node()} | wa_raft:error(). -handover(Name, Peer) -> - gen_statem:call(Name, ?HANDOVER_COMMAND(Peer), ?RAFT_RPC_CALL_TIMEOUT()). - --spec disable(Name :: gen_statem:server_ref(), Reason :: term()) -> ok | {error, ErrorReason :: atom()}. -disable(Name, Reason) -> - gen_statem:cast(Name, ?DISABLE_COMMAND(Reason)). - --spec enable(Name :: gen_statem:server_ref()) -> ok | {error, ErrorReason :: atom()}. -enable(Name) -> - gen_statem:call(Name, ?ENABLE_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()). - %%------------------------------------------------------------------------------ -%% Internal API +%% RAFT Server - Internal APIs - Local Options %%------------------------------------------------------------------------------ %% Get the default name for the RAFT server associated with the provided @@ -475,6 +393,10 @@ registered_name(Table, Partition) -> Options -> Options#raft_options.server_name end. +%%------------------------------------------------------------------------------ +%% RAFT Server - Internal APIs - RPC Handling +%%------------------------------------------------------------------------------ + -spec make_rpc(Self :: #raft_identity{}, Term :: wa_raft_log:log_term(), Procedure :: normalized_procedure()) -> rpc(). make_rpc(#raft_identity{name = Name, node = Node}, Term, ?PROCEDURE(Procedure, Payload)) -> % For compatibility with legacy versions that expect RPCs sent with no arguments to have payload 'undefined' instead of {}. @@ -496,10 +418,108 @@ parse_rpc(#raft_identity{name = Name} = Self, ?LEGACY_RAFT_RPC(Procedure, Term, parse_rpc(Self, ?RAFT_NAMED_RPC(Procedure, Term, Name, SenderId, Payload)). %%------------------------------------------------------------------------------ -%% RAFT Server - State Machine Implementation +%% RAFT Server - Internal APIs - Commands +%%------------------------------------------------------------------------------ + +-spec commit( + Server :: gen_statem:server_ref(), + Op :: wa_raft_acceptor:op() +) -> ok | wa_raft:error(). +commit(Server, Op) -> + gen_statem:cast(Server, ?COMMIT_COMMAND(Op)). + +-spec read( + Server :: gen_statem:server_ref(), + Op :: wa_raft_acceptor:read_op() +) -> ok | wa_raft:error(). +read(Server, Op) -> + gen_statem:cast(Server, ?READ_COMMAND(Op)). + +-spec snapshot_available( + Server :: gen_statem:server_ref(), + Root :: file:filename(), + Position :: wa_raft_log:log_pos() +) -> ok | wa_raft:error(). +snapshot_available(Server, Root, Position) -> + % Use the storage call timeout because this command requires the RAFT + % server to make a potentially expensive call against the RAFT storage + % server to complete. + gen_statem:call(Server, ?SNAPSHOT_AVAILABLE_COMMAND(Root, Position), ?RAFT_STORAGE_CALL_TIMEOUT()). + +-spec adjust_membership( + Server :: gen_statem:server_ref(), + Action :: add | remove | add_witness | remove_witness, + Peer :: peer() +) -> + {ok, Pos :: wa_raft_log:log_pos()} | wa_raft:error(). +adjust_membership(Server, Action, Peer) -> + adjust_membership(Server, Action, Peer, undefined). + +-spec adjust_membership( + Server :: gen_statem:server_ref(), + Action :: add | remove | add_witness | remove_witness, + Peer :: peer(), + ConfigIndex :: wa_raft_log:log_index() | undefined +) -> {ok, Pos :: wa_raft_log:log_pos()} | wa_raft:error(). +adjust_membership(Server, Action, Peer, ConfigIndex) -> + gen_statem:call(Server, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer, ConfigIndex), ?RAFT_RPC_CALL_TIMEOUT()). + +-spec promote( + Server :: gen_statem:server_ref(), + Term :: wa_raft_log:log_term() +) -> ok | wa_raft:error(). +promote(Server, Term) -> + promote(Server, Term, false). + +-spec promote( + Server :: gen_statem:server_ref(), + Term :: wa_raft_log:log_term(), + Force :: boolean() +) -> ok | wa_raft:error(). +promote(Server, Term, Force) -> + promote(Server, Term, Force, undefined). + +-spec promote( + Server :: gen_statem:server_ref(), + Term :: wa_raft_log:log_term(), + Force :: boolean(), + Config :: config() | config_all() | undefined +) -> ok | wa_raft:error(). +promote(Server, Term, Force, Config) -> + gen_statem:call(Server, ?PROMOTE_COMMAND(Term, Force, Config), ?RAFT_RPC_CALL_TIMEOUT()). + +-spec resign(Server :: gen_statem:server_ref()) -> ok | wa_raft:error(). +resign(Server) -> + gen_statem:call(Server, ?RESIGN_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()). + +%% Instruct a RAFT leader to attempt a handover to a random handover candidate. +-spec handover(Server :: gen_statem:server_ref()) -> ok. +handover(Server) -> + gen_statem:cast(Server, ?HANDOVER_COMMAND(undefined)). + +%% Instruct a RAFT leader to attempt a handover to the specified peer node. +%% If an `undefined` peer node is specified, then handover to a random handover candidate. +%% Returns which peer node the handover was sent to or otherwise an error. +-spec handover(Server :: gen_statem:server_ref(), Peer :: node() | undefined) -> {ok, Peer :: node()} | wa_raft:error(). +handover(Server, Peer) -> + gen_statem:call(Server, ?HANDOVER_COMMAND(Peer), ?RAFT_RPC_CALL_TIMEOUT()). + +-spec handover_candidates(Server :: gen_statem:server_ref()) -> {ok, Candidates :: [node()]} | wa_raft:error(). +handover_candidates(Server) -> + gen_statem:call(Server, ?HANDOVER_CANDIDATES_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()). + +-spec disable(Server :: gen_statem:server_ref(), Reason :: term()) -> ok | {error, ErrorReason :: atom()}. +disable(Server, Reason) -> + gen_statem:cast(Server, ?DISABLE_COMMAND(Reason)). + +-spec enable(Server :: gen_statem:server_ref()) -> ok | {error, ErrorReason :: atom()}. +enable(Server) -> + gen_statem:call(Server, ?ENABLE_COMMAND, ?RAFT_RPC_CALL_TIMEOUT()). + +%%------------------------------------------------------------------------------ +%% RAFT Server - State Machine Implementation - General Callbacks %%------------------------------------------------------------------------------ -%% gen_statem callbacks -spec init(Options :: #raft_options{}) -> gen_statem:init_result(state()). init(#raft_options{application = Application, table = Table, partition = Partition, witness = Witness, self = Self, identifier = Identifier, database = DataDir, distribution_module = DistributionModule, log_name = Log, log_catchup_name = Catchup, server_name = Name, storage_name = Storage} = Options) -> @@ -555,8 +575,17 @@ init(#raft_options{application = Application, table = Table, partition = Partiti callback_mode() -> [state_functions, state_enter]. +-spec terminate(Reason :: term(), State :: state(), Data :: #raft_state{}) -> ok. +terminate(Reason, State, #raft_state{name = Name, table = Table, partition = Partition, current_term = CurrentTerm} = Data) -> + ?LOG_NOTICE("Server[~0p, term ~0p, ~0p] terminating due to ~p.", + [Name, CurrentTerm, State, Reason], #{domain => [whatsapp, wa_raft]}), + wa_raft_durable_state:sync(Data), + wa_raft_info:delete_state(Table, Partition), + wa_raft_info:set_stale(Table, Partition, true), + ok. + %%------------------------------------------------------------------------------ -%% Procedure Call Marshalling +%% RAFT Server - State Machine Implementation - Procedure Call Marshalling %%------------------------------------------------------------------------------ %% A macro that destructures the identity record indicating that the @@ -678,12 +707,23 @@ defaultize_payload(Defaults, Payload, N, M) when N < M -> defaultize_payload(Defaults, erlang:delete_element(M, Payload), N, M - 1). %%------------------------------------------------------------------------------ -%% RAFT Server - State Machien Implementation - State-specific Callbacks +%% RAFT Server - State Machine Implementation - Stalled State +%%------------------------------------------------------------------------------ +%% The stalled state is an extension to the RAFT protocol designed to handle +%% situations in which a replica of the FSM is lost or replaced within a RAFT +%% cluster without being removed from the cluster membership. As the replica of +%% the FSM stored before the machine was lost or replaced could have been a +%% critical member of a quorum, it is important to ensure that the replacement +%% does not support a different result for any quorums before it receives a +%% fresh copy of the FSM state and log that is guaranteed to reflect any +%% quorums that the machine supported before it was lost or replaced. +%% +%% This is acheived by preventing a stalled node from participating in quorum +%% for both log entries and election. A leader of the cluster must provide a +%% fresh copy of its FSM state before the stalled node can return to normal +%% operation. %%------------------------------------------------------------------------------ -%% [RAFT Extension] -%% Stalled - an extension of raft protocol to handle the situation that a node join after being wiped(tupperware preemption). -%% This node should not participate any election. Otherwise it may vote yes to a lagging node and sabotage data integrity. -spec stalled(Type :: enter, PreviousStateName :: state(), Data :: #raft_state{}) -> gen_statem:state_enter_result(state(), #raft_state{}); (Type :: gen_statem:event_type(), Event :: event(), Data :: #raft_state{}) -> gen_statem:event_handler_result(state(), #raft_state{}). stalled(enter, PreviousStateName, #raft_state{name = Name, current_term = CurrentTerm} = State) -> @@ -754,13 +794,17 @@ stalled(Type, Event, #raft_state{name = Name, current_term = CurrentTerm}) -> [Name, CurrentTerm, Type, Event], #{domain => [whatsapp, wa_raft]}), keep_state_and_data. -%% [RAFT Core] -%% Leader - In the RAFT cluster, the leader node is a node agreed upon by the cluster to be -%% responsible for accepting and replicating commits and maintaining the integrity -%% of the underlying state machine. +%%------------------------------------------------------------------------------ +%% RAFT Server - State Machine Implementation - Leader State +%%------------------------------------------------------------------------------ +%% In a RAFT cluster, the leader of a RAFT term is a replica that has received +%% a quorum of votes from the cluster in that RAFT term, establishing it as the +%% unique coordinator for that RAFT term. The leader is responsible for +%% accepting and replicating new log entries to progress the state of the FSM. +%%------------------------------------------------------------------------------ + -spec leader(Type :: enter, PreviousStateName :: state(), Data :: #raft_state{}) -> gen_statem:state_enter_result(state(), #raft_state{}); (Type :: gen_statem:event_type(), Event :: event(), Data :: #raft_state{}) -> gen_statem:event_handler_result(state(), #raft_state{}). -%% [Leader] changing to leader leader(enter, PreviousStateName, #raft_state{name = Name, self = Self, current_term = CurrentTerm, log_view = View0} = State0) -> ?RAFT_COUNT('raft.leader.enter'), ?RAFT_COUNT('raft.leader.elected'), @@ -1206,6 +1250,15 @@ leader(Type, Event, #raft_state{name = Name, current_term = CurrentTerm}) -> [Name, CurrentTerm, Type, Event], #{domain => [whatsapp, wa_raft]}), keep_state_and_data. +%%------------------------------------------------------------------------------ +%% RAFT Server - State Machine Implementation - Follower State +%%------------------------------------------------------------------------------ +%% In a RAFT cluster, a follower is a replica that is receiving replicated log +%% entries from the leader of a RAFT term. The follower participates in quorum +%% decisions about log entries received from the leader by appending those log +%% entries to its own local copy of the RAFT log. +%%------------------------------------------------------------------------------ + -spec follower(Type :: enter, PreviousStateName :: state(), Data :: #raft_state{}) -> gen_statem:state_enter_result(state(), #raft_state{}); (Type :: gen_statem:event_type(), Event :: event(), Data :: #raft_state{}) -> gen_statem:event_handler_result(state(), #raft_state{}). follower(enter, PreviousStateName, #raft_state{name = Name, current_term = CurrentTerm} = State) -> @@ -1359,6 +1412,15 @@ follower(Type, Event, #raft_state{name = Name, current_term = CurrentTerm}) -> [Name, CurrentTerm, Type, Event], #{domain => [whatsapp, wa_raft]}), keep_state_and_data. +%%------------------------------------------------------------------------------ +%% RAFT Server - State Machine Implementation - Candidate State +%%------------------------------------------------------------------------------ +%% In a RAFT cluster, a candidate is a replica that is attempting to become the +%% leader of a RAFT term. It is waiting for responses from the other members of +%% the RAFT cluster to determine if it has received enough votes to assume the +%% leadership of the RAFT term. +%%------------------------------------------------------------------------------ + -spec candidate(Type :: enter, PreviousStateName :: state(), Data :: #raft_state{}) -> gen_statem:state_enter_result(state(), #raft_state{}); (Type :: gen_statem:event_type(), Event :: event(), Data :: #raft_state{}) -> gen_statem:event_handler_result(state(), #raft_state{}). %% [Enter] Node starts a new election upon entering the candidate state. @@ -1482,6 +1544,18 @@ candidate(Type, Event, #raft_state{name = Name, current_term = CurrentTerm}) -> [Name, CurrentTerm, Type, Event], #{domain => [whatsapp, wa_raft]}), keep_state_and_data. +%%------------------------------------------------------------------------------ +%% RAFT Server - State Machine Implementation - Disabled State +%%------------------------------------------------------------------------------ +%% The disabled state is an extension to the RAFT protocol used to hold any +%% replicas of an FSM that have for some reason or another identified that some +%% deficiency or malfunction that makes them unfit to either enforce any prior +%% quorum decisions or properly participate in future quorum decisions. Common +%% reasons include the detection of corruptions or inconsistencies within the +%% FSM state or RAFT log. The reason for which the replica was disabled is kept +%% in persistent so that the replica will remain disabled even when restarted. +%%------------------------------------------------------------------------------ + -spec disabled(Type :: enter, PreviousStateName :: state(), Data :: #raft_state{}) -> gen_statem:state_enter_result(state(), #raft_state{}); (Type :: gen_statem:event_type(), Event :: event(), Data :: #raft_state{}) -> gen_statem:event_handler_result(state(), #raft_state{}). disabled(enter, PreviousStateName, #raft_state{name = Name, current_term = CurrentTerm, disable_reason = DisableReason} = State0) -> @@ -1538,7 +1612,19 @@ disabled(Type, Event, #raft_state{name = Name, current_term = CurrentTerm}) -> [Name, CurrentTerm, Type, Event], #{domain => [whatsapp, wa_raft]}), keep_state_and_data. -%% [Witness] State functions +%%------------------------------------------------------------------------------ +%% RAFT Server - State Machine Implementation - Witness State +%%------------------------------------------------------------------------------ +%% The witness state is an extension to the RAFT protocol that identifies a +%% replica as a special "witness replica" that participates in quorum decisions +%% but does not retain a full copy of the actual underlying FSM. These replicas +%% can use significantly fewer system resources to operate however it is not +%% recommended for more than 25% of the replicas in a RAFT cluster to be +%% witness replicas as having more than such a number of witness replicas can +%% result in significantly reduced chance of data durability in the face of +%% unexpected replica loss. +%%------------------------------------------------------------------------------ + -spec witness(Type :: enter, PreviousStateName :: state(), Data :: #raft_state{}) -> gen_statem:state_enter_result(state(), #raft_state{}); (Type :: gen_statem:event_type(), Event :: event(), Data :: #raft_state{}) -> gen_statem:event_handler_result(state(), #raft_state{}). witness(enter, PreviousStateName, #raft_state{name = Name, current_term = CurrentTerm} = State) -> @@ -1664,17 +1750,13 @@ witness(Type, Event, #raft_state{name = Name, current_term = CurrentTerm}) -> [Name, CurrentTerm, Type, Event], #{domain => [whatsapp, wa_raft]}), keep_state_and_data. --spec terminate(Reason :: term(), State :: state(), Data :: #raft_state{}) -> ok. -terminate(Reason, State, #raft_state{name = Name, table = Table, partition = Partition, current_term = CurrentTerm} = Data) -> - ?LOG_NOTICE("Server[~0p, term ~0p, ~0p] terminating due to ~p.", - [Name, CurrentTerm, State, Reason], #{domain => [whatsapp, wa_raft]}), - wa_raft_durable_state:sync(Data), - wa_raft_info:delete_state(Table, Partition), - wa_raft_info:set_stale(Table, Partition, true), - ok. +%%------------------------------------------------------------------------------ +%% RAFT Server - State Machine Implementation - Command Handlers +%%------------------------------------------------------------------------------ +%% Fallbacks for command calls to the RAFT server for when there is no special +%% handling for a command defined within the state-specific callback itself. +%%------------------------------------------------------------------------------ -%% Fallbacks for command calls to the RAFT server for when there is no state-specific -%% handler for a particular command defined for a particular RAFT server FSM state. -spec command(state(), gen_statem:event_type(), command(), #raft_state{}) -> gen_statem:event_handler_result(state(), #raft_state{}). %% [Commit] Non-leader nodes should fail commits with {error, not_leader}. command(StateName, cast, ?COMMIT_COMMAND({Key, _}), @@ -1860,7 +1942,7 @@ command(StateName, Type, Event, #raft_state{name = Name, current_term = CurrentT keep_state_and_data. %%------------------------------------------------------------------------------ -%% RAFT Configuration Change Helpers +%% RAFT Server - State Machine Implementation - Cluster Configuration Helpers %%------------------------------------------------------------------------------ %% Determine if the specified peer is a member of the current cluster configuration, @@ -1956,9 +2038,10 @@ maybe_update_config(Index, _Term, {_Ref, {config, Config}}, #raft_state{table = maybe_update_config(_Index, _Term, _Op, State) -> State. -%% -%% Private functions -%% +%%------------------------------------------------------------------------------ +%% RAFT Server - State Machine Implementation - Private Functions +%%------------------------------------------------------------------------------ + -spec random_election_timeout(#raft_state{}) -> non_neg_integer(). random_election_timeout(#raft_state{application = App}) -> Max = ?RAFT_ELECTION_TIMEOUT_MAX(App), @@ -2140,7 +2223,7 @@ append_entries_to_followers(#raft_state{name = Name, table = Table, partition = end, State1, config_identities(config(State1))). %%------------------------------------------------------------------------------ -%% RAFT Server State Management +%% RAFT Server - State Machine Implementation - State Management %%------------------------------------------------------------------------------ -spec set_leader(StateName :: state(), Leader :: #raft_identity{}, State :: #raft_state{}) -> #raft_state{}. @@ -2220,7 +2303,7 @@ advance_term(StateName, NewerTerm, VotedFor, #raft_state{current_term = CurrentT State2. %%------------------------------------------------------------------------------ -%% RAFT Leader Functionality +%% RAFT Server - State Machine Implementation - Leader Methods %%------------------------------------------------------------------------------ -spec heartbeat(#raft_identity{}, #raft_state{}) -> #raft_state{}. @@ -2330,14 +2413,14 @@ adjust_config(Action, Config, #raft_state{self = Self}) -> end end. + %%------------------------------------------------------------------------------ -%% [AppendEntries] Logic for handling heartbeats from leaders (5.3) +%% RAFT Server - State Machine Implementation - Follower Methods %%------------------------------------------------------------------------------ + %% Attempt to append the log entries declared by a leader in a heartbeat, %% apply committed but not yet applied log entries, trim the log, and reset %% the election timeout timer as necessary. -%%------------------------------------------------------------------------------ - -spec handle_heartbeat(State :: state(), Event :: gen_statem:event_type(), Leader :: #raft_identity{}, PrevLogIndex :: wa_raft_log:log_index(), PrevLogTerm :: wa_raft_log:log_term(), Entries :: [wa_raft_log:log_entry()], CommitIndex :: wa_raft_log:log_index(), TrimIndex :: wa_raft_log:log_index(), Data :: #raft_state{}) -> gen_statem:event_handler_result(state(), #raft_state{}). handle_heartbeat(State, Event, Leader, PrevLogIndex, PrevLogTerm, Entries, CommitIndex, TrimIndex, #raft_state{application = App, name = Name, current_term = CurrentTerm, log_view = View} = Data0) -> @@ -2367,17 +2450,12 @@ handle_heartbeat(State, Event, Leader, PrevLogIndex, PrevLogTerm, Entries, Commi {next_state, disabled, Data0#raft_state{disable_reason = Reason}} end. -%%------------------------------------------------------------------------------ -%% [AppendEntries] Logic for log append during heartbeat and handover (5.3) -%%------------------------------------------------------------------------------ %% Append the provided range of the log entries to the local log only if the %% term of the previous log matches the term stored by the local log, %% otherwise, truncate the log if the term does not match or do nothing if %% the previous log entry is not available locally. If an unrecoverable error %% is encountered, returns a diagnostic that can be used as a reason to %% disable the current replica. -%%------------------------------------------------------------------------------ - -spec append_entries(State :: state(), PrevLogIndex :: wa_raft_log:log_index(), PrevLogTerm :: wa_raft_log:log_term(), Entries :: [wa_raft_log:log_entry()], EntryCount :: non_neg_integer(), Data :: #raft_state{}) -> {ok, Accepted :: boolean(), NewLastIndex :: wa_raft_log:log_index(), NewData :: #raft_state{}} | {fatal, Reason :: term()}. append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, #raft_state{name = Name, log_view = View, last_applied = LastApplied, current_term = CurrentTerm, leader_id = LeaderId} = Data) -> @@ -2429,6 +2507,10 @@ append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, #raft_stat {ok, false, wa_raft_log:last_index(View), Data} end. +%%------------------------------------------------------------------------------ +%% RAFT Server - State Machine Implementation - Helpers +%%------------------------------------------------------------------------------ + %% Generic reply function that operates based on event type. -spec reply(Type :: enter | gen_statem:event_type(), Message :: term()) -> ok | wa_raft:error(). reply(cast, _Message) ->