Skip to content

Commit

Permalink
Add limit option to leader and local queries
Browse files Browse the repository at this point in the history
[Why]
It allows to wait for a specific index to be applied locally (or on the
leader) before a query is evaluated. It is useful when the caller wants
to be sure that the result of the previous command is "visible" by the
next query.

By default, it's not guarantied because the command will be considered
successfully applied as long as a quorum of Ra servers applied it. This
list of Ra servers may not include the local node for instance.

[How]
If the `limit` option is specified with a `{Index, Term}` tuple, the
query will be evaluated right away if that index is already applied, or
it will be added to a list of pending queries.

Pending queries are evaluated after each applied batch of commands by
the local node. If a pending query's target index was reached or passed,
it is evaluated. If a pending query's target term ended, an error is
returned.

Note that pending queries that timed out from the callers' point of view
will still be evaluated once their target is applied. The reply will be
discarded by Erlang however because the process alias will be inactivate
at that point.

Here is an example:

    ra:local_query(ServerId, QueryFun, #{limit => {Index, Term}}).

The `local_query` tuple sent to the Ra server changes format. The old
one was:

    {local_query, QueryFun}

The new one is:

    {local_query, QueryFun, Options}

If the remote Ra server that receives the query runs a version of Ra
older than the one having this change and thus doesn't understand the
new tuple, it will ignore and drop the query. This will lead to a
timeout of the query, or an indefinitely hanging call if the timeout was
set to `infinity`.

Note in the opposite situation, i.e. if a Ra server that knows the new
query tuple receives an old tuple, it will evaluate the query as if the
options was an empty map.

V2: Rename the option from `limit` to `wait_for_index` which is more
    explicit.
V3: Rename the option back to `limit`. It allows to pass other types of
    condition in the future.
    Also change the place where pending queries are evaluated. This
    allows to get rid of the `applied_to` effect.
  • Loading branch information
dumbbell committed Jun 26, 2024
1 parent 6de4074 commit f340066
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 56 deletions.
64 changes: 51 additions & 13 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -928,19 +928,38 @@ pipeline_command(ServerId, Command) ->
local_query(ServerId, QueryFun) ->
local_query(ServerId, QueryFun, ?DEFAULT_TIMEOUT).

%% @doc Same as `local_query/2' but accepts a custom timeout.
%% @doc Same as `local_query/2' but accepts a custom timeout or a map of
%% options.
%%
%% The supported options are:
%% <ul>
%% <li>`limit': the query will be evaluated only once the specific `{Index,
%% Term}' is applied locally.</li>
%% <li>`timeout': the maximum time to wait for the query to be evaluated.</li>
%% </ul>
%%
%% @param ServerId the ra server id to send the query to
%% @param QueryFun the query function to run
%% @param Timeout the timeout to use
%% @param TimeoutOrOptions the timeout to use or a map of options
%% @see local_query/2
%% @end
-spec local_query(ServerId :: ra_server_id(),
QueryFun :: query_fun(),
Timeout :: timeout()) ->
TimeoutOrOptions) ->
ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()}) |
{ok, {ra_idxterm(), Reply :: term()}, not_known}.
local_query(ServerId, QueryFun, Timeout) ->
ra_server_proc:query(ServerId, QueryFun, local, Timeout).
{ok, {ra_idxterm(), Reply :: term()}, not_known}
when TimeoutOrOptions :: Timeout | Options,
Timeout :: timeout(),
Options :: #{limit => ra_idxterm(),
timeout => timeout()}.
local_query(ServerId, QueryFun, Timeout)
when Timeout =:= infinity orelse is_integer(Timeout) ->
ra_server_proc:query(ServerId, QueryFun, local, #{}, Timeout);
local_query(ServerId, QueryFun, Options) when is_map(Options) ->
Timeout = maps:get(timeout, Options, ?DEFAULT_TIMEOUT),
Options1 = maps:remove(timeout, Options),
ra_server_proc:query(
ServerId, QueryFun, local, Options1, Timeout).


%% @doc Query the machine state on the current leader node.
Expand All @@ -959,19 +978,38 @@ local_query(ServerId, QueryFun, Timeout) ->
leader_query(ServerId, QueryFun) ->
leader_query(ServerId, QueryFun, ?DEFAULT_TIMEOUT).

%% @doc Same as `leader_query/2' but accepts a custom timeout.
%% @doc Same as `leader_query/2' but accepts a custom timeout or a map of
%% options.
%%
%% The supported options are:
%% <ul>
%% <li>`limit': the query will be evaluated only once the specific `{Index,
%% Term}' is applied on the leader.</li>
%% <li>`timeout': the maximum time to wait for the query to be evaluated.</li>
%% </ul>
%%
%% @param ServerId the ra server id(s) to send the query to
%% @param QueryFun the query function to run
%% @param Timeout the timeout to use
%% @param TimeoutOrOptions the timeout to use or a map of options
%% @see leader_query/2
%% @end
-spec leader_query(ServerId :: ra_server_id() | [ra_server_id()],
QueryFun :: query_fun(),
Timeout :: timeout()) ->
TimeoutOrOptions) ->
ra_server_proc:ra_leader_call_ret({ra_idxterm(), Reply :: term()}) |
{ok, {ra_idxterm(), Reply :: term()}, not_known}.
leader_query(ServerId, QueryFun, Timeout) ->
ra_server_proc:query(ServerId, QueryFun, leader, Timeout).
{ok, {ra_idxterm(), Reply :: term()}, not_known}
when TimeoutOrOptions :: Timeout | Options,
Timeout :: timeout(),
Options :: #{limit => ra_idxterm(),
timeout => timeout()}.
leader_query(ServerId, QueryFun, Timeout)
when Timeout =:= infinity orelse is_integer(Timeout) ->
ra_server_proc:query(ServerId, QueryFun, leader, #{}, Timeout);
leader_query(ServerId, QueryFun, Options) when is_map(Options) ->
Timeout = maps:get(timeout, Options, ?DEFAULT_TIMEOUT),
Options1 = maps:remove(timeout, Options),
ra_server_proc:query(
ServerId, QueryFun, leader, Options1, Timeout).

%% @doc Query the state machine with a consistency guarantee.
%% This allows the caller to query the state machine on the leader node with
Expand Down Expand Up @@ -999,7 +1037,7 @@ consistent_query(ServerId, QueryFun) ->
Timeout :: timeout()) ->
ra_server_proc:ra_leader_call_ret(Reply :: term()).
consistent_query(ServerId, QueryFun, Timeout) ->
ra_server_proc:query(ServerId, QueryFun, consistent, Timeout).
ra_server_proc:query(ServerId, QueryFun, consistent, #{}, Timeout).

%% @doc Returns a list of cluster members
%%
Expand Down
5 changes: 3 additions & 2 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
get_membership/1,
get_condition_timeout/2,
recover/1,
state_query/2
state_query/2,
fetch_term/2
]).

-type ra_await_condition_fun() ::
Expand Down Expand Up @@ -2537,7 +2538,7 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0,
log := Log0} = State0)
when ApplyTo > LastApplied andalso MacVer >= EffMacVer ->
From = LastApplied + 1,
{LastIdx, _} = ra_log:last_index_term(Log0),
{LastIdx, _LastTerm} = ra_log:last_index_term(Log0),
To = min(LastIdx, ApplyTo),
FoldState = {MacMod, LastApplied, State0, MacState0,
Effects0, Notifys0, undefined},
Expand Down
Loading

0 comments on commit f340066

Please sign in to comment.