Merge pull request #456 from rabbitmq/pre-init-bug-fixes
Fixes for pre-init recovery and handling of gaps in mem tables due to snapshotting
kjnilsson authored Jul 19, 2024
2 parents 5365e43 + a7ed36e commit 94cb3d2
Showing 10 changed files with 193 additions and 67 deletions.
10 changes: 9 additions & 1 deletion src/ra_directory.erl
@@ -19,7 +19,8 @@
pid_of/2,
uid_of/2,
overview/1,
list_registered/1
list_registered/1,
is_registered_uid/2
]).

-export_type([
@@ -199,6 +200,13 @@ list_registered(System) when is_atom(System) ->
Tbl = get_reverse(System),
dets:select(Tbl, [{'_', [], ['$_']}]).

-spec is_registered_uid(atom(), ra_uid()) -> boolean().
is_registered_uid(System, UId)
when is_atom(System) andalso
is_binary(UId) ->
Tbl = get_reverse(System),
[] =/= dets:select(Tbl, [{{'_', UId}, [], ['$_']}]).

get_name(#{directory := Tbl}) ->
Tbl;
get_name(System) when is_atom(System) ->
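As an aside, a minimal, self-contained sketch of the dets match-spec technique the new is_registered_uid/2 relies on. The reverse table stores {Name, UId} pairs (as the list_registered/1 result above suggests), so checking membership by UId is a select on the value position. The module, table and file names below are made up for the example.

-module(dets_reverse_lookup_sketch).
-export([demo/0]).

demo() ->
    File = "reverse_lookup_demo.dets",
    {ok, Tbl} = dets:open_file(demo_reverse_tbl, [{file, File}]),
    ok = dets:insert(Tbl, {server_a, <<"uid_a">>}),
    %% match any {_, UId} object; a non-empty result means the UId is present
    IsRegistered = [] =/= dets:select(Tbl, [{{'_', <<"uid_a">>}, [], ['$_']}]),
    ok = dets:close(Tbl),
    ok = file:delete(File),
    IsRegistered.  %% => true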
32 changes: 27 additions & 5 deletions src/ra_log_pre_init.erl
@@ -22,6 +22,7 @@

-record(state, {}).

-define(ETSTBL, ra_log_snapshot_state).
%%%===================================================================
%%% API functions
%%%===================================================================
@@ -40,7 +41,16 @@ init([System]) ->
Regd = ra_directory:list_registered(System),
?INFO("ra system '~ts' running pre init for ~b registered servers",
[System, length(Regd)]),
_ = [catch(pre_init(System, Name)) || {Name, _U} <- Regd],
_ = [begin
try pre_init(System, Name, UId) of
ok -> ok
catch _:Err ->
?ERROR("pre_init failed in system ~s for UId ~ts with name ~ts"
" This error may need manual intervention",
[System, UId, Name]),
throw({stop, {error, Err}})
end
end || {Name, UId} <- Regd],
{ok, #state{}, hibernate}.

handle_call(_Request, _From, State) ->
@@ -63,8 +73,20 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================

pre_init(System, Name) ->
{ok, #{log_init_args := Log}} = ra_server_sup_sup:recover_config(System, Name),
_ = ra_log:pre_init(Log),
ok.
pre_init(System, Name, UId) ->
case ets:lookup(?ETSTBL, UId) of
[{_, _}] ->
%% already initialised
ok;
[] ->
case ra_system:fetch(System) of
undefined ->
{error, system_not_started};
SysCfg ->
{ok, #{log_init_args := Log}} =
ra_server_sup_sup:recover_config(System, Name),
ok = ra_log:pre_init(Log#{system_config => SysCfg}),
ok
end
end.
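For illustration, a reduced sketch of the skip-if-already-initialised pattern that pre_init/3 above follows: an existing entry in the ra_log_snapshot_state ETS table means the server's log has already been pre-initialised and can be skipped. The module, table and function names below are hypothetical.

-module(pre_init_sketch).
-export([start/0, maybe_init/2]).

-define(TBL, pre_init_sketch_state).

%% create the named, public state table once (e.g. from a supervisor)
start() ->
    ?TBL = ets:new(?TBL, [named_table, public]),
    ok.

%% run InitFun(Key) at most once per Key; later calls are no-ops
maybe_init(Key, InitFun) when is_function(InitFun, 1) ->
    case ets:lookup(?TBL, Key) of
        [{Key, done}] ->
            %% already initialised by an earlier pass
            ok;
        [] ->
            ok = InitFun(Key),
            true = ets:insert(?TBL, {Key, done}),
            ok
    end.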

83 changes: 50 additions & 33 deletions src/ra_log_segment_writer.erl
@@ -308,40 +308,57 @@ append_to_segment(_, _, StartIdx, EndIdx, Seg, Closed, _State)
when StartIdx >= EndIdx ->
{Seg, Closed};
append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
[{_, Term, Data0}] = ets:lookup(Tid, Idx),
Data = term_to_iovec(Data0),
DataSize = iolist_size(Data),
case ra_log_segment:append(Seg0, Idx, Term, {DataSize, Data}) of
{ok, Seg} ->
ok = counters:add(State#state.counter, ?C_ENTRIES, 1),
%% this isn't completely accurate as firstly the segment may not
%% have written it to disk and it doesn't include data written to
%% the segment index but is probably good enough to get comparative
%% data rates for different Ra components
ok = counters:add(State#state.counter, ?C_BYTES_WRITTEN, DataSize),
append_to_segment(UId, Tid, Idx+1, EndIdx, Seg, Closed, State);
{error, full} ->
% close and open a new segment
case open_successor_segment(Seg0, State#state.segment_conf) of
undefined ->
%% a successor cannot be opened - this is most likely due
%% to the directory having been deleted.
%% clear close mem tables here
_ = ets:delete(Tid),
_ = clean_closed_mem_tables(State#state.system, UId, Tid),
undefined;
Seg ->
ok = counters:add(State#state.counter, ?C_SEGMENTS, 1),
%% re-evaluate snapshot state for the server in case
%% a snapshot has completed during segment flush
StartIdx = start_index(UId, Idx),
% recurse
append_to_segment(UId, Tid, StartIdx, EndIdx, Seg,
[Seg0 | Closed], State)
case ets:lookup(Tid, Idx) of
[] ->
%% oh dear, an expected index was not found in the mem table.
?WARN("segment_writer: missing index ~b in mem table ~s for uid ~s"
"checking to see if UId has been unregistered",
[Idx, Tid, UId]),
case ra_directory:is_registered_uid(State#state.system, UId) of
true ->
?ERROR("segment_writer: uid ~s is registered, exiting...",
[UId]),
exit({missing_index, UId, Idx});
false ->
?INFO("segment_writer: UId ~s was not registered, skipping",
[UId]),
undefined
end;
{error, Posix} ->
FileName = ra_log_segment:filename(Seg0),
exit({segment_writer_append_error, FileName, Posix})
[{_, Term, Data0}] ->
Data = term_to_iovec(Data0),
DataSize = iolist_size(Data),
case ra_log_segment:append(Seg0, Idx, Term, {DataSize, Data}) of
{ok, Seg} ->
ok = counters:add(State#state.counter, ?C_ENTRIES, 1),
%% this isn't completely accurate as firstly the segment may not
%% have written it to disk and it doesn't include data written to
%% the segment index but is probably good enough to get comparative
%% data rates for different Ra components
ok = counters:add(State#state.counter, ?C_BYTES_WRITTEN, DataSize),
append_to_segment(UId, Tid, Idx+1, EndIdx, Seg, Closed, State);
{error, full} ->
% close and open a new segment
case open_successor_segment(Seg0, State#state.segment_conf) of
undefined ->
%% a successor cannot be opened - this is most likely due
%% to the directory having been deleted.
%% clear close mem tables here
_ = ets:delete(Tid),
_ = clean_closed_mem_tables(State#state.system, UId, Tid),
undefined;
Seg ->
ok = counters:add(State#state.counter, ?C_SEGMENTS, 1),
%% re-evaluate snapshot state for the server in case
%% a snapshot has completed during segment flush
StartIdx = start_index(UId, Idx),
% recurse
append_to_segment(UId, Tid, StartIdx, EndIdx, Seg,
[Seg0 | Closed], State)
end;
{error, Posix} ->
FileName = ra_log_segment:filename(Seg0),
exit({segment_writer_append_error, FileName, Posix})
end
end.

find_segment_files(Dir) ->
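For context, a hedged sketch of the decision the new clauses above introduce: when the entry for an expected index is missing from the mem table, a still-registered UId indicates genuine data loss (the writer exits loudly), while an unregistered UId means the server was deleted concurrently and the flush can be skipped. IsRegisteredFun and AppendFun stand in for ra_directory:is_registered_uid/2 and the actual segment append; they are placeholders, not the real API.

-module(gap_check_sketch).
-export([flush_entry/4]).

flush_entry(Tid, Idx, UId, {IsRegisteredFun, AppendFun}) ->
    case ets:lookup(Tid, Idx) of
        [{Idx, Term, Data}] ->
            %% entry present: append it to the segment as usual
            AppendFun(Idx, Term, Data);
        [] ->
            %% gap: distinguish concurrent deletion from real corruption
            case IsRegisteredFun(UId) of
                true  -> exit({missing_index, UId, Idx});
                false -> skipped
            end
    end.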
14 changes: 10 additions & 4 deletions src/ra_server_proc.erl
@@ -1022,11 +1022,17 @@ terminate(Reason, StateName,
ra_server:system_config(ServerState),
UId = uid(State),
Id = id(State),
_ = ra_server:terminate(ServerState, Reason),
case Reason of
{shutdown, delete} ->
Parent = ra_directory:where_is_parent(Names, UId),
%% we need to unregister _before_ the log closes
%% in the ra_server:terminate/2 function
%% as we want the data directory to be deleted
%% after the server is removed from the ra directory.
%% This is so that the segment writer can avoid
%% crashing if it detects a missing key
catch ra_directory:unregister_name(Names, UId),
_ = ra_server:terminate(ServerState, Reason),
catch ra_log_meta:delete_sync(MetaName, UId),
catch ra_counters:delete(Id),
Self = self(),
@@ -1044,9 +1050,9 @@
end
end),
ok;


_ -> ok
_ ->
_ = ra_server:terminate(ServerState, Reason),
ok
end,
catch ra_leaderboard:clear(ClusterName),
_ = ets:delete(ra_metrics, MetricsKey),
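A small sketch of the ordering the terminate/3 change above enforces on the delete path: unregister first, then terminate the log, and only then remove files on disk, so a concurrent segment flush that hits a gap can observe that the UId is gone and skip instead of crashing. The steps are passed in as funs to keep the sketch self-contained; in Ra they correspond roughly to ra_directory:unregister_name/2, ra_server:terminate/2 and the data directory deletion.

-module(delete_order_sketch).
-export([delete_server/3]).

delete_server(UnregisterFun, TerminateFun, DeleteDirFun) ->
    ok = UnregisterFun(),   %% 1. remove the server from the directory first
    _  = TerminateFun(),    %% 2. then close the log / terminate server state
    ok = DeleteDirFun(),    %% 3. only now delete the data directory on disk
    ok.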
13 changes: 7 additions & 6 deletions test/ra_SUITE.erl
@@ -977,11 +977,12 @@ snapshot_installation_with_call_crash(Config) ->
{ok, _, _} = ra:process_command(Leader, deq),

meck:new(ra_server, [passthrough]),
meck:expect(ra_server, handle_follower, fun (#install_snapshot_rpc{}, _) ->
exit(timeout);
(A, B) ->
meck:passthrough([A, B])
end),
meck:expect(ra_server, handle_follower,
fun (#install_snapshot_rpc{}, _) ->
exit(timeout);
(A, B) ->
meck:passthrough([A, B])
end),
%% start the down node again, catchup should involve sending a snapshot
ok = ra:restart_server(?SYS, Down),

@@ -994,7 +995,7 @@ snapshot_installation_with_call_crash(Config) ->
{ok, {N2Idx, _}, _} = ra:local_query(N2, fun ra_lib:id/1),
{ok, {N3Idx, _}, _} = ra:local_query(N3, fun ra_lib:id/1),
(N1Idx == N2Idx) and (N1Idx == N3Idx)
end, 20)),
end, 200)),
ok.


1 change: 0 additions & 1 deletion test/ra_dbg_SUITE.erl
@@ -9,7 +9,6 @@
-compile(nowarn_export_all).
-compile(export_all).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

all() ->
2 changes: 2 additions & 0 deletions test/ra_directory_SUITE.erl
@@ -72,6 +72,7 @@ basics(_Config) ->
% registrations should always succeed - no negative test
Self = ra_directory:where_is(?SYS, UId),
UId = ra_directory:uid_of(?SYS, test1),
?assert(ra_directory:is_registered_uid(?SYS, UId)),
% ensure it can be read from another process
_ = spawn_link(
fun () ->
@@ -86,6 +87,7 @@ basics(_Config) ->
undefined = ra_directory:name_of(?SYS, UId),
undefined = ra_directory:cluster_name_of(?SYS, UId),
undefined = ra_directory:uid_of(?SYS, test1),
?assertNot(ra_directory:is_registered_uid(?SYS, UId)),
ok.

persistence(_Config) ->
50 changes: 33 additions & 17 deletions test/ra_log_2_SUITE.erl
@@ -67,31 +67,38 @@ groups() ->
].

init_per_suite(Config) ->
{ok, _} = ra:start([{data_dir, ?config(priv_dir, Config)},
{segment_max_entries, 128}]),
Config.

end_per_suite(Config) ->
application:stop(ra),
Config.

init_per_group(G, Config) ->
[{access_pattern, G} | Config].
DataDir = filename:join(?config(priv_dir, Config), G),
[{access_pattern, G},
{work_dir, DataDir}
| Config].

end_per_group(_, Config) ->
Config.

init_per_testcase(TestCase, Config) ->
ok = start_ra(Config),
ra_env:configure_logger(logger),
PrivDir = ?config(priv_dir, Config),
DataDir = ?config(work_dir, Config),
UId = <<(atom_to_binary(TestCase, utf8))/binary,
(atom_to_binary(?config(access_pattern, Config)))/binary>>,
ra:start(),
ok = ra_directory:register_name(default, UId, self(), undefined,
TestCase, TestCase),
[{uid, UId}, {test_case, TestCase}, {wal_dir, PrivDir} | Config].
ServerConf = #{log_init_args => #{uid => UId}},

ok = ra_lib:make_dir(filename:join([DataDir, node(), UId])),
ok = ra_lib:write_file(filename:join([DataDir, node(), UId, "config"]),
list_to_binary(io_lib:format("~p.", [ServerConf]))),

[{uid, UId}, {test_case, TestCase}, {wal_dir, DataDir} | Config].

end_per_testcase(_, _Config) ->
application:stop(ra),
ok.

-define(N1, {n1, node()}).
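The test setup above persists the server config as a plain Erlang term file ("~p." plus a trailing dot), presumably so recover_config/2 can read it back with a file:consult-style call; that reading side is an assumption here. A minimal round-trip sketch of the same pattern:

-module(term_file_sketch).
-export([demo/0]).

demo() ->
    Path = "server_config_demo",
    Conf = #{log_init_args => #{uid => <<"some_uid">>}},
    %% "~p." renders the term in a form file:consult/1 can parse back
    ok = file:write_file(Path, io_lib:format("~p.~n", [Conf])),
    {ok, [Read]} = file:consult(Path),
    ok = file:delete(Path),
    Read =:= Conf.  %% => true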
@@ -429,13 +436,13 @@ written_event_after_snapshot(Config) ->
ok.

writes_lower_than_snapshot_index_are_dropped(Config) ->
logger:set_primary_config(level, debug),
Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}),
Log1 = ra_log:append({1, 1, <<"one">>}, Log0),
Log1b = deliver_all_log_events(ra_log:append({2, 1, <<"two">>}, Log1), 500),
true = erlang:suspend_process(whereis(ra_log_wal)),
Log2 = write_n(3, 500, 1, Log1b),
{Log3, _} = ra_log:update_release_cursor(100, #{}, 1,
<<"100">>, Log2),
{Log3, _} = ra_log:update_release_cursor(100, #{}, 1, <<"100">>, Log2),
Log4 = deliver_all_log_events(Log3, 500),

Overview = ra_log:overview(Log4),
@@ -473,8 +480,10 @@ writes_lower_than_snapshot_index_are_dropped(Config) ->
cache_size := 0,
cache_range := undefined,
last_written_index_term := {499, 1}}, OverviewAfter),
%% restart the app to test recovery with a "gappy" wal
application:stop(ra),
start_ra(Config),
erlang:monitor(process, whereis(ra_log_segment_writer)),
exit(whereis(ra_log_wal), kill),
receive
{'DOWN', _, _, _, _} = D ->
ct:fail("DOWN received ~p", [D])
@@ -589,7 +598,7 @@ recovery(Config) ->
Log4 = assert_log_events(Log3, Pred, 2000),
ra_log:close(Log4),
application:stop(ra),
ra:start(),
start_ra(Config),

Log5 = ra_log_init(Config),
{20, 3} = ra_log:last_index_term(Log5),
Expand All @@ -610,7 +619,7 @@ recover_bigly(Config) ->
Log2 = assert_log_events(Log1, Pred, 2000),
ra_log:close(Log2),
application:stop(ra),
ra:start(),
start_ra(Config),
Log = ra_log_init(Config),
{9999, 1} = ra_log:last_written(Log),
{9999, 1} = ra_log:last_index_term(Log),
@@ -1150,21 +1159,23 @@ transient_writer_is_handled(Config) ->
Self = self(),
UId2 = <<(?config(uid, Config))/binary, "sub_proc">>,
_Pid = spawn(fun () ->
ra_directory:register_name(default, <<"sub_proc">>,
ra_directory:register_name(default, UId2,
self(), undefined,
sub_proc, sub_proc),
Log0 = ra_log_init(Config, #{uid => UId2}),
Log1 = append_n(1, 10, 2, Log0),
% ignore events
Log2 = deliver_all_log_events(Log1, 500),
ra_log:close(Log2),
Self ! done
Self ! done,
ok
end),
receive done -> ok
after 2000 -> exit(timeout)
end,
ra:start(),
UId2 = ra_directory:unregister_name(default, UId2),
_ = ra_log_init(Config),
ct:pal("~p", [ra_directory:list_registered(default)]),
ok.

open_segments_limit(Config) ->
@@ -1499,8 +1510,8 @@ meta(Idx, Term, Cluster) ->
machine_version => 1}.

create_snapshot_chunk(Config, #{index := Idx} = Meta, Context) ->
OthDir = filename:join(?config(priv_dir, Config), "snapshot_installation"),
CPDir = filename:join(?config(priv_dir, Config), "checkpoints"),
OthDir = filename:join(?config(work_dir, Config), "snapshot_installation"),
CPDir = filename:join(?config(work_dir, Config), "checkpoints"),
ok = ra_lib:make_dir(OthDir),
ok = ra_lib:make_dir(CPDir),
Sn0 = ra_snapshot:init(<<"someotheruid_adsfasdf">>, ra_log_snapshot,
@@ -1538,3 +1549,8 @@ restart_wal() ->
ok = supervisor:terminate_child(SupPid, ra_log_wal),
{ok, _} = supervisor:restart_child(SupPid, ra_log_wal),
ok.

start_ra(Config) ->
{ok, _} = ra:start([{data_dir, ?config(work_dir, Config)},
{segment_max_entries, 128}]),
ok.