khepri_machine: Restore projections when a snapshot is installed
This hooks into the new `ra_machine:snapshot_installed/2` callback
introduced in the Ra version update in the parent commit.

We want to restore projections when a snapshot is installed the same
way as we do when the khepri machine recovers. A cluster member may fall
far behind the other members, causing the cluster leader to try to
"catch up" that member by sending it a snapshot. Once the snapshot is
installed, the machine state matches the leader's at the snapshot
index, but this 'jump' forward doesn't trigger any changes to
the projections. So we need to sync the new machine state (the tree) and
the projection tables by using the existing `restore_projections` aux
effect.
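
For reference, the wiring is roughly the sketch below: the
`snapshot_installed/2` clause mirrors the one added in this commit, while
the `handle_aux/6` clause and its helpers (`registered_projections/1`,
`rebuild_projection_table/2`) are illustrative names only, not Khepri's
actual internals.

```
%% Sketch only: an aux effect returned from snapshot_installed/2 drives
%% projection restoration. Helper names below are hypothetical.
snapshot_installed(_Meta, _State) ->
    [{aux, restore_projections}].

handle_aux(_RaState, cast, restore_projections, AuxState, LogState, State) ->
    %% Re-project the freshly installed tree into every registered
    %% projection's ETS table so the tables match the snapshot state.
    lists:foreach(
      fun(Projection) -> rebuild_projection_table(Projection, State) end,
      registered_projections(State)),
    {no_reply, AuxState, LogState}.
```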

This fixes a bug reproducible in the server with the following
reproduction steps:

* Start a 3-node cluster with `make start-cluster`.
* Enable the `khepri_db` feature flag.
* Start a definitions import with a very large data set, for example
  the `100-queues-with-100-bindings-each.json` case from the
  rabbitmq/sample-configs repo.
* Part-way through the import, perform a rolling restart of the cluster
  with `make restart-cluster`.
* Examine a projection table affected by the definition import. Note the
  discrepancy in the number of bindings in the `rabbit_khepri_bindings`
  table across nodes:

```
for i in 1 2 3; printf "rabbit-$i: "; rabbitmqctl -n rabbit-$i eval 'length(ets:tab2list(rabbit_khepri_bindings)).'; end
rabbit-1: 49003
rabbit-2: 49003
rabbit-3: 23370
```

The rolling restart stops and restarts rabbit-3 before the other nodes.
The definition import continues while rabbit-3 is restarting, though,
because rabbit-1 and rabbit-2 still form a majority. When those nodes
restart, one will become leader (because they are ahead of rabbit-3 in
terms of commit index) and will catch up rabbit-3 with a snapshot. The
number of raft indices skipped ahead with that snapshot is nearly the
same as the number of records missing from the projection table: the
records missing from the projection table are the ones sent by the
leader in the snapshot.

By restoring projections after the snapshot is installed, all nodes
reflect the same numbers of bindings.
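
To confirm the index jump while reproducing this, the Ra member overview
can be queried on each node (for example via `rabbitmqctl eval`). A minimal
sketch, assuming the Khepri store's Ra server is registered locally as
`rabbitmq_metadata` (that name is an assumption; the `snapshot_index` key
is the same one the new test matches on):

```
%% Assumption: the Khepri metadata store's Ra server is registered locally
%% as `rabbitmq_metadata'.
{ok, Overview, _Leader} = ra:member_overview({rabbitmq_metadata, node()}),
#{log := #{snapshot_index := SnapshotIndex}} = Overview,
SnapshotIndex.
```
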
the-mikedavis committed Jun 19, 2024
1 parent c32096e commit 24b2814
Showing 2 changed files with 147 additions and 1 deletion.
7 changes: 7 additions & 0 deletions src/khepri_machine.erl
@@ -70,6 +70,7 @@
handle_aux/6,
apply/3,
state_enter/2,
snapshot_installed/2,
overview/1,
version/0,
which_module/1]).
@@ -1504,6 +1505,12 @@ state_enter(_StateName, _State) ->

%% @private

snapshot_installed(_Meta, _State) ->
SideEffect = {aux, restore_projections},
[SideEffect].

%% @private

emitted_triggers_to_side_effects(State) ->
#config{store_id = StoreId} = get_config(State),
EmittedTriggers = get_emitted_triggers(State),
141 changes: 140 additions & 1 deletion test/cluster_SUITE.erl
@@ -47,6 +47,7 @@
handle_leader_down_on_three_node_cluster_response/1,
can_set_snapshot_interval/1,
projections_are_consistent_on_three_node_cluster/1,
projections_are_updated_when_a_snapshot_is_installed/1,
async_command_leader_change_in_three_node_cluster/1,
spam_txs_during_election/1]).

@@ -71,6 +72,7 @@ all() ->
handle_leader_down_on_three_node_cluster_response,
can_set_snapshot_interval,
projections_are_consistent_on_three_node_cluster,
projections_are_updated_when_a_snapshot_is_installed,
async_command_leader_change_in_three_node_cluster,
spam_txs_during_election].

@@ -126,6 +128,7 @@ init_per_testcase(Testcase, Config)
Testcase =:= handle_leader_down_on_three_node_cluster_command orelse
Testcase =:= handle_leader_down_on_three_node_cluster_response orelse
Testcase =:= projections_are_consistent_on_three_node_cluster orelse
Testcase =:= projections_are_updated_when_a_snapshot_is_installed orelse
Testcase =:= async_command_leader_change_in_three_node_cluster ->
Nodes = start_n_nodes(Testcase, 3),
PropsPerNode0 = [begin
@@ -1674,7 +1677,7 @@ await_snapshot_index(RaServer, ExpectedIndex, Retries) ->
projections_are_consistent_on_three_node_cluster(Config) ->
ProjectionName = ?MODULE,

%% We call `khepri_projection:new/2 on the local node and thus need
%% We call `khepri_projection:new/2' on the local node and thus need
%% Khepri.
?assertMatch({ok, _}, application:ensure_all_started(khepri)),

@@ -1775,6 +1778,142 @@ wait_for_projection_on_nodes([Node | Rest] = Nodes, ProjectionName) ->
wait_for_projection_on_nodes(Rest, ProjectionName)
end.

projections_are_updated_when_a_snapshot_is_installed(Config) ->
%% When a cluster member falls behind on log entries, the leader tries to
%% catch it up with a snapshot. Specifically: if a cluster member's current
%% latest raft index is older than the leader's snapshot index, the leader
%% catches up that member by sending it a snapshot and then any log entries
%% that follow.
%%
%% When this happens the member doesn't see the changes as regular
%% commands (i.e. handled in `ra_machine:apply/3'). Instead the machine
%% state is replaced entirely. So when a snapshot is installed we must
%% restore projections the same way as we do when we restart a member.
%% In `khepri_machine' this is done in the `snapshot_installed/2' callback
%% implementation.
%%
%% To test this we stop a member, apply enough commands to cause the leader
%% to take a snapshot, and then restart the member and assert that the
%% projection contents are as expected.

ProjectionName = ?MODULE,

%% We call `khepri_projection:new/2' on the local node and thus need
%% Khepri.
?assertMatch({ok, _}, application:ensure_all_started(khepri)),

PropsPerNode = ?config(ra_system_props, Config),
[Node1, Node2, Node3] = Nodes = maps:keys(PropsPerNode),
%% We assume all nodes are using the same Ra system name & store ID.
#{ra_system := RaSystem} = maps:get(Node1, PropsPerNode),
StoreId = RaSystem,
%% Set the snapshot interval low so that we can trigger a snapshot by
%% sending 4 commands.
RaServerConfig = #{cluster_name => StoreId,
machine_config => #{snapshot_interval => 4}},

ct:pal("Start database + cluster nodes"),
lists:foreach(
fun(Node) ->
ct:pal("- khepri:start() from node ~s", [Node]),
?assertEqual(
{ok, StoreId},
rpc:call(Node, khepri, start, [RaSystem, RaServerConfig]))
end, Nodes),
lists:foreach(
fun(Node) ->
ct:pal("- khepri_cluster:join() from node ~s", [Node]),
?assertEqual(
ok,
rpc:call(Node, khepri_cluster, join, [StoreId, Node3]))
end, [Node1, Node2]),

ct:pal("Register projection on node ~s", [Node1]),
Projection = khepri_projection:new(
ProjectionName,
fun(Path, Payload) -> {Path, Payload} end),
rpc:call(Node1,
khepri, register_projection,
[StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection]),
ok = wait_for_projection_on_nodes([Node2, Node3], ProjectionName),

?assertEqual(
ok,
rpc:call(Node3, khepri, put, [StoreId, [key1], value1v1,
#{reply_from => local}])),
?assertEqual(
value1v1,
rpc:call(Node3, ets, lookup_element, [ProjectionName, [key1], 2])),

ct:pal(
"Stop cluster member ~s (quorum is maintained)", [Node1]),
ok = rpc:call(Node1, khepri, stop, [StoreId]),

?assertMatch(
{ok, #{log := #{snapshot_index := undefined}}, _},
ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3))),

ct:pal("Submit enough commands to trigger a snapshot"),
ct:pal("- set key1:value1v2"),
ok = rpc:call(Node3, khepri, put, [StoreId, [key1], value1v2]),
ct:pal("- set key2:value2v1"),
ok = rpc:call(Node3, khepri, put, [StoreId, [key2], value2v1]),
ct:pal("- set key3:value3v1"),
ok = rpc:call(Node3, khepri, put, [StoreId, [key3], value3v1]),
ct:pal("- set key4:value4v1"),
ok = rpc:call(Node3, khepri, put, [StoreId, [key4], value4v1]),

{ok, #{log := #{snapshot_index := SnapshotIndex}}, _} =
ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3)),
?assert(is_number(SnapshotIndex) andalso SnapshotIndex > 4),

ct:pal("Restart cluster member ~s", [Node1]),
{ok, StoreId} = rpc:call(Node1, khepri, start, [RaSystem, RaServerConfig]),

%% Execute a command with local-reply from Node1 - this will ensure that
%% we block until Node1 has caught up with the latest changes before we
%% check its projection table. We have to retry the command a few times
%% if it times out to deal with CI runners with few schedulers.
ok = put_with_retry(
StoreId, Node1,
[key5], value5v1, #{reply_from => local}),
?assertEqual(
value5v1,
rpc:call(Node1, ets, lookup_element, [ProjectionName, [key5], 2])),

?assertEqual(
value1v2,
rpc:call(Node1, ets, lookup_element, [ProjectionName, [key1], 2])),
?assertEqual(
value2v1,
rpc:call(Node1, ets, lookup_element, [ProjectionName, [key2], 2])),
?assertEqual(
value3v1,
rpc:call(Node1, ets, lookup_element, [ProjectionName, [key3], 2])),
?assertEqual(
value4v1,
rpc:call(Node1, ets, lookup_element, [ProjectionName, [key4], 2])),

ok.

put_with_retry(StoreId, Node, Key, Value, Options) ->
put_with_retry(StoreId, Node, Key, Value, Options, 10).

put_with_retry(StoreId, Node, Key, Value, Options, Retries) ->
ct:pal("- put (~p) '~p':'~p' (try ~b)", [Node, Key, Value, 10 - Retries]),
case rpc:call(Node, khepri, put, [StoreId, Key, Value, Options]) of
{error, timeout} = Err ->
case Retries of
0 ->
erlang:error({?FUNCTION_NAME, [{actual, Err}]});
_ ->
timer:sleep(10),
put_with_retry(StoreId, Node, Key, Value, Options, Retries - 1)
end;
Ret ->
Ret
end.

async_command_leader_change_in_three_node_cluster(Config) ->
PropsPerNode = ?config(ra_system_props, Config),
[Node1, Node2, Node3] = Nodes = maps:keys(PropsPerNode),
