Skip to content

Commit

Permalink
Merge pull request #463 from rabbitmq/md/checkpoint-defer-validation
Browse files Browse the repository at this point in the history
Stop checkpoint validation when encountering a valid checkpoint
  • Loading branch information
kjnilsson authored Aug 14, 2024
2 parents e95ab7b + b7fe2da commit 4c5b409
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 40 deletions.
108 changes: 77 additions & 31 deletions src/ra_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,7 @@ pick_first_valid(UId, Mod, Dir, [S | Rem]) ->
pick_first_valid(UId, Mod, Dir, Rem)
end.

find_checkpoints(#?MODULE{uid = UId,
module = Module,
current = Current,
find_checkpoints(#?MODULE{current = Current,
checkpoint_directory = CheckpointDir} = State) ->
case ra_lib:is_dir(CheckpointDir) of
false ->
Expand All @@ -235,37 +233,85 @@ find_checkpoints(#?MODULE{uid = UId,
I
end,
{ok, CPFiles0} = prim_file:list_dir(CheckpointDir),
%% Reverse-sort the files so that the most recent checkpoints
%% come first.
CPFiles = lists:reverse(lists:sort(CPFiles0)),
Checkpoints =
lists:filtermap(
fun(File) ->
CP = filename:join(CheckpointDir, File),
case Module:validate(CP) of
ok ->
{ok, #{index := Idx, term := Term}} =
Module:read_meta(CP),
case Idx > CurrentIdx of
true ->
{true, {Idx, Term}};
false ->
?INFO("ra_snapshot: ~ts: removing "
"checkpoint ~s as was older than the "
"current snapshot.",
[UId, CP]),
delete(CheckpointDir, {Idx, Term}),
false
end;
Err ->
?INFO("ra_snapshot: ~ts: removing checkpoint ~s as "
"did not validate. Err: ~w",
[UId, CP, Err]),
ra_lib:recursive_delete(CP),
false
end
end, CPFiles),
State#?MODULE{checkpoints = Checkpoints}
find_checkpoints(CPFiles, State, CurrentIdx, [])
end.

%% Walks the checkpoint directory entries in `Files` and returns `State`
%% with its `checkpoints` field set to the `{Index, Term}` pairs whose index
%% is greater than `CurrentIdx`.
%%
%% `Files` is processed head first. The stale-checkpoint shortcuts below
%% rely on the caller passing the entries most-recent-first.
%%
%% The last argument is an accumulator. It is built by prepending and
%% reversed once at the end, so the resulting list keeps the order of `Files`.
%%
%% Side effects: entries that fail validation, whose metadata cannot be
%% read, or that are not newer than `CurrentIdx` are removed from disk.
find_checkpoints([], State, _CurrentIdx, Checkpoints) ->
    %% Reverse so that the most recent checkpoints come first.
    State#?MODULE{checkpoints = lists:reverse(Checkpoints)};
find_checkpoints([File | Files],
                 #?MODULE{uid = UId,
                          module = Module,
                          checkpoint_directory = CheckpointDir} = State,
                 CurrentIdx, []) ->
    %% When we haven't yet found a valid checkpoint (`Checkpoints =:= []`),
    %% fully validate the file with the `ra_snapshot:validate/1` callback to
    %% ensure that we can recover from the latest checkpoint.
    CP = filename:join(CheckpointDir, File),
    case Module:validate(CP) of
        ok ->
            %% A validated checkpoint is expected to have readable metadata;
            %% anything else crashes here with a `badmatch`.
            {ok, #{index := Idx, term := Term}} = Module:read_meta(CP),
            case Idx > CurrentIdx of
                true ->
                    find_checkpoints(Files, State, CurrentIdx, [{Idx, Term}]);
                false ->
                    %% If the first valid checkpoint is older than the snapshot
                    %% index then all checkpoints in `Files` are older as well.
                    %% Delete all checkpoints and bail.
                    %% NOTE: `State` is returned untouched here, so its
                    %% `checkpoints` field keeps the value it had on entry.
                    delete_stale_checkpoints(
                      UId, CheckpointDir, [File | Files]),
                    State
            end;
        Err ->
            %% `~ts` (not `~s`) for the path: file names are chardata and may
            %% contain code points above 255, which `~s` cannot format.
            ?INFO("ra_snapshot: ~ts: removing checkpoint ~ts as it did not "
                  "validate. Err: ~w",
                  [UId, CP, Err]),
            _ = ra_lib:recursive_delete(CP),
            find_checkpoints(Files, State, CurrentIdx, [])
    end;
find_checkpoints([File | Files],
                 #?MODULE{uid = UId,
                          module = Module,
                          checkpoint_directory = CheckpointDir} = State,
                 CurrentIdx, Checkpoints) ->
    %% If a valid checkpoint has already been found it is assumed all older
    %% checkpoints are also valid. Scanning all can introduce a lot of
    %% additional I/O during recovery. Only the metadata is read here.
    CP = filename:join(CheckpointDir, File),
    case Module:read_meta(CP) of
        {ok, #{index := Idx, term := Term}} ->
            case Idx > CurrentIdx of
                true ->
                    find_checkpoints(
                      Files, State, CurrentIdx, [{Idx, Term} | Checkpoints]);
                false ->
                    %% If this checkpoint is older than the current snapshot
                    %% then all later `Files` will be as well. Delete them and
                    %% finish searching.
                    delete_stale_checkpoints(
                      UId, CheckpointDir, [File | Files]),
                    find_checkpoints([], State, CurrentIdx, Checkpoints)
            end;
        Err ->
            %% `~ts` for the path, see the validation clause above.
            ?INFO("ra_snapshot: ~ts: removing checkpoint ~ts as metadata could "
                  "not be read. Err: ~w",
                  [UId, CP, Err]),
            _ = ra_lib:recursive_delete(CP),
            find_checkpoints(Files, State, CurrentIdx, Checkpoints)
    end.

%% Removes every entry of `Files` (names relative to `CheckpointDir`) from
%% disk, logging one info message per removed checkpoint. Always returns `ok`;
%% the result of each individual delete is deliberately ignored.
delete_stale_checkpoints(UId, CheckpointDir, Files) ->
    lists:foreach(
      fun(File) ->
              CP = filename:join(CheckpointDir, File),
              %% `~ts` (not `~s`) for the path: file names are chardata and
              %% may contain code points above 255, which `~s` cannot format.
              ?INFO("ra_snapshot: ~ts: removing checkpoint ~ts as it was older than "
                    "the current snapshot.", [UId, CP]),
              _ = ra_lib:recursive_delete(CP)
      end, Files),
    ok.

-spec init_ets() -> ok.
init_ets() ->
TableFlags = [set,
Expand Down
28 changes: 19 additions & 9 deletions test/ra_checkpoint_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -261,28 +261,38 @@ init_recover_corrupt(Config) ->
State0 = init_state(Config),

%% Take a checkpoint.
Meta = meta(55, 2, [node()]),
MacRef = ?FUNCTION_NAME,
{State1, _} = ra_snapshot:begin_snapshot(Meta, MacRef, checkpoint, State0),
Meta1 = meta(55, 2, [node()]),
{State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, checkpoint, State0),
State2 = receive
{ra_log_event, {snapshot_written, {55, 2} = IdxTerm1, checkpoint}} ->
ra_snapshot:complete_snapshot(IdxTerm1, checkpoint, State1)
after 1000 ->
error(snapshot_event_timeout)
end,

%% Take another checkpoint.
Meta2 = meta(165, 2, [node()]),
{State3, _} = ra_snapshot:begin_snapshot(Meta2, ?FUNCTION_NAME, checkpoint, State2),
receive
{ra_log_event, {snapshot_written, {55, 2} = IdxTerm, checkpoint}} ->
_ = ra_snapshot:complete_snapshot(IdxTerm, checkpoint, State1),
{ra_log_event, {snapshot_written, {165, 2} = IdxTerm2, checkpoint}} ->
_ = ra_snapshot:complete_snapshot(IdxTerm2, checkpoint, State3),
ok
after 1000 ->
error(snapshot_event_timeout)
end,

%% Delete the file but leave the directory intact.
%% Corrupt the latest checkpoint by deleting the snapshot.dat file but
%% leaving the checkpoint directory intact.
CorruptDir = filename:join(?config(checkpoint_dir, Config),
ra_lib:zpad_hex(2) ++ "_" ++ ra_lib:zpad_hex(55)),
ra_lib:zpad_hex(2) ++ "_" ++ ra_lib:zpad_hex(165)),
ok = file:delete(filename:join(CorruptDir, "snapshot.dat")),

Recover = init_state(Config),
%% The checkpoint isn't recovered and the directory is cleaned up.
undefined = ra_snapshot:pending(Recover),
undefined = ra_snapshot:current(Recover),
undefined = ra_snapshot:latest_checkpoint(Recover),
{error, no_current_snapshot} = ra_snapshot:recover(Recover),
{55, 2} = ra_snapshot:latest_checkpoint(Recover),
{ok, Meta1, ?FUNCTION_NAME} = ra_snapshot:recover(Recover),
false = filelib:is_dir(CorruptDir),

ok.
Expand Down

0 comments on commit 4c5b409

Please sign in to comment.