From 10a1f9d1347d67e120e023f7a2e3716dc72f1ad3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 19 Jun 2024 16:29:19 +0200 Subject: [PATCH] Add `wait_for_index` option to leader and local queries [Why] It allows the caller to wait for a specific index to be applied locally (or on the leader) before a query is evaluated. It is useful when the caller wants to be sure that the result of the previous command is "visible" to the next query. By default, it's not guaranteed because the command will be considered successfully applied as long as a quorum of Ra servers applied it. This list of Ra servers may not include the local node for instance. [How] If the `wait_for_index` option is specified with a `{Index, Term}` tuple, the query will be evaluated right away if that index is already applied, or it will be added to a list of pending queries. Pending queries are evaluated after each applied batch of commands by the local node. If a pending query's target index was reached or passed, it is evaluated. If a pending query's target term ended, an error is returned. Here is an example: ra:local_query(ServerId, QueryFun, #{wait_for_index => {Index, Term}}). The `local_query` tuple sent to the Ra server changes format. The old one was: {local_query, QueryFun} The new one is: {local_query, QueryFun, Options} If the remote Ra server that receives the query runs a version of Ra older than the one having this change and thus doesn't understand the new tuple, it will ignore and drop the query. This will lead to a timeout of the query, or an indefinitely hanging call if the timeout was set to `infinity`. Note that in the opposite situation, i.e. if a Ra server that knows the new query tuple receives an old tuple, it will evaluate the query as if the options were an empty map. V2: Rename the option from `limit` to `wait_for_index`, which is more explicit. 
--- src/ra.erl | 64 ++++++++++++---- src/ra_server.erl | 11 ++- src/ra_server_proc.erl | 159 ++++++++++++++++++++++++++++++--------- test/ra_SUITE.erl | 97 ++++++++++++++++++++++++ test/ra_server_SUITE.erl | 3 + 5 files changed, 285 insertions(+), 49 deletions(-) diff --git a/src/ra.erl b/src/ra.erl index a34f60a2..1d252ba6 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -928,19 +928,38 @@ pipeline_command(ServerId, Command) -> local_query(ServerId, QueryFun) -> local_query(ServerId, QueryFun, ?DEFAULT_TIMEOUT). -%% @doc Same as `local_query/2' but accepts a custom timeout. +%% @doc Same as `local_query/2' but accepts a custom timeout or a map of +%% options. +%% +%% The supported options are: +%% +%% %% @param ServerId the ra server id to send the query to %% @param QueryFun the query function to run -%% @param Timeout the timeout to use +%% @param TimeoutOrOptions the timeout to use or a map of options %% @see local_query/2 %% @end -spec local_query(ServerId :: ra_server_id(), QueryFun :: query_fun(), - Timeout :: timeout()) -> + TimeoutOrOptions) -> ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()}) | - {ok, {ra_idxterm(), Reply :: term()}, not_known}. -local_query(ServerId, QueryFun, Timeout) -> - ra_server_proc:query(ServerId, QueryFun, local, Timeout). + {ok, {ra_idxterm(), Reply :: term()}, not_known} + when TimeoutOrOptions :: Timeout | Options, + Timeout :: timeout(), + Options :: #{wait_for_index => ra_idxterm(), + timeout => timeout()}. +local_query(ServerId, QueryFun, Timeout) + when Timeout =:= infinity orelse is_integer(Timeout) -> + ra_server_proc:query(ServerId, QueryFun, local, #{}, Timeout); +local_query(ServerId, QueryFun, Options) when is_map(Options) -> + Timeout = maps:get(timeout, Options, ?DEFAULT_TIMEOUT), + Options1 = maps:remove(timeout, Options), + ra_server_proc:query( + ServerId, QueryFun, local, Options1, Timeout). %% @doc Query the machine state on the current leader node. 
@@ -959,19 +978,38 @@ local_query(ServerId, QueryFun, Timeout) -> leader_query(ServerId, QueryFun) -> leader_query(ServerId, QueryFun, ?DEFAULT_TIMEOUT). -%% @doc Same as `leader_query/2' but accepts a custom timeout. +%% @doc Same as `leader_query/2' but accepts a custom timeout or a map of +%% options. +%% +%% The supported options are: +%% +%% %% @param ServerId the ra server id(s) to send the query to %% @param QueryFun the query function to run -%% @param Timeout the timeout to use +%% @param TimeoutOrOptions the timeout to use or a map of options %% @see leader_query/2 %% @end -spec leader_query(ServerId :: ra_server_id() | [ra_server_id()], QueryFun :: query_fun(), - Timeout :: timeout()) -> + TimeoutOrOptions) -> ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()}) | - {ok, {ra_idxterm(), Reply :: term()}, not_known}. -leader_query(ServerId, QueryFun, Timeout) -> - ra_server_proc:query(ServerId, QueryFun, leader, Timeout). + {ok, {ra_idxterm(), Reply :: term()}, not_known} + when TimeoutOrOptions :: Timeout | Options, + Timeout :: timeout(), + Options :: #{wait_for_index => ra_idxterm(), + timeout => timeout()}. +leader_query(ServerId, QueryFun, Timeout) + when Timeout =:= infinity orelse is_integer(Timeout) -> + ra_server_proc:query(ServerId, QueryFun, leader, #{}, Timeout); +leader_query(ServerId, QueryFun, Options) when is_map(Options) -> + Timeout = maps:get(timeout, Options, ?DEFAULT_TIMEOUT), + Options1 = maps:remove(timeout, Options), + ra_server_proc:query( + ServerId, QueryFun, leader, Options1, Timeout). %% @doc Query the state machine with a consistency guarantee. %% This allows the caller to query the state machine on the leader node with @@ -999,7 +1037,7 @@ consistent_query(ServerId, QueryFun) -> Timeout :: timeout()) -> ra_server_proc:ra_leader_call_ret(Reply :: term()). consistent_query(ServerId, QueryFun, Timeout) -> - ra_server_proc:query(ServerId, QueryFun, consistent, Timeout). 
+ ra_server_proc:query(ServerId, QueryFun, consistent, #{}, Timeout). %% @doc Returns a list of cluster members %% diff --git a/src/ra_server.erl b/src/ra_server.erl index e06b3c03..433aef0f 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -1817,6 +1817,8 @@ filter_follower_effects(Effects) -> %% only machine monitors should not be emitted %% by followers [C | Acc]; + ({applied_to, _} = C, Acc) -> + [C | Acc]; (L, Acc) when is_list(L) -> %% nested case - recurse case filter_follower_effects(L) of @@ -2537,7 +2539,7 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0, log := Log0} = State0) when ApplyTo > LastApplied andalso MacVer >= EffMacVer -> From = LastApplied + 1, - {LastIdx, _} = ra_log:last_index_term(Log0), + {LastIdx, LastTerm} = ra_log:last_index_term(Log0), To = min(LastIdx, ApplyTo), FoldState = {MacMod, LastApplied, State0, MacState0, Effects0, Notifys0, undefined}, @@ -2551,7 +2553,12 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0, end, %% due to machine versioning all entries may not have been applied %% - FinalEffs = make_notify_effects(Notifys, lists:reverse(Effects)), + FinalEffs0 = lists:reverse(Effects), + FinalEffs1 = make_notify_effects(Notifys, FinalEffs0), + %% The `applied_to' effect, also handled on followers, is used to signal + %% fences that are waiting on that applied index. See {@link + %% ra:local_fence/2} and {@link ra:leader_fence/2}. + FinalEffs = [{applied_to, {AppliedTo, LastTerm}} | FinalEffs1], put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, AppliedTo), put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, CommitLatency), {State#{last_applied => AppliedTo, diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 77ce1a4c..2bcbc519 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -45,7 +45,7 @@ command/3, cast_command/2, cast_command/3, - query/4, + query/5, state_query/3, local_state_query/3, trigger_election/2, @@ -73,6 +73,7 @@ handle_effects(?FUNCTION_NAME, Effects, EvtType, State0)). 
-type query_fun() :: ra:query_fun(). +-type query_options() :: #{wait_for_index => ra_idxterm()}. -type ra_command() :: {ra_server:command_type(), term(), ra_server:command_reply_mode()}. @@ -113,7 +114,8 @@ safe_call_ret/1, ra_event_reject_detail/0, ra_event/0, - ra_event_body/0]). + ra_event_body/0, + query_options/0]). %% the ra server proc keeps monitors on behalf of different components %% the state machine, log and server code. The tag is used to determine @@ -143,7 +145,11 @@ low_priority_commands :: ra_ets_queue:state(), election_timeout_set = false :: boolean(), %% the log index last time gc was forced - pending_notifys = #{} :: #{pid() => [term()]} + pending_notifys = #{} :: #{pid() => [term()]}, + pending_queries = [] :: [{ra_term(), + ra_index(), + gen_statem:from(), + query_fun()}] }). %%%=================================================================== @@ -169,15 +175,17 @@ cast_command(ServerId, Priority, Cmd) -> gen_statem:cast(ServerId, {command, Priority, Cmd}). -spec query(server_loc(), query_fun(), - local | consistent | leader, timeout()) -> + local | consistent | leader, + query_options(), + timeout()) -> ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()}) | ra_server_proc:ra_leader_call_ret(Reply :: term()) | {ok, {ra_idxterm(), Reply :: term()}, not_known}. -query(ServerLoc, QueryFun, local, Timeout) -> - statem_call(ServerLoc, {local_query, QueryFun}, Timeout); -query(ServerLoc, QueryFun, leader, Timeout) -> - leader_call(ServerLoc, {local_query, QueryFun}, Timeout); -query(ServerLoc, QueryFun, consistent, Timeout) -> +query(ServerLoc, QueryFun, local, Options, Timeout) -> + statem_call(ServerLoc, {local_query, QueryFun, Options}, Timeout); +query(ServerLoc, QueryFun, leader, Options, Timeout) -> + leader_call(ServerLoc, {local_query, QueryFun, Options}, Timeout); +query(ServerLoc, QueryFun, consistent, _Options, Timeout) -> leader_call(ServerLoc, {consistent_query, QueryFun}, Timeout). 
-spec log_fold(ra_server_id(), fun(), term(), integer()) -> term(). @@ -498,11 +506,10 @@ leader(EventType, flush_commands, _ -> next_state(NextState, State, Actions) end; -leader({call, From}, {local_query, QueryFun}, - #state{conf = Conf, - server_state = ServerState} = State) -> - Reply = perform_local_query(QueryFun, id(State), ServerState, Conf), - {keep_state, State, [{reply, From, Reply}]}; +leader({call, _From} = EventType, {local_query, QueryFun}, State) -> + leader(EventType, {local_query, QueryFun, #{}}, State); +leader({call, From}, {local_query, QueryFun, Options}, State) -> + perform_or_delay_local_query(leader, From, QueryFun, Options, State); leader({call, From}, {state_query, Spec}, State) -> Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; @@ -595,10 +602,10 @@ candidate(cast, {command, _Priority, State) -> _ = reject_command(Pid, Corr, State), {keep_state, State, []}; -candidate({call, From}, {local_query, QueryFun}, - #state{conf = Conf, server_state = ServerState} = State) -> - Reply = perform_local_query(QueryFun, not_known, ServerState, Conf), - {keep_state, State, [{reply, From, Reply}]}; +candidate({call, _From} = EventType, {local_query, QueryFun}, State) -> + candidate(EventType, {local_query, QueryFun, #{}}, State); +candidate({call, From}, {local_query, QueryFun, Options}, State) -> + perform_or_delay_local_query(candidate, From, QueryFun, Options, State); candidate({call, From}, {state_query, Spec}, State) -> Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; @@ -643,10 +650,10 @@ pre_vote(cast, {command, _Priority, State) -> _ = reject_command(Pid, Corr, State), {keep_state, State, []}; -pre_vote({call, From}, {local_query, QueryFun}, - #state{conf = Conf, server_state = ServerState} = State) -> - Reply = perform_local_query(QueryFun, not_known, ServerState, Conf), - {keep_state, State, [{reply, From, Reply}]}; +pre_vote({call, _From} = 
EventType, {local_query, QueryFun}, State) -> + pre_vote(EventType, {local_query, QueryFun, #{}}, State); +pre_vote({call, From}, {local_query, QueryFun, Options}, State) -> + perform_or_delay_local_query(pre_vote, From, QueryFun, Options, State); pre_vote({call, From}, {state_query, Spec}, State) -> Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; @@ -725,14 +732,10 @@ follower(cast, {command, _Priority, State) -> _ = reject_command(Pid, Corr, State), {keep_state, State, []}; -follower({call, From}, {local_query, QueryFun}, - #state{conf = Conf, server_state = ServerState} = State) -> - Leader = case ra_server:leader_id(ServerState) of - undefined -> not_known; - L -> L - end, - Reply = perform_local_query(QueryFun, Leader, ServerState, Conf), - {keep_state, State, [{reply, From, Reply}]}; +follower({call, _From} = EventType, {local_query, QueryFun}, State) -> + follower(EventType, {local_query, QueryFun, #{}}, State); +follower({call, From}, {local_query, QueryFun, Options}, State) -> + perform_or_delay_local_query(follower, From, QueryFun, Options, State); follower({call, From}, {state_query, Spec}, State) -> Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; @@ -929,10 +932,11 @@ await_condition({call, From}, {leader_call, Msg}, State) -> maybe_redirect(From, Msg, State); await_condition(EventType, {local_call, Msg}, State) -> await_condition(EventType, Msg, State); -await_condition({call, From}, {local_query, QueryFun}, - #state{conf = Conf, server_state = ServerState} = State) -> - Reply = perform_local_query(QueryFun, follower, ServerState, Conf), - {keep_state, State, [{reply, From, Reply}]}; +await_condition({call, _From} = EventType, {local_query, QueryFun}, State) -> + await_condition(EventType, {local_query, QueryFun, #{}}, State); +await_condition({call, From}, {local_query, QueryFun, Options}, State) -> + perform_or_delay_local_query( + await_condition, 
From, QueryFun, Options, State); await_condition({call, From}, {state_query, Spec}, State) -> Reply = {ok, do_state_query(Spec, State), id(State)}, {keep_state, State, [{reply, From, Reply}]}; @@ -1152,6 +1156,89 @@ handle_receive_snapshot(Msg, State) -> handle_await_condition(Msg, State) -> handle_raft_state(?FUNCTION_NAME, Msg, State). +perform_or_delay_local_query( + RaftState, From, QueryFun, Options, + #state{conf = Conf, + server_state = ServerState, + pending_queries = PendingQueries} = State) -> + %% The caller might decide it wants the query to be executed only after a + %% specific index has been applied on the local node. It can specify that + %% with the `wait_for_index' option. + %% + %% If the option is unset or set to `undefined', the query is performed + %% immediatly. That is the default behavior. + %% + %% If the option is set to `{Index, Term}', the query is added to a list + %% of pending queries. It will be evaluated once that index is applied + %% locally. + Leader = determine_leader(RaftState, State), + case maps:get(wait_for_index, Options, undefined) of + undefined -> + Reply = perform_local_query(QueryFun, Leader, ServerState, Conf), + {keep_state, State, [{reply, From, Reply}]}; + {Index, Term} = IdxTerm + when is_integer(Index) andalso is_integer(Term) -> + PendingQuery = {IdxTerm, From, QueryFun}, + PendingQueries1 = [PendingQuery | PendingQueries], + State1 = State#state{pending_queries = PendingQueries1}, + %% It's possible that the specified index was already applied. + %% That's why we evaluate pending queries just after adding the + %% query to the list. + {State2, Actions} = perform_pending_queries(RaftState, State1), + {keep_state, State2, Actions} + end. + +perform_pending_queries(RaftState, State) -> + #{current_term := CurrentTerm, + last_applied := LastApplied} = do_state_query(overview, State), + perform_pending_queries(RaftState, {LastApplied, CurrentTerm}, State, []). 
+ +perform_pending_queries(RaftState, {LastApplied, CurrentTerm}, + #state{conf = Conf, + server_state = ServerState, + pending_queries = PendingQueries} = State, + Actions) -> + Leader = determine_leader(RaftState, State), + {PendingQueries1, + Actions1} = lists:foldr( + fun + ({{TargetIndex, TargetTerm}, From, QueryFun}, {PQ, A}) + when TargetTerm =:= CurrentTerm andalso + TargetIndex =< LastApplied -> + %% The local node reached or passed the target + %% index. We can evaluate the query. + Reply = perform_local_query( + QueryFun, Leader, ServerState, Conf), + A1 = [{reply, From, Reply} | A], + {PQ, A1}; + ({{_TargetIndex, TargetTerm}, From, _}, {PQ, A}) + when TargetTerm < CurrentTerm -> + %% The specified term ended. Therefore, we can't be + %% sure if the target index was ever applied. Let's + %% return an error for that query. + Reply = {error, term_is_over}, + A1 = [{reply, From, Reply} | A], + {PQ, A1}; + (PendingQuery, {PQ, A}) -> + PQ1 = [PendingQuery | PQ], + {PQ1, A} + end, {[], Actions}, PendingQueries), + State1 = State#state{pending_queries = PendingQueries1}, + {State1, Actions1}. + +determine_leader(RaftState, #state{server_state = ServerState} = State) -> + case RaftState of + leader -> + id(State); + follower -> + case ra_server:leader_id(ServerState) of + undefined -> not_known; + L -> L + end; + _ -> + not_known + end. + perform_local_query(QueryFun, Leader, ServerState, Conf) -> incr_counter(Conf, ?C_RA_SRV_LOCAL_QUERIES, 1), try ra_server:machine_query(QueryFun, ServerState) of @@ -1270,6 +1357,10 @@ handle_effect(_, {notify, Nots}, _, #state{} = State0, Actions) -> %% should only be done by leader State = send_applied_notifications(State0, Nots), {State, Actions}; +handle_effect(RaftState, {applied_to, IdxTerm}, _, State, Actions) -> + %% The last applied index made progress. Check if there are pending + %% queries that wait for this index. 
+ perform_pending_queries(RaftState, IdxTerm, State, Actions); handle_effect(_, {cast, To, Msg}, _, State, Actions) -> %% TODO: handle send failure _ = gen_cast(To, Msg, State), diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 346ff6bc..6345a26f 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -48,6 +48,7 @@ all_tests() -> local_query, local_query_boom, local_query_stale, + local_query_with_wait_for_index_option, members, members_info, consistent_query, @@ -488,6 +489,102 @@ local_query_stale(Config) -> ?assertNotMatch(LeaderV, NonLeaderV), terminate_cluster(Cluster). +local_query_with_wait_for_index_option() -> + [{timetrap, {minutes, 1}}]. + +local_query_with_wait_for_index_option(Config) -> + [A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config), + {simple, fun erlang:'+'/2, 9}), + try + %% Get the leader and deduce a follower. + {ok, _, Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT), + [Follower | _] = Cluster -- [Leader], + ct:pal("Leader: ~0p~nFollower: ~0p", [Leader, Follower]), + + %% Get the last applied index. + QueryFun = fun(S) -> S end, + {ok, {{Idx, Term}, 14}, _} = ra:local_query(Leader, QueryFun), + + %% Query using the already applied index. + IdxTerm = {Idx, Term}, + ?assertMatch( + {ok, {_, 14}, _}, + ra:local_query(Leader, QueryFun, #{wait_for_index => IdxTerm})), + ?assertMatch( + {ok, {_, 14}, _}, + ra:local_query(Follower, QueryFun, #{wait_for_index => IdxTerm})), + + %% Query using the next index; this should time out. + IdxTerm1 = {Idx + 1, Term}, + Timeout = 2000, + ?assertMatch( + {timeout, _}, + ra:local_query( + Leader, QueryFun, + #{wait_for_index => IdxTerm1, + timeout => Timeout})), + ?assertMatch( + {timeout, _}, + ra:local_query( + Follower, QueryFun, + #{wait_for_index => IdxTerm1, + timeout => Timeout})), + + %% Submit a command through the follower. + ok = ra:pipeline_command(Follower, 3, no_correlation, normal), + + %% Query using the next index; we should get an answer. 
The command + %% might be applied already before we submit the query though. + ?assertMatch( + {ok, {_, 17}, _}, + ra:local_query(Leader, QueryFun, #{wait_for_index => IdxTerm1})), + ?assertMatch( + {ok, {_, 17}, _}, + ra:local_query(Follower, QueryFun, #{wait_for_index => IdxTerm1})), + + %% Query using the next next index; this should time out. This ensures + %% that that index wasn't already applied, invalidating the rest of the + %% test case. + IdxTerm2 = {Idx + 2, Term}, + ?assertMatch( + {timeout, _}, + ra:local_query( + Leader, QueryFun, + #{wait_for_index => IdxTerm2, + timeout => Timeout})), + ?assertMatch( + {timeout, _}, + ra:local_query( + Follower, QueryFun, + #{wait_for_index => IdxTerm2, + timeout => Timeout})), + + %% Query using the next next index; we should get an answer. This time, + %% we ensure that the query is submitted before the next command. + Parent = self(), + _Pid = spawn_link( + fun() -> + ?assertMatch( + {ok, {_, 19}, _}, + ra:local_query( + Leader, QueryFun, #{wait_for_index => IdxTerm2})), + ?assertMatch( + {ok, {_, 19}, _}, + ra:local_query( + Follower, QueryFun, #{wait_for_index => IdxTerm2})), + Parent ! done, + erlang:unlink(Parent) + end), + + %% Submit a command through the follower. + ok = ra:pipeline_command(Follower, 2, no_correlation, normal), + receive + done -> ok + end + after + terminate_cluster(Cluster) + end. + consistent_query_stale(Config) -> [A, B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config), add_machine()), diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 89113683..6a3cb7ac 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -967,6 +967,7 @@ append_entries_reply_success_promotes_nonvoter(_Config) -> last_applied := 3, machine_state := <<"hi3">>}, [{next_event, info, pipeline_rpcs}, + {applied_to, {3, 5}}, {aux, eval}]} = ra_server:handle_leader({N2, Ack2}, State3), ok. 
@@ -991,6 +992,7 @@ append_entries_reply_success(_Config) -> last_applied := 3, machine_state := <<"hi3">>} = State, [{next_event, info, pipeline_rpcs}, + {applied_to, {3, 5}}, {aux, eval}]} = ra_server:handle_leader(Msg, State0), {leader, #{cluster := #{N2 := #{next_index := 4, @@ -1841,6 +1843,7 @@ command_notify(_Config) -> last_term = 5}, {leader, _State, [_, + {applied_to, {4, 5}}, {notify, _}, {aux,eval}, {aux, banana}