Skip to content

Commit

Permalink
Fix bugs in pre-init
Browse files Browse the repository at this point in the history
The first thing a ra system does when it starts is run a pre-init
phase for each registered Ra server, mostly to recover the
ra_log_snapshot_state table. This apperas to have been broken
for along time and the ra_log_snapshot_table has not been populated
ahead of WAL / segment writer recovery. This was fine as this bit
was just an optimisation and never affected the workings of
the Ra infrastructure.

However since 5b7a265 it needs
this in order to avoid the segment writer crashing when it detects
a gap (caused by the WAL dropping entries lower than the current
snapshot).

This commit mostly fixes the pre-init process but also addresses
a potential race condition which still could cause the segment
writer to crash for the same reason.
  • Loading branch information
kjnilsson committed Jul 18, 2024
1 parent 5365e43 commit 8e2bbf8
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 63 deletions.
10 changes: 9 additions & 1 deletion src/ra_directory.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
pid_of/2,
uid_of/2,
overview/1,
list_registered/1
list_registered/1,
is_registered_uid/2
]).

-export_type([
Expand Down Expand Up @@ -199,6 +200,13 @@ list_registered(System) when is_atom(System) ->
Tbl = get_reverse(System),
dets:select(Tbl, [{'_', [], ['$_']}]).

-spec is_registered_uid(atom(), ra_uid()) -> boolean().
is_registered_uid(System, UId)
when is_atom(System) andalso
is_binary(UId) ->
Tbl = get_reverse(System),
[] =/= dets:select(Tbl, [{{'_', UId}, [], ['$_']}]).

get_name(#{directory := Tbl}) ->
Tbl;
get_name(System) when is_atom(System) ->
Expand Down
32 changes: 27 additions & 5 deletions src/ra_log_pre_init.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

-record(state, {}).

-define(ETSTBL, ra_log_snapshot_state).
%%%===================================================================
%%% API functions
%%%===================================================================
Expand All @@ -40,7 +41,16 @@ init([System]) ->
Regd = ra_directory:list_registered(System),
?INFO("ra system '~ts' running pre init for ~b registered servers",
[System, length(Regd)]),
_ = [catch(pre_init(System, Name)) || {Name, _U} <- Regd],
_ = [begin
try pre_init(System, Name, UId) of
ok -> ok
catch _:Err ->
?ERROR("pre_init failed in system ~s for UId ~ts with name ~ts"
" This error may need manual intervention",
[System, UId, Name]),
throw({stop, {error, Err}})
end
end|| {Name, UId} <- Regd],
{ok, #state{} , hibernate}.

handle_call(_Request, _From, State) ->
Expand All @@ -63,8 +73,20 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================

pre_init(System, Name) ->
{ok, #{log_init_args := Log}} = ra_server_sup_sup:recover_config(System, Name),
_ = ra_log:pre_init(Log),
ok.
pre_init(System, Name, UId) ->
case ets:lookup(?ETSTBL, UId) of
[{_, _}] ->
%% already initialised
ok;
[] ->
case ra_system:fetch(System) of
undefined ->
{error, system_not_started};
SysCfg ->
{ok, #{log_init_args := Log}} =
ra_server_sup_sup:recover_config(System, Name),
ok = ra_log:pre_init(Log#{system_config => SysCfg}),
ok
end
end.

87 changes: 52 additions & 35 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ do_segment({ServerUId, StartIdx0, EndIdx, Tid},
_ = clean_closed_mem_tables(System, ServerUId, Tid),
ok;
Segment0 ->
case append_to_segment(ServerUId, Tid, StartIdx0, EndIdx,
Segment0, State) of
case catch append_to_segment(ServerUId, Tid, StartIdx0, EndIdx,
Segment0, State) of
undefined ->
?WARN("segment_writer: skipping segments for ~w as
directory ~ts disappeared whilst writing",
Expand Down Expand Up @@ -308,40 +308,57 @@ append_to_segment(_, _, StartIdx, EndIdx, Seg, Closed, _State)
when StartIdx >= EndIdx ->
{Seg, Closed};
append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
[{_, Term, Data0}] = ets:lookup(Tid, Idx),
Data = term_to_iovec(Data0),
DataSize = iolist_size(Data),
case ra_log_segment:append(Seg0, Idx, Term, {DataSize, Data}) of
{ok, Seg} ->
ok = counters:add(State#state.counter, ?C_ENTRIES, 1),
%% this isn't completely accurate as firstly the segment may not
%% have written it to disk and it doesn't include data written to
%% the segment index but is probably good enough to get comparative
%% data rates for different Ra components
ok = counters:add(State#state.counter, ?C_BYTES_WRITTEN, DataSize),
append_to_segment(UId, Tid, Idx+1, EndIdx, Seg, Closed, State);
{error, full} ->
% close and open a new segment
case open_successor_segment(Seg0, State#state.segment_conf) of
undefined ->
%% a successor cannot be opened - this is most likely due
%% to the directory having been deleted.
%% clear close mem tables here
_ = ets:delete(Tid),
_ = clean_closed_mem_tables(State#state.system, UId, Tid),
undefined;
Seg ->
ok = counters:add(State#state.counter, ?C_SEGMENTS, 1),
%% re-evaluate snapshot state for the server in case
%% a snapshot has completed during segment flush
StartIdx = start_index(UId, Idx),
% recurse
append_to_segment(UId, Tid, StartIdx, EndIdx, Seg,
[Seg0 | Closed], State)
case ets:lookup(Tid, Idx) of
[] ->
%% oh dear, an expected index was not found in the mem table.
?WARN("segment_writer: missing index ~b in mem table ~s for uid ~s"
"checking to see if UId has been unregistered",
[Idx, Tid, UId]),
case ra_directory:is_registered_uid(State#state.system, UId) of
true ->
?ERROR("segment_writer: uid ~s is registered, exiting...",
[UId]),
exit({missing_index, UId, Idx});
false ->
?INFO("segment_writer: UId ~s was not registered, skipping",
[UId]),
throw(undefined)
end;
{error, Posix} ->
FileName = ra_log_segment:filename(Seg0),
exit({segment_writer_append_error, FileName, Posix})
[{_, Term, Data0}] ->
Data = term_to_iovec(Data0),
DataSize = iolist_size(Data),
case ra_log_segment:append(Seg0, Idx, Term, {DataSize, Data}) of
{ok, Seg} ->
ok = counters:add(State#state.counter, ?C_ENTRIES, 1),
%% this isn't completely accurate as firstly the segment may not
%% have written it to disk and it doesn't include data written to
%% the segment index but is probably good enough to get comparative
%% data rates for different Ra components
ok = counters:add(State#state.counter, ?C_BYTES_WRITTEN, DataSize),
append_to_segment(UId, Tid, Idx+1, EndIdx, Seg, Closed, State);
{error, full} ->
% close and open a new segment
case open_successor_segment(Seg0, State#state.segment_conf) of
undefined ->
%% a successor cannot be opened - this is most likely due
%% to the directory having been deleted.
%% clear close mem tables here
_ = ets:delete(Tid),
_ = clean_closed_mem_tables(State#state.system, UId, Tid),
undefined;
Seg ->
ok = counters:add(State#state.counter, ?C_SEGMENTS, 1),
%% re-evaluate snapshot state for the server in case
%% a snapshot has completed during segment flush
StartIdx = start_index(UId, Idx),
% recurse
append_to_segment(UId, Tid, StartIdx, EndIdx, Seg,
[Seg0 | Closed], State)
end;
{error, Posix} ->
FileName = ra_log_segment:filename(Seg0),
exit({segment_writer_append_error, FileName, Posix})
end
end.

find_segment_files(Dir) ->
Expand Down
14 changes: 10 additions & 4 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1022,11 +1022,17 @@ terminate(Reason, StateName,
ra_server:system_config(ServerState),
UId = uid(State),
Id = id(State),
_ = ra_server:terminate(ServerState, Reason),
case Reason of
{shutdown, delete} ->
Parent = ra_directory:where_is_parent(Names, UId),
%% we need to unregister _before_ the log closes
%% in the ra_server:terminate/2 function
%% as we want the directory to be deleted
%% after the server is removed from the ra directory.
%% This is so that the segment writer can avoid
%% crashing if it detects a missing key
catch ra_directory:unregister_name(Names, UId),
_ = ra_server:terminate(ServerState, Reason),
catch ra_log_meta:delete_sync(MetaName, UId),
catch ra_counters:delete(Id),
Self = self(),
Expand All @@ -1044,9 +1050,9 @@ terminate(Reason, StateName,
end
end),
ok;


_ -> ok
_ ->
_ = ra_server:terminate(ServerState, Reason),
ok
end,
catch ra_leaderboard:clear(ClusterName),
_ = ets:delete(ra_metrics, MetricsKey),
Expand Down
1 change: 0 additions & 1 deletion test/ra_dbg_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
-compile(nowarn_export_all).
-compile(export_all).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

all() ->
Expand Down
2 changes: 2 additions & 0 deletions test/ra_directory_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ basics(_Config) ->
% registrations should always succeed - no negative test
Self = ra_directory:where_is(?SYS, UId),
UId = ra_directory:uid_of(?SYS, test1),
?assert(ra_directory:is_registered_uid(?SYS, UId)),
% ensure it can be read from another process
_ = spawn_link(
fun () ->
Expand All @@ -86,6 +87,7 @@ basics(_Config) ->
undefined = ra_directory:name_of(?SYS, UId),
undefined = ra_directory:cluster_name_of(?SYS, UId),
undefined = ra_directory:uid_of(?SYS, test1),
?assertNot(ra_directory:is_registered_uid(?SYS, UId)),
ok.

persistence(_Config) ->
Expand Down
50 changes: 33 additions & 17 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,31 +67,38 @@ groups() ->
].

init_per_suite(Config) ->
{ok, _} = ra:start([{data_dir, ?config(priv_dir, Config)},
{segment_max_entries, 128}]),
Config.

end_per_suite(Config) ->
application:stop(ra),
Config.

init_per_group(G, Config) ->
[{access_pattern, G} | Config].
DataDir = filename:join(?config(priv_dir, Config), G),
[{access_pattern, G},
{work_dir, DataDir}
| Config].

end_per_group(_, Config) ->
Config.

init_per_testcase(TestCase, Config) ->
ok = start_ra(Config),
ra_env:configure_logger(logger),
PrivDir = ?config(priv_dir, Config),
DataDir = ?config(work_dir, Config),
UId = <<(atom_to_binary(TestCase, utf8))/binary,
(atom_to_binary(?config(access_pattern, Config)))/binary>>,
ra:start(),
ok = ra_directory:register_name(default, UId, self(), undefined,
TestCase, TestCase),
[{uid, UId}, {test_case, TestCase}, {wal_dir, PrivDir} | Config].
ServerConf = #{log_init_args => #{uid => UId}},

ok = ra_lib:make_dir(filename:join([DataDir, node(), UId])),
ok = ra_lib:write_file(filename:join([DataDir, node(), UId, "config"]),
list_to_binary(io_lib:format("~p.", [ServerConf]))),

[{uid, UId}, {test_case, TestCase}, {wal_dir, DataDir} | Config].

end_per_testcase(_, _Config) ->
application:stop(ra),
ok.

-define(N1, {n1, node()}).
Expand Down Expand Up @@ -429,13 +436,13 @@ written_event_after_snapshot(Config) ->
ok.

writes_lower_than_snapshot_index_are_dropped(Config) ->
logger:set_primary_config(level, debug),
Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}),
Log1 = ra_log:append({1, 1, <<"one">>}, Log0),
Log1b = deliver_all_log_events(ra_log:append({2, 1, <<"two">>}, Log1), 500),
true = erlang:suspend_process(whereis(ra_log_wal)),
Log2 = write_n(3, 500, 1, Log1b),
{Log3, _} = ra_log:update_release_cursor(100, #{}, 1,
<<"100">>, Log2),
{Log3, _} = ra_log:update_release_cursor(100, #{}, 1, <<"100">>, Log2),
Log4 = deliver_all_log_events(Log3, 500),

Overview = ra_log:overview(Log4),
Expand Down Expand Up @@ -473,8 +480,10 @@ writes_lower_than_snapshot_index_are_dropped(Config) ->
cache_size := 0,
cache_range := undefined,
last_written_index_term := {499, 1}}, OverviewAfter),
%% restart the app to test recovery with a "gappy" wal
application:stop(ra),
start_ra(Config),
erlang:monitor(process, whereis(ra_log_segment_writer)),
exit(whereis(ra_log_wal), kill),
receive
{'DOWN', _, _, _, _} = D ->
ct:fail("DOWN received ~p", [D])
Expand Down Expand Up @@ -589,7 +598,7 @@ recovery(Config) ->
Log4 = assert_log_events(Log3, Pred, 2000),
ra_log:close(Log4),
application:stop(ra),
ra:start(),
start_ra(Config),

Log5 = ra_log_init(Config),
{20, 3} = ra_log:last_index_term(Log5),
Expand All @@ -610,7 +619,7 @@ recover_bigly(Config) ->
Log2 = assert_log_events(Log1, Pred, 2000),
ra_log:close(Log2),
application:stop(ra),
ra:start(),
start_ra(Config),
Log = ra_log_init(Config),
{9999, 1} = ra_log:last_written(Log),
{9999, 1} = ra_log:last_index_term(Log),
Expand Down Expand Up @@ -1150,21 +1159,23 @@ transient_writer_is_handled(Config) ->
Self = self(),
UId2 = <<(?config(uid, Config))/binary, "sub_proc">>,
_Pid = spawn(fun () ->
ra_directory:register_name(default, <<"sub_proc">>,
ra_directory:register_name(default, UId2,
self(), undefined,
sub_proc, sub_proc),
Log0 = ra_log_init(Config, #{uid => UId2}),
Log1 = append_n(1, 10, 2, Log0),
% ignore events
Log2 = deliver_all_log_events(Log1, 500),
ra_log:close(Log2),
Self ! done
Self ! done,
ok
end),
receive done -> ok
after 2000 -> exit(timeout)
end,
ra:start(),
UId2 = ra_directory:unregister_name(default, UId2),
_ = ra_log_init(Config),
ct:pal("~p", [ra_directory:list_registered(default)]),
ok.

open_segments_limit(Config) ->
Expand Down Expand Up @@ -1499,8 +1510,8 @@ meta(Idx, Term, Cluster) ->
machine_version => 1}.

create_snapshot_chunk(Config, #{index := Idx} = Meta, Context) ->
OthDir = filename:join(?config(priv_dir, Config), "snapshot_installation"),
CPDir = filename:join(?config(priv_dir, Config), "checkpoints"),
OthDir = filename:join(?config(work_dir, Config), "snapshot_installation"),
CPDir = filename:join(?config(work_dir, Config), "checkpoints"),
ok = ra_lib:make_dir(OthDir),
ok = ra_lib:make_dir(CPDir),
Sn0 = ra_snapshot:init(<<"someotheruid_adsfasdf">>, ra_log_snapshot,
Expand Down Expand Up @@ -1538,3 +1549,8 @@ restart_wal() ->
ok = supervisor:terminate_child(SupPid, ra_log_wal),
{ok, _} = supervisor:restart_child(SupPid, ra_log_wal),
ok.

start_ra(Config) ->
{ok, _} = ra:start([{data_dir, ?config(work_dir, Config)},
{segment_max_entries, 128}]),
ok.
Loading

0 comments on commit 8e2bbf8

Please sign in to comment.