diff --git a/src/ra.erl b/src/ra.erl
index a34f60a2..1d252ba6 100644
--- a/src/ra.erl
+++ b/src/ra.erl
@@ -928,19 +928,38 @@ pipeline_command(ServerId, Command) ->
local_query(ServerId, QueryFun) ->
local_query(ServerId, QueryFun, ?DEFAULT_TIMEOUT).
-%% @doc Same as `local_query/2' but accepts a custom timeout.
+%% @doc Same as `local_query/2' but accepts a custom timeout or a map of
+%% options.
+%%
+%% The supported options are:
+%%
+%% - `wait_for_index': the query will be evaluated only once the specific
+%% `{Index, Term}' is applied locally.
+%% - `timeout': the maximum time to wait for the query to be evaluated.
+%%
+%%
%% @param ServerId the ra server id to send the query to
%% @param QueryFun the query function to run
-%% @param Timeout the timeout to use
+%% @param TimeoutOrOptions the timeout to use or a map of options
%% @see local_query/2
%% @end
-spec local_query(ServerId :: ra_server_id(),
QueryFun :: query_fun(),
- Timeout :: timeout()) ->
+ TimeoutOrOptions) ->
ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()}) |
- {ok, {ra_idxterm(), Reply :: term()}, not_known}.
-local_query(ServerId, QueryFun, Timeout) ->
- ra_server_proc:query(ServerId, QueryFun, local, Timeout).
+ {ok, {ra_idxterm(), Reply :: term()}, not_known}
+ when TimeoutOrOptions :: Timeout | Options,
+ Timeout :: timeout(),
+ Options :: #{wait_for_index => ra_idxterm(),
+ timeout => timeout()}.
+local_query(ServerId, QueryFun, Timeout)
+ when Timeout =:= infinity orelse is_integer(Timeout) ->
+ ra_server_proc:query(ServerId, QueryFun, local, #{}, Timeout);
+local_query(ServerId, QueryFun, Options) when is_map(Options) ->
+ Timeout = maps:get(timeout, Options, ?DEFAULT_TIMEOUT),
+ Options1 = maps:remove(timeout, Options),
+ ra_server_proc:query(
+ ServerId, QueryFun, local, Options1, Timeout).
%% @doc Query the machine state on the current leader node.
@@ -959,19 +978,38 @@ local_query(ServerId, QueryFun, Timeout) ->
leader_query(ServerId, QueryFun) ->
leader_query(ServerId, QueryFun, ?DEFAULT_TIMEOUT).
-%% @doc Same as `leader_query/2' but accepts a custom timeout.
+%% @doc Same as `leader_query/2' but accepts a custom timeout or a map of
+%% options.
+%%
+%% The supported options are:
+%%
+%% - `wait_for_index': the query will be evaluated only once the specific
+%% `{Index, Term}' is applied on the leader.
+%% - `timeout': the maximum time to wait for the query to be evaluated.
+%%
+%%
%% @param ServerId the ra server id(s) to send the query to
%% @param QueryFun the query function to run
-%% @param Timeout the timeout to use
+%% @param TimeoutOrOptions the timeout to use or a map of options
%% @see leader_query/2
%% @end
-spec leader_query(ServerId :: ra_server_id() | [ra_server_id()],
QueryFun :: query_fun(),
- Timeout :: timeout()) ->
+ TimeoutOrOptions) ->
ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()}) |
- {ok, {ra_idxterm(), Reply :: term()}, not_known}.
-leader_query(ServerId, QueryFun, Timeout) ->
- ra_server_proc:query(ServerId, QueryFun, leader, Timeout).
+ {ok, {ra_idxterm(), Reply :: term()}, not_known}
+ when TimeoutOrOptions :: Timeout | Options,
+ Timeout :: timeout(),
+ Options :: #{wait_for_index => ra_idxterm(),
+ timeout => timeout()}.
+leader_query(ServerId, QueryFun, Timeout)
+ when Timeout =:= infinity orelse is_integer(Timeout) ->
+ ra_server_proc:query(ServerId, QueryFun, leader, #{}, Timeout);
+leader_query(ServerId, QueryFun, Options) when is_map(Options) ->
+ Timeout = maps:get(timeout, Options, ?DEFAULT_TIMEOUT),
+ Options1 = maps:remove(timeout, Options),
+ ra_server_proc:query(
+ ServerId, QueryFun, leader, Options1, Timeout).
%% @doc Query the state machine with a consistency guarantee.
%% This allows the caller to query the state machine on the leader node with
@@ -999,7 +1037,7 @@ consistent_query(ServerId, QueryFun) ->
Timeout :: timeout()) ->
ra_server_proc:ra_leader_call_ret(Reply :: term()).
consistent_query(ServerId, QueryFun, Timeout) ->
- ra_server_proc:query(ServerId, QueryFun, consistent, Timeout).
+ ra_server_proc:query(ServerId, QueryFun, consistent, #{}, Timeout).
%% @doc Returns a list of cluster members
%%
diff --git a/src/ra_server.erl b/src/ra_server.erl
index e06b3c03..433aef0f 100644
--- a/src/ra_server.erl
+++ b/src/ra_server.erl
@@ -1817,6 +1817,8 @@ filter_follower_effects(Effects) ->
%% only machine monitors should not be emitted
%% by followers
[C | Acc];
+ ({applied_to, _} = C, Acc) ->
+ [C | Acc];
(L, Acc) when is_list(L) ->
%% nested case - recurse
case filter_follower_effects(L) of
@@ -2537,7 +2539,7 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0,
log := Log0} = State0)
when ApplyTo > LastApplied andalso MacVer >= EffMacVer ->
From = LastApplied + 1,
- {LastIdx, _} = ra_log:last_index_term(Log0),
+ {LastIdx, LastTerm} = ra_log:last_index_term(Log0),
To = min(LastIdx, ApplyTo),
FoldState = {MacMod, LastApplied, State0, MacState0,
Effects0, Notifys0, undefined},
@@ -2551,7 +2553,12 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0,
end,
%% due to machine versioning all entries may not have been applied
%%
- FinalEffs = make_notify_effects(Notifys, lists:reverse(Effects)),
+ FinalEffs0 = lists:reverse(Effects),
+ FinalEffs1 = make_notify_effects(Notifys, FinalEffs0),
+ %% The `applied_to' effect, also handled on followers, is used to signal
+ %% fences that are waiting on that applied index. See {@link
+ %% ra:local_fence/2} and {@link ra:leader_fence/2}.
+ FinalEffs = [{applied_to, {AppliedTo, LastTerm}} | FinalEffs1],
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, AppliedTo),
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, CommitLatency),
{State#{last_applied => AppliedTo,
diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl
index 77ce1a4c..2bcbc519 100644
--- a/src/ra_server_proc.erl
+++ b/src/ra_server_proc.erl
@@ -45,7 +45,7 @@
command/3,
cast_command/2,
cast_command/3,
- query/4,
+ query/5,
state_query/3,
local_state_query/3,
trigger_election/2,
@@ -73,6 +73,7 @@
handle_effects(?FUNCTION_NAME, Effects, EvtType, State0)).
-type query_fun() :: ra:query_fun().
+-type query_options() :: #{wait_for_index => ra_idxterm()}.
-type ra_command() :: {ra_server:command_type(), term(),
ra_server:command_reply_mode()}.
@@ -113,7 +114,8 @@
safe_call_ret/1,
ra_event_reject_detail/0,
ra_event/0,
- ra_event_body/0]).
+ ra_event_body/0,
+ query_options/0]).
%% the ra server proc keeps monitors on behalf of different components
%% the state machine, log and server code. The tag is used to determine
@@ -143,7 +145,11 @@
low_priority_commands :: ra_ets_queue:state(),
election_timeout_set = false :: boolean(),
%% the log index last time gc was forced
- pending_notifys = #{} :: #{pid() => [term()]}
+ pending_notifys = #{} :: #{pid() => [term()]},
+ pending_queries = [] :: [{ra_term(),
+ ra_index(),
+ gen_statem:from(),
+ query_fun()}]
}).
%%%===================================================================
@@ -169,15 +175,17 @@ cast_command(ServerId, Priority, Cmd) ->
gen_statem:cast(ServerId, {command, Priority, Cmd}).
-spec query(server_loc(), query_fun(),
- local | consistent | leader, timeout()) ->
+ local | consistent | leader,
+ query_options(),
+ timeout()) ->
ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()})
| ra_server_proc:ra_leader_call_ret(Reply :: term())
| {ok, {ra_idxterm(), Reply :: term()}, not_known}.
-query(ServerLoc, QueryFun, local, Timeout) ->
- statem_call(ServerLoc, {local_query, QueryFun}, Timeout);
-query(ServerLoc, QueryFun, leader, Timeout) ->
- leader_call(ServerLoc, {local_query, QueryFun}, Timeout);
-query(ServerLoc, QueryFun, consistent, Timeout) ->
+query(ServerLoc, QueryFun, local, Options, Timeout) ->
+ statem_call(ServerLoc, {local_query, QueryFun, Options}, Timeout);
+query(ServerLoc, QueryFun, leader, Options, Timeout) ->
+ leader_call(ServerLoc, {local_query, QueryFun, Options}, Timeout);
+query(ServerLoc, QueryFun, consistent, _Options, Timeout) ->
leader_call(ServerLoc, {consistent_query, QueryFun}, Timeout).
-spec log_fold(ra_server_id(), fun(), term(), integer()) -> term().
@@ -498,11 +506,10 @@ leader(EventType, flush_commands,
_ ->
next_state(NextState, State, Actions)
end;
-leader({call, From}, {local_query, QueryFun},
- #state{conf = Conf,
- server_state = ServerState} = State) ->
- Reply = perform_local_query(QueryFun, id(State), ServerState, Conf),
- {keep_state, State, [{reply, From, Reply}]};
+leader({call, _From} = EventType, {local_query, QueryFun}, State) ->
+ leader(EventType, {local_query, QueryFun, #{}}, State);
+leader({call, From}, {local_query, QueryFun, Options}, State) ->
+ perform_or_delay_local_query(leader, From, QueryFun, Options, State);
leader({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
@@ -595,10 +602,10 @@ candidate(cast, {command, _Priority,
State) ->
_ = reject_command(Pid, Corr, State),
{keep_state, State, []};
-candidate({call, From}, {local_query, QueryFun},
- #state{conf = Conf, server_state = ServerState} = State) ->
- Reply = perform_local_query(QueryFun, not_known, ServerState, Conf),
- {keep_state, State, [{reply, From, Reply}]};
+candidate({call, _From} = EventType, {local_query, QueryFun}, State) ->
+ candidate(EventType, {local_query, QueryFun, #{}}, State);
+candidate({call, From}, {local_query, QueryFun, Options}, State) ->
+ perform_or_delay_local_query(candidate, From, QueryFun, Options, State);
candidate({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
@@ -643,10 +650,10 @@ pre_vote(cast, {command, _Priority,
State) ->
_ = reject_command(Pid, Corr, State),
{keep_state, State, []};
-pre_vote({call, From}, {local_query, QueryFun},
- #state{conf = Conf, server_state = ServerState} = State) ->
- Reply = perform_local_query(QueryFun, not_known, ServerState, Conf),
- {keep_state, State, [{reply, From, Reply}]};
+pre_vote({call, _From} = EventType, {local_query, QueryFun}, State) ->
+ pre_vote(EventType, {local_query, QueryFun, #{}}, State);
+pre_vote({call, From}, {local_query, QueryFun, Options}, State) ->
+ perform_or_delay_local_query(pre_vote, From, QueryFun, Options, State);
pre_vote({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
@@ -725,14 +732,10 @@ follower(cast, {command, _Priority,
State) ->
_ = reject_command(Pid, Corr, State),
{keep_state, State, []};
-follower({call, From}, {local_query, QueryFun},
- #state{conf = Conf, server_state = ServerState} = State) ->
- Leader = case ra_server:leader_id(ServerState) of
- undefined -> not_known;
- L -> L
- end,
- Reply = perform_local_query(QueryFun, Leader, ServerState, Conf),
- {keep_state, State, [{reply, From, Reply}]};
+follower({call, _From} = EventType, {local_query, QueryFun}, State) ->
+ follower(EventType, {local_query, QueryFun, #{}}, State);
+follower({call, From}, {local_query, QueryFun, Options}, State) ->
+ perform_or_delay_local_query(follower, From, QueryFun, Options, State);
follower({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
@@ -929,10 +932,11 @@ await_condition({call, From}, {leader_call, Msg}, State) ->
maybe_redirect(From, Msg, State);
await_condition(EventType, {local_call, Msg}, State) ->
await_condition(EventType, Msg, State);
-await_condition({call, From}, {local_query, QueryFun},
- #state{conf = Conf, server_state = ServerState} = State) ->
- Reply = perform_local_query(QueryFun, follower, ServerState, Conf),
- {keep_state, State, [{reply, From, Reply}]};
+await_condition({call, _From} = EventType, {local_query, QueryFun}, State) ->
+ await_condition(EventType, {local_query, QueryFun, #{}}, State);
+await_condition({call, From}, {local_query, QueryFun, Options}, State) ->
+ perform_or_delay_local_query(
+ await_condition, From, QueryFun, Options, State);
await_condition({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
@@ -1152,6 +1156,89 @@ handle_receive_snapshot(Msg, State) ->
handle_await_condition(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).
+perform_or_delay_local_query(
+ RaftState, From, QueryFun, Options,
+ #state{conf = Conf,
+ server_state = ServerState,
+ pending_queries = PendingQueries} = State) ->
+ %% The caller might decide it wants the query to be executed only after a
+ %% specific index has been applied on the local node. It can specify that
+ %% with the `wait_for_index' option.
+ %%
+ %% If the option is unset or set to `undefined', the query is performed
+ %% immediatly. That is the default behavior.
+ %%
+ %% If the option is set to `{Index, Term}', the query is added to a list
+ %% of pending queries. It will be evaluated once that index is applied
+ %% locally.
+ Leader = determine_leader(RaftState, State),
+ case maps:get(wait_for_index, Options, undefined) of
+ undefined ->
+ Reply = perform_local_query(QueryFun, Leader, ServerState, Conf),
+ {keep_state, State, [{reply, From, Reply}]};
+ {Index, Term} = IdxTerm
+ when is_integer(Index) andalso is_integer(Term) ->
+ PendingQuery = {IdxTerm, From, QueryFun},
+ PendingQueries1 = [PendingQuery | PendingQueries],
+ State1 = State#state{pending_queries = PendingQueries1},
+ %% It's possible that the specified index was already applied.
+ %% That's why we evaluate pending queries just after adding the
+ %% query to the list.
+ {State2, Actions} = perform_pending_queries(RaftState, State1),
+ {keep_state, State2, Actions}
+ end.
+
+perform_pending_queries(RaftState, State) ->
+ #{current_term := CurrentTerm,
+ last_applied := LastApplied} = do_state_query(overview, State),
+ perform_pending_queries(RaftState, {LastApplied, CurrentTerm}, State, []).
+
+perform_pending_queries(RaftState, {LastApplied, CurrentTerm},
+ #state{conf = Conf,
+ server_state = ServerState,
+ pending_queries = PendingQueries} = State,
+ Actions) ->
+ Leader = determine_leader(RaftState, State),
+ {PendingQueries1,
+ Actions1} = lists:foldr(
+ fun
+ ({{TargetIndex, TargetTerm}, From, QueryFun}, {PQ, A})
+ when TargetTerm =:= CurrentTerm andalso
+ TargetIndex =< LastApplied ->
+ %% The local node reached or passed the target
+ %% index. We can evaluate the query.
+ Reply = perform_local_query(
+ QueryFun, Leader, ServerState, Conf),
+ A1 = [{reply, From, Reply} | A],
+ {PQ, A1};
+ ({{_TargetIndex, TargetTerm}, From, _}, {PQ, A})
+ when TargetTerm < CurrentTerm ->
+ %% The specified term ended. Therefore, we can't be
+ %% sure if the target index was ever applied. Let's
+ %% return an error for that query.
+ Reply = {error, term_is_over},
+ A1 = [{reply, From, Reply} | A],
+ {PQ, A1};
+ (PendingQuery, {PQ, A}) ->
+ PQ1 = [PendingQuery | PQ],
+ {PQ1, A}
+ end, {[], Actions}, PendingQueries),
+ State1 = State#state{pending_queries = PendingQueries1},
+ {State1, Actions1}.
+
+determine_leader(RaftState, #state{server_state = ServerState} = State) ->
+ case RaftState of
+ leader ->
+ id(State);
+ follower ->
+ case ra_server:leader_id(ServerState) of
+ undefined -> not_known;
+ L -> L
+ end;
+ _ ->
+ not_known
+ end.
+
perform_local_query(QueryFun, Leader, ServerState, Conf) ->
incr_counter(Conf, ?C_RA_SRV_LOCAL_QUERIES, 1),
try ra_server:machine_query(QueryFun, ServerState) of
@@ -1270,6 +1357,10 @@ handle_effect(_, {notify, Nots}, _, #state{} = State0, Actions) ->
%% should only be done by leader
State = send_applied_notifications(State0, Nots),
{State, Actions};
+handle_effect(RaftState, {applied_to, IdxTerm}, _, State, Actions) ->
+ %% The last applied index made progress. Check if there are pending
+ %% queries that wait for this index.
+ perform_pending_queries(RaftState, IdxTerm, State, Actions);
handle_effect(_, {cast, To, Msg}, _, State, Actions) ->
%% TODO: handle send failure
_ = gen_cast(To, Msg, State),
diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl
index 346ff6bc..6345a26f 100644
--- a/test/ra_SUITE.erl
+++ b/test/ra_SUITE.erl
@@ -48,6 +48,7 @@ all_tests() ->
local_query,
local_query_boom,
local_query_stale,
+ local_query_with_wait_for_index_option,
members,
members_info,
consistent_query,
@@ -488,6 +489,102 @@ local_query_stale(Config) ->
?assertNotMatch(LeaderV, NonLeaderV),
terminate_cluster(Cluster).
+local_query_with_wait_for_index_option() ->
+ [{timetrap, {minutes, 1}}].
+
+local_query_with_wait_for_index_option(Config) ->
+ [A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config),
+ {simple, fun erlang:'+'/2, 9}),
+ try
+ %% Get the leader and deduce a follower.
+ {ok, _, Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT),
+ [Follower | _] = Cluster -- [Leader],
+ ct:pal("Leader: ~0p~nFollower: ~0p", [Leader, Follower]),
+
+ %% Get the last applied index.
+ QueryFun = fun(S) -> S end,
+ {ok, {{Idx, Term}, 14}, _} = ra:local_query(Leader, QueryFun),
+
+ %% Query using the already applied index.
+ IdxTerm = {Idx, Term},
+ ?assertMatch(
+ {ok, {_, 14}, _},
+ ra:local_query(Leader, QueryFun, #{wait_for_index => IdxTerm})),
+ ?assertMatch(
+ {ok, {_, 14}, _},
+ ra:local_query(Follower, QueryFun, #{wait_for_index => IdxTerm})),
+
+ %% Query using the next index; this should time out.
+ IdxTerm1 = {Idx + 1, Term},
+ Timeout = 2000,
+ ?assertMatch(
+ {timeout, _},
+ ra:local_query(
+ Leader, QueryFun,
+ #{wait_for_index => IdxTerm1,
+ timeout => Timeout})),
+ ?assertMatch(
+ {timeout, _},
+ ra:local_query(
+ Follower, QueryFun,
+ #{wait_for_index => IdxTerm1,
+ timeout => Timeout})),
+
+ %% Submit a command through the follower.
+ ok = ra:pipeline_command(Follower, 3, no_correlation, normal),
+
+ %% Query using the next index; we should get an answer. The command
+ %% might be applied already before we submit the query though.
+ ?assertMatch(
+ {ok, {_, 17}, _},
+ ra:local_query(Leader, QueryFun, #{wait_for_index => IdxTerm1})),
+ ?assertMatch(
+ {ok, {_, 17}, _},
+ ra:local_query(Follower, QueryFun, #{wait_for_index => IdxTerm1})),
+
+ %% Query using the next next index; this should time out. This ensures
+ %% that that index wasn't already applied, invalidating the rest of the
+ %% test case.
+ IdxTerm2 = {Idx + 2, Term},
+ ?assertMatch(
+ {timeout, _},
+ ra:local_query(
+ Leader, QueryFun,
+ #{wait_for_index => IdxTerm2,
+ timeout => Timeout})),
+ ?assertMatch(
+ {timeout, _},
+ ra:local_query(
+ Follower, QueryFun,
+ #{wait_for_index => IdxTerm2,
+ timeout => Timeout})),
+
+ %% Query using the next next index; we should get an answer. This time,
+ %% we ensure that the query is submitted before the next command.
+ Parent = self(),
+ _Pid = spawn_link(
+ fun() ->
+ ?assertMatch(
+ {ok, {_, 19}, _},
+ ra:local_query(
+ Leader, QueryFun, #{wait_for_index => IdxTerm2})),
+ ?assertMatch(
+ {ok, {_, 19}, _},
+ ra:local_query(
+ Follower, QueryFun, #{wait_for_index => IdxTerm2})),
+ Parent ! done,
+ erlang:unlink(Parent)
+ end),
+
+ %% Submit a command through the follower.
+ ok = ra:pipeline_command(Follower, 2, no_correlation, normal),
+ receive
+ done -> ok
+ end
+ after
+ terminate_cluster(Cluster)
+ end.
+
consistent_query_stale(Config) ->
[A, B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config),
add_machine()),
diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl
index 89113683..6a3cb7ac 100644
--- a/test/ra_server_SUITE.erl
+++ b/test/ra_server_SUITE.erl
@@ -967,6 +967,7 @@ append_entries_reply_success_promotes_nonvoter(_Config) ->
last_applied := 3,
machine_state := <<"hi3">>},
[{next_event, info, pipeline_rpcs},
+ {applied_to, {3, 5}},
{aux, eval}]} = ra_server:handle_leader({N2, Ack2}, State3),
ok.
@@ -991,6 +992,7 @@ append_entries_reply_success(_Config) ->
last_applied := 3,
machine_state := <<"hi3">>} = State,
[{next_event, info, pipeline_rpcs},
+ {applied_to, {3, 5}},
{aux, eval}]} = ra_server:handle_leader(Msg, State0),
{leader, #{cluster := #{N2 := #{next_index := 4,
@@ -1841,6 +1843,7 @@ command_notify(_Config) ->
last_term = 5},
{leader, _State, [_,
+ {applied_to, {4, 5}},
{notify, _},
{aux,eval},
{aux, banana}