Skip to content

Commit

Permalink
Add wait_for_index option to leader and local queries
Browse files Browse the repository at this point in the history
[Why]
It allows to wait for a specific index to be applied locally (or on the
leader) before a query is evaluated. It is useful when the caller wants
to be sure that the result of the previous command is "visible" by the
next query.

By default, it's not guarantied because the command will be considered
successfully applied as long as a quorum of Ra servers applied it. This
list of Ra servers may not include the local node for instance.

[How]
If the `wait_for_index` option is specified with a `{Index, Term}`
tuple, the query will be evaluated right away if that index is already
applied, or it will be added to a list of pending queries.

Pending queries are evaluated after each applied batch of commands by
the local node. If a pending query's target index was reached or passed,
it is evaluated. If a pending query's target term ended, an error is
returned.

Here is an example:

    ra:local_query(ServerId, QueryFun, #{wait_for_index => {Index, Term}}).

The `local_query` tuple sent to the Ra server changes format. The old
one was:

    {local_query, QueryFun}

The new one is:

    {local_query, QueryFun, Options}

If the remote Ra server that receives the query runs a version of Ra
older than the one having this change and thus doesn't understand the
new tuple, it will ignore and drop the query. This will lead to a
timeout of the query, or an indefinitely hanging call if the timeout was
set to `infinity`.

Note in the opposite situation, i.e. if a Ra server that knows the new
query tuple receives an old tuple, it will evaluate the query as if the
options was an empty map.

V2: Rename the option from `limit` to `wait_for_index` which is more
    explicit.
  • Loading branch information
dumbbell committed Jun 25, 2024
1 parent 6de4074 commit 10a1f9d
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 49 deletions.
64 changes: 51 additions & 13 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -928,19 +928,38 @@ pipeline_command(ServerId, Command) ->
local_query(ServerId, QueryFun) ->
local_query(ServerId, QueryFun, ?DEFAULT_TIMEOUT).

%% @doc Same as `local_query/2' but accepts a custom timeout.
%% @doc Same as `local_query/2' but accepts a custom timeout or a map of
%% options.
%%
%% The supported options are:
%% <ul>
%% <li>`wait_for_index': the query will be evaluated only once the specific
%% `{Index, Term}' is applied locally.</li>
%% <li>`timeout': the maximum time to wait for the query to be evaluated.</li>
%% </ul>
%%
%% @param ServerId the ra server id to send the query to
%% @param QueryFun the query function to run
%% @param Timeout the timeout to use
%% @param TimeoutOrOptions the timeout to use or a map of options
%% @see local_query/2
%% @end
-spec local_query(ServerId :: ra_server_id(),
QueryFun :: query_fun(),
Timeout :: timeout()) ->
TimeoutOrOptions) ->
ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()}) |
{ok, {ra_idxterm(), Reply :: term()}, not_known}.
local_query(ServerId, QueryFun, Timeout) ->
ra_server_proc:query(ServerId, QueryFun, local, Timeout).
{ok, {ra_idxterm(), Reply :: term()}, not_known}
when TimeoutOrOptions :: Timeout | Options,
Timeout :: timeout(),
Options :: #{wait_for_index => ra_idxterm(),
timeout => timeout()}.
local_query(ServerId, QueryFun, Timeout)
when Timeout =:= infinity orelse is_integer(Timeout) ->
ra_server_proc:query(ServerId, QueryFun, local, #{}, Timeout);
local_query(ServerId, QueryFun, Options) when is_map(Options) ->
Timeout = maps:get(timeout, Options, ?DEFAULT_TIMEOUT),
Options1 = maps:remove(timeout, Options),
ra_server_proc:query(
ServerId, QueryFun, local, Options1, Timeout).


%% @doc Query the machine state on the current leader node.
Expand All @@ -959,19 +978,38 @@ local_query(ServerId, QueryFun, Timeout) ->
leader_query(ServerId, QueryFun) ->
leader_query(ServerId, QueryFun, ?DEFAULT_TIMEOUT).

%% @doc Same as `leader_query/2' but accepts a custom timeout.
%% @doc Same as `leader_query/2' but accepts a custom timeout or a map of
%% options.
%%
%% The supported options are:
%% <ul>
%% <li>`wait_for_index': the query will be evaluated only once the specific
%% `{Index, Term}' is applied on the leader.</li>
%% <li>`timeout': the maximum time to wait for the query to be evaluated.</li>
%% </ul>
%%
%% @param ServerId the ra server id(s) to send the query to
%% @param QueryFun the query function to run
%% @param Timeout the timeout to use
%% @param TimeoutOrOptions the timeout to use or a map of options
%% @see leader_query/2
%% @end
-spec leader_query(ServerId :: ra_server_id() | [ra_server_id()],
QueryFun :: query_fun(),
Timeout :: timeout()) ->
TimeoutOrOptions) ->
ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()}) |
{ok, {ra_idxterm(), Reply :: term()}, not_known}.
leader_query(ServerId, QueryFun, Timeout) ->
ra_server_proc:query(ServerId, QueryFun, leader, Timeout).
{ok, {ra_idxterm(), Reply :: term()}, not_known}
when TimeoutOrOptions :: Timeout | Options,
Timeout :: timeout(),
Options :: #{wait_for_index => ra_idxterm(),
timeout => timeout()}.
leader_query(ServerId, QueryFun, Timeout)
when Timeout =:= infinity orelse is_integer(Timeout) ->
ra_server_proc:query(ServerId, QueryFun, leader, #{}, Timeout);
leader_query(ServerId, QueryFun, Options) when is_map(Options) ->
Timeout = maps:get(timeout, Options, ?DEFAULT_TIMEOUT),
Options1 = maps:remove(timeout, Options),
ra_server_proc:query(
ServerId, QueryFun, leader, Options1, Timeout).

%% @doc Query the state machine with a consistency guarantee.
%% This allows the caller to query the state machine on the leader node with
Expand Down Expand Up @@ -999,7 +1037,7 @@ consistent_query(ServerId, QueryFun) ->
Timeout :: timeout()) ->
ra_server_proc:ra_leader_call_ret(Reply :: term()).
consistent_query(ServerId, QueryFun, Timeout) ->
ra_server_proc:query(ServerId, QueryFun, consistent, Timeout).
ra_server_proc:query(ServerId, QueryFun, consistent, #{}, Timeout).

%% @doc Returns a list of cluster members
%%
Expand Down
11 changes: 9 additions & 2 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,8 @@ filter_follower_effects(Effects) ->
%% only machine monitors should not be emitted
%% by followers
[C | Acc];
({applied_to, _} = C, Acc) ->
[C | Acc];
(L, Acc) when is_list(L) ->
%% nested case - recurse
case filter_follower_effects(L) of
Expand Down Expand Up @@ -2537,7 +2539,7 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0,
log := Log0} = State0)
when ApplyTo > LastApplied andalso MacVer >= EffMacVer ->
From = LastApplied + 1,
{LastIdx, _} = ra_log:last_index_term(Log0),
{LastIdx, LastTerm} = ra_log:last_index_term(Log0),
To = min(LastIdx, ApplyTo),
FoldState = {MacMod, LastApplied, State0, MacState0,
Effects0, Notifys0, undefined},
Expand All @@ -2551,7 +2553,12 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0,
end,
%% due to machine versioning all entries may not have been applied
%%
FinalEffs = make_notify_effects(Notifys, lists:reverse(Effects)),
FinalEffs0 = lists:reverse(Effects),
FinalEffs1 = make_notify_effects(Notifys, FinalEffs0),
%% The `applied_to' effect, also handled on followers, is used to signal
%% fences that are waiting on that applied index. See {@link
%% ra:local_fence/2} and {@link ra:leader_fence/2}.
FinalEffs = [{applied_to, {AppliedTo, LastTerm}} | FinalEffs1],
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, AppliedTo),
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, CommitLatency),
{State#{last_applied => AppliedTo,
Expand Down
159 changes: 125 additions & 34 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
command/3,
cast_command/2,
cast_command/3,
query/4,
query/5,
state_query/3,
local_state_query/3,
trigger_election/2,
Expand Down Expand Up @@ -73,6 +73,7 @@
handle_effects(?FUNCTION_NAME, Effects, EvtType, State0)).

-type query_fun() :: ra:query_fun().
-type query_options() :: #{wait_for_index => ra_idxterm()}.

-type ra_command() :: {ra_server:command_type(), term(),
ra_server:command_reply_mode()}.
Expand Down Expand Up @@ -113,7 +114,8 @@
safe_call_ret/1,
ra_event_reject_detail/0,
ra_event/0,
ra_event_body/0]).
ra_event_body/0,
query_options/0]).

%% the ra server proc keeps monitors on behalf of different components
%% the state machine, log and server code. The tag is used to determine
Expand Down Expand Up @@ -143,7 +145,11 @@
low_priority_commands :: ra_ets_queue:state(),
election_timeout_set = false :: boolean(),
%% the log index last time gc was forced
pending_notifys = #{} :: #{pid() => [term()]}
pending_notifys = #{} :: #{pid() => [term()]},
pending_queries = [] :: [{ra_term(),
ra_index(),
gen_statem:from(),
query_fun()}]
}).

%%%===================================================================
Expand All @@ -169,15 +175,17 @@ cast_command(ServerId, Priority, Cmd) ->
gen_statem:cast(ServerId, {command, Priority, Cmd}).

-spec query(server_loc(), query_fun(),
local | consistent | leader, timeout()) ->
local | consistent | leader,
query_options(),
timeout()) ->
ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()})
| ra_server_proc:ra_leader_call_ret(Reply :: term())
| {ok, {ra_idxterm(), Reply :: term()}, not_known}.
query(ServerLoc, QueryFun, local, Timeout) ->
statem_call(ServerLoc, {local_query, QueryFun}, Timeout);
query(ServerLoc, QueryFun, leader, Timeout) ->
leader_call(ServerLoc, {local_query, QueryFun}, Timeout);
query(ServerLoc, QueryFun, consistent, Timeout) ->
query(ServerLoc, QueryFun, local, Options, Timeout) ->
statem_call(ServerLoc, {local_query, QueryFun, Options}, Timeout);
query(ServerLoc, QueryFun, leader, Options, Timeout) ->
leader_call(ServerLoc, {local_query, QueryFun, Options}, Timeout);
query(ServerLoc, QueryFun, consistent, _Options, Timeout) ->
leader_call(ServerLoc, {consistent_query, QueryFun}, Timeout).

-spec log_fold(ra_server_id(), fun(), term(), integer()) -> term().
Expand Down Expand Up @@ -498,11 +506,10 @@ leader(EventType, flush_commands,
_ ->
next_state(NextState, State, Actions)
end;
leader({call, From}, {local_query, QueryFun},
#state{conf = Conf,
server_state = ServerState} = State) ->
Reply = perform_local_query(QueryFun, id(State), ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
leader({call, _From} = EventType, {local_query, QueryFun}, State) ->
leader(EventType, {local_query, QueryFun, #{}}, State);
leader({call, From}, {local_query, QueryFun, Options}, State) ->
perform_or_delay_local_query(leader, From, QueryFun, Options, State);
leader({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
Expand Down Expand Up @@ -595,10 +602,10 @@ candidate(cast, {command, _Priority,
State) ->
_ = reject_command(Pid, Corr, State),
{keep_state, State, []};
candidate({call, From}, {local_query, QueryFun},
#state{conf = Conf, server_state = ServerState} = State) ->
Reply = perform_local_query(QueryFun, not_known, ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
candidate({call, _From} = EventType, {local_query, QueryFun}, State) ->
candidate(EventType, {local_query, QueryFun, #{}}, State);
candidate({call, From}, {local_query, QueryFun, Options}, State) ->
perform_or_delay_local_query(candidate, From, QueryFun, Options, State);
candidate({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
Expand Down Expand Up @@ -643,10 +650,10 @@ pre_vote(cast, {command, _Priority,
State) ->
_ = reject_command(Pid, Corr, State),
{keep_state, State, []};
pre_vote({call, From}, {local_query, QueryFun},
#state{conf = Conf, server_state = ServerState} = State) ->
Reply = perform_local_query(QueryFun, not_known, ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
pre_vote({call, _From} = EventType, {local_query, QueryFun}, State) ->
pre_vote(EventType, {local_query, QueryFun, #{}}, State);
pre_vote({call, From}, {local_query, QueryFun, Options}, State) ->
perform_or_delay_local_query(pre_vote, From, QueryFun, Options, State);
pre_vote({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
Expand Down Expand Up @@ -725,14 +732,10 @@ follower(cast, {command, _Priority,
State) ->
_ = reject_command(Pid, Corr, State),
{keep_state, State, []};
follower({call, From}, {local_query, QueryFun},
#state{conf = Conf, server_state = ServerState} = State) ->
Leader = case ra_server:leader_id(ServerState) of
undefined -> not_known;
L -> L
end,
Reply = perform_local_query(QueryFun, Leader, ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
follower({call, _From} = EventType, {local_query, QueryFun}, State) ->
follower(EventType, {local_query, QueryFun, #{}}, State);
follower({call, From}, {local_query, QueryFun, Options}, State) ->
perform_or_delay_local_query(follower, From, QueryFun, Options, State);
follower({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
Expand Down Expand Up @@ -929,10 +932,11 @@ await_condition({call, From}, {leader_call, Msg}, State) ->
maybe_redirect(From, Msg, State);
await_condition(EventType, {local_call, Msg}, State) ->
await_condition(EventType, Msg, State);
await_condition({call, From}, {local_query, QueryFun},
#state{conf = Conf, server_state = ServerState} = State) ->
Reply = perform_local_query(QueryFun, follower, ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
await_condition({call, _From} = EventType, {local_query, QueryFun}, State) ->
await_condition(EventType, {local_query, QueryFun, #{}}, State);
await_condition({call, From}, {local_query, QueryFun, Options}, State) ->
perform_or_delay_local_query(
await_condition, From, QueryFun, Options, State);
await_condition({call, From}, {state_query, Spec}, State) ->
Reply = {ok, do_state_query(Spec, State), id(State)},
{keep_state, State, [{reply, From, Reply}]};
Expand Down Expand Up @@ -1152,6 +1156,89 @@ handle_receive_snapshot(Msg, State) ->
handle_await_condition(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).

perform_or_delay_local_query(
RaftState, From, QueryFun, Options,
#state{conf = Conf,
server_state = ServerState,
pending_queries = PendingQueries} = State) ->
%% The caller might decide it wants the query to be executed only after a
%% specific index has been applied on the local node. It can specify that
%% with the `wait_for_index' option.
%%
%% If the option is unset or set to `undefined', the query is performed
%% immediatly. That is the default behavior.
%%
%% If the option is set to `{Index, Term}', the query is added to a list
%% of pending queries. It will be evaluated once that index is applied
%% locally.
Leader = determine_leader(RaftState, State),
case maps:get(wait_for_index, Options, undefined) of
undefined ->
Reply = perform_local_query(QueryFun, Leader, ServerState, Conf),
{keep_state, State, [{reply, From, Reply}]};
{Index, Term} = IdxTerm
when is_integer(Index) andalso is_integer(Term) ->
PendingQuery = {IdxTerm, From, QueryFun},
PendingQueries1 = [PendingQuery | PendingQueries],
State1 = State#state{pending_queries = PendingQueries1},
%% It's possible that the specified index was already applied.
%% That's why we evaluate pending queries just after adding the
%% query to the list.
{State2, Actions} = perform_pending_queries(RaftState, State1),
{keep_state, State2, Actions}
end.

perform_pending_queries(RaftState, State) ->
#{current_term := CurrentTerm,
last_applied := LastApplied} = do_state_query(overview, State),
perform_pending_queries(RaftState, {LastApplied, CurrentTerm}, State, []).

perform_pending_queries(RaftState, {LastApplied, CurrentTerm},
#state{conf = Conf,
server_state = ServerState,
pending_queries = PendingQueries} = State,
Actions) ->
Leader = determine_leader(RaftState, State),
{PendingQueries1,
Actions1} = lists:foldr(
fun
({{TargetIndex, TargetTerm}, From, QueryFun}, {PQ, A})
when TargetTerm =:= CurrentTerm andalso
TargetIndex =< LastApplied ->
%% The local node reached or passed the target
%% index. We can evaluate the query.
Reply = perform_local_query(
QueryFun, Leader, ServerState, Conf),
A1 = [{reply, From, Reply} | A],
{PQ, A1};
({{_TargetIndex, TargetTerm}, From, _}, {PQ, A})
when TargetTerm < CurrentTerm ->
%% The specified term ended. Therefore, we can't be
%% sure if the target index was ever applied. Let's
%% return an error for that query.
Reply = {error, term_is_over},
A1 = [{reply, From, Reply} | A],
{PQ, A1};
(PendingQuery, {PQ, A}) ->
PQ1 = [PendingQuery | PQ],
{PQ1, A}
end, {[], Actions}, PendingQueries),
State1 = State#state{pending_queries = PendingQueries1},
{State1, Actions1}.

determine_leader(RaftState, #state{server_state = ServerState} = State) ->
case RaftState of
leader ->
id(State);
follower ->
case ra_server:leader_id(ServerState) of
undefined -> not_known;
L -> L
end;
_ ->
not_known
end.

perform_local_query(QueryFun, Leader, ServerState, Conf) ->
incr_counter(Conf, ?C_RA_SRV_LOCAL_QUERIES, 1),
try ra_server:machine_query(QueryFun, ServerState) of
Expand Down Expand Up @@ -1270,6 +1357,10 @@ handle_effect(_, {notify, Nots}, _, #state{} = State0, Actions) ->
%% should only be done by leader
State = send_applied_notifications(State0, Nots),
{State, Actions};
handle_effect(RaftState, {applied_to, IdxTerm}, _, State, Actions) ->
%% The last applied index made progress. Check if there are pending
%% queries that wait for this index.
perform_pending_queries(RaftState, IdxTerm, State, Actions);
handle_effect(_, {cast, To, Msg}, _, State, Actions) ->
%% TODO: handle send failure
_ = gen_cast(To, Msg, State),
Expand Down
Loading

0 comments on commit 10a1f9d

Please sign in to comment.