Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for pre-init recovery and handling of gaps in mem tables due to snapshotting #456

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/ra_directory.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
pid_of/2,
uid_of/2,
overview/1,
list_registered/1
list_registered/1,
is_registered_uid/2
]).

-export_type([
Expand Down Expand Up @@ -199,6 +200,13 @@ list_registered(System) when is_atom(System) ->
Tbl = get_reverse(System),
dets:select(Tbl, [{'_', [], ['$_']}]).

-spec is_registered_uid(atom(), ra_uid()) -> boolean().
is_registered_uid(System, UId)
when is_atom(System) andalso
is_binary(UId) ->
Tbl = get_reverse(System),
[] =/= dets:select(Tbl, [{{'_', UId}, [], ['$_']}]).

get_name(#{directory := Tbl}) ->
Tbl;
get_name(System) when is_atom(System) ->
Expand Down
32 changes: 27 additions & 5 deletions src/ra_log_pre_init.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

-record(state, {}).

-define(ETSTBL, ra_log_snapshot_state).
%%%===================================================================
%%% API functions
%%%===================================================================
Expand All @@ -40,7 +41,16 @@ init([System]) ->
Regd = ra_directory:list_registered(System),
?INFO("ra system '~ts' running pre init for ~b registered servers",
[System, length(Regd)]),
_ = [catch(pre_init(System, Name)) || {Name, _U} <- Regd],
_ = [begin
try pre_init(System, Name, UId) of
ok -> ok
catch _:Err ->
?ERROR("pre_init failed in system ~s for UId ~ts with name ~ts"
" This error may need manual intervention",
[System, UId, Name]),
throw({stop, {error, Err}})
end
end|| {Name, UId} <- Regd],
{ok, #state{} , hibernate}.

handle_call(_Request, _From, State) ->
Expand All @@ -63,8 +73,20 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================

pre_init(System, Name) ->
{ok, #{log_init_args := Log}} = ra_server_sup_sup:recover_config(System, Name),
_ = ra_log:pre_init(Log),
ok.
pre_init(System, Name, UId) ->
case ets:lookup(?ETSTBL, UId) of
[{_, _}] ->
%% already initialised
ok;
[] ->
case ra_system:fetch(System) of
undefined ->
{error, system_not_started};
SysCfg ->
{ok, #{log_init_args := Log}} =
ra_server_sup_sup:recover_config(System, Name),
ok = ra_log:pre_init(Log#{system_config => SysCfg}),
ok
end
end.

83 changes: 50 additions & 33 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -308,40 +308,57 @@ append_to_segment(_, _, StartIdx, EndIdx, Seg, Closed, _State)
when StartIdx >= EndIdx ->
{Seg, Closed};
append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
[{_, Term, Data0}] = ets:lookup(Tid, Idx),
Data = term_to_iovec(Data0),
DataSize = iolist_size(Data),
case ra_log_segment:append(Seg0, Idx, Term, {DataSize, Data}) of
{ok, Seg} ->
ok = counters:add(State#state.counter, ?C_ENTRIES, 1),
%% this isn't completely accurate as firstly the segment may not
%% have written it to disk and it doesn't include data written to
%% the segment index but is probably good enough to get comparative
%% data rates for different Ra components
ok = counters:add(State#state.counter, ?C_BYTES_WRITTEN, DataSize),
append_to_segment(UId, Tid, Idx+1, EndIdx, Seg, Closed, State);
{error, full} ->
% close and open a new segment
case open_successor_segment(Seg0, State#state.segment_conf) of
undefined ->
%% a successor cannot be opened - this is most likely due
%% to the directory having been deleted.
%% clear close mem tables here
_ = ets:delete(Tid),
_ = clean_closed_mem_tables(State#state.system, UId, Tid),
undefined;
Seg ->
ok = counters:add(State#state.counter, ?C_SEGMENTS, 1),
%% re-evaluate snapshot state for the server in case
%% a snapshot has completed during segment flush
StartIdx = start_index(UId, Idx),
% recurse
append_to_segment(UId, Tid, StartIdx, EndIdx, Seg,
[Seg0 | Closed], State)
case ets:lookup(Tid, Idx) of
[] ->
%% oh dear, an expected index was not found in the mem table.
?WARN("segment_writer: missing index ~b in mem table ~s for uid ~s"
"checking to see if UId has been unregistered",
[Idx, Tid, UId]),
case ra_directory:is_registered_uid(State#state.system, UId) of
true ->
?ERROR("segment_writer: uid ~s is registered, exiting...",
[UId]),
exit({missing_index, UId, Idx});
false ->
?INFO("segment_writer: UId ~s was not registered, skipping",
[UId]),
undefined
end;
{error, Posix} ->
FileName = ra_log_segment:filename(Seg0),
exit({segment_writer_append_error, FileName, Posix})
[{_, Term, Data0}] ->
Data = term_to_iovec(Data0),
DataSize = iolist_size(Data),
case ra_log_segment:append(Seg0, Idx, Term, {DataSize, Data}) of
{ok, Seg} ->
ok = counters:add(State#state.counter, ?C_ENTRIES, 1),
%% this isn't completely accurate as firstly the segment may not
%% have written it to disk and it doesn't include data written to
%% the segment index but is probably good enough to get comparative
%% data rates for different Ra components
ok = counters:add(State#state.counter, ?C_BYTES_WRITTEN, DataSize),
append_to_segment(UId, Tid, Idx+1, EndIdx, Seg, Closed, State);
{error, full} ->
% close and open a new segment
case open_successor_segment(Seg0, State#state.segment_conf) of
undefined ->
%% a successor cannot be opened - this is most likely due
%% to the directory having been deleted.
%% clear close mem tables here
_ = ets:delete(Tid),
_ = clean_closed_mem_tables(State#state.system, UId, Tid),
undefined;
Seg ->
ok = counters:add(State#state.counter, ?C_SEGMENTS, 1),
%% re-evaluate snapshot state for the server in case
%% a snapshot has completed during segment flush
StartIdx = start_index(UId, Idx),
% recurse
append_to_segment(UId, Tid, StartIdx, EndIdx, Seg,
[Seg0 | Closed], State)
end;
{error, Posix} ->
FileName = ra_log_segment:filename(Seg0),
exit({segment_writer_append_error, FileName, Posix})
end
end.

find_segment_files(Dir) ->
Expand Down
14 changes: 10 additions & 4 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1022,11 +1022,17 @@ terminate(Reason, StateName,
ra_server:system_config(ServerState),
UId = uid(State),
Id = id(State),
_ = ra_server:terminate(ServerState, Reason),
case Reason of
{shutdown, delete} ->
Parent = ra_directory:where_is_parent(Names, UId),
%% we need to unregister _before_ the log closes
%% in the ra_server:terminate/2 function
%% as we want the directory to be deleted
%% after the server is removed from the ra directory.
%% This is so that the segment writer can avoid
%% crashing if it detects a missing key
catch ra_directory:unregister_name(Names, UId),
_ = ra_server:terminate(ServerState, Reason),
catch ra_log_meta:delete_sync(MetaName, UId),
catch ra_counters:delete(Id),
Self = self(),
Expand All @@ -1044,9 +1050,9 @@ terminate(Reason, StateName,
end
end),
ok;


_ -> ok
_ ->
_ = ra_server:terminate(ServerState, Reason),
ok
end,
catch ra_leaderboard:clear(ClusterName),
_ = ets:delete(ra_metrics, MetricsKey),
Expand Down
13 changes: 7 additions & 6 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -977,11 +977,12 @@ snapshot_installation_with_call_crash(Config) ->
{ok, _, _} = ra:process_command(Leader, deq),

meck:new(ra_server, [passthrough]),
meck:expect(ra_server, handle_follower, fun (#install_snapshot_rpc{}, _) ->
exit(timeout);
(A, B) ->
meck:passthrough([A, B])
end),
meck:expect(ra_server, handle_follower,
fun (#install_snapshot_rpc{}, _) ->
exit(timeout);
(A, B) ->
meck:passthrough([A, B])
end),
%% start the down node again, catchup should involve sending a snapshot
ok = ra:restart_server(?SYS, Down),

Expand All @@ -994,7 +995,7 @@ snapshot_installation_with_call_crash(Config) ->
{ok, {N2Idx, _}, _} = ra:local_query(N2, fun ra_lib:id/1),
{ok, {N3Idx, _}, _} = ra:local_query(N3, fun ra_lib:id/1),
(N1Idx == N2Idx) and (N1Idx == N3Idx)
end, 20)),
end, 200)),
ok.


Expand Down
1 change: 0 additions & 1 deletion test/ra_dbg_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
-compile(nowarn_export_all).
-compile(export_all).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

all() ->
Expand Down
2 changes: 2 additions & 0 deletions test/ra_directory_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ basics(_Config) ->
% registrations should always succeed - no negative test
Self = ra_directory:where_is(?SYS, UId),
UId = ra_directory:uid_of(?SYS, test1),
?assert(ra_directory:is_registered_uid(?SYS, UId)),
% ensure it can be read from another process
_ = spawn_link(
fun () ->
Expand All @@ -86,6 +87,7 @@ basics(_Config) ->
undefined = ra_directory:name_of(?SYS, UId),
undefined = ra_directory:cluster_name_of(?SYS, UId),
undefined = ra_directory:uid_of(?SYS, test1),
?assertNot(ra_directory:is_registered_uid(?SYS, UId)),
ok.

persistence(_Config) ->
Expand Down
50 changes: 33 additions & 17 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,31 +67,38 @@ groups() ->
].

init_per_suite(Config) ->
{ok, _} = ra:start([{data_dir, ?config(priv_dir, Config)},
{segment_max_entries, 128}]),
Config.

end_per_suite(Config) ->
application:stop(ra),
Config.

init_per_group(G, Config) ->
[{access_pattern, G} | Config].
DataDir = filename:join(?config(priv_dir, Config), G),
[{access_pattern, G},
{work_dir, DataDir}
| Config].

end_per_group(_, Config) ->
Config.

init_per_testcase(TestCase, Config) ->
ok = start_ra(Config),
ra_env:configure_logger(logger),
PrivDir = ?config(priv_dir, Config),
DataDir = ?config(work_dir, Config),
UId = <<(atom_to_binary(TestCase, utf8))/binary,
(atom_to_binary(?config(access_pattern, Config)))/binary>>,
ra:start(),
ok = ra_directory:register_name(default, UId, self(), undefined,
TestCase, TestCase),
[{uid, UId}, {test_case, TestCase}, {wal_dir, PrivDir} | Config].
ServerConf = #{log_init_args => #{uid => UId}},

ok = ra_lib:make_dir(filename:join([DataDir, node(), UId])),
ok = ra_lib:write_file(filename:join([DataDir, node(), UId, "config"]),
list_to_binary(io_lib:format("~p.", [ServerConf]))),

[{uid, UId}, {test_case, TestCase}, {wal_dir, DataDir} | Config].

end_per_testcase(_, _Config) ->
application:stop(ra),
ok.

-define(N1, {n1, node()}).
Expand Down Expand Up @@ -429,13 +436,13 @@ written_event_after_snapshot(Config) ->
ok.

writes_lower_than_snapshot_index_are_dropped(Config) ->
logger:set_primary_config(level, debug),
Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}),
Log1 = ra_log:append({1, 1, <<"one">>}, Log0),
Log1b = deliver_all_log_events(ra_log:append({2, 1, <<"two">>}, Log1), 500),
true = erlang:suspend_process(whereis(ra_log_wal)),
Log2 = write_n(3, 500, 1, Log1b),
{Log3, _} = ra_log:update_release_cursor(100, #{}, 1,
<<"100">>, Log2),
{Log3, _} = ra_log:update_release_cursor(100, #{}, 1, <<"100">>, Log2),
Log4 = deliver_all_log_events(Log3, 500),

Overview = ra_log:overview(Log4),
Expand Down Expand Up @@ -473,8 +480,10 @@ writes_lower_than_snapshot_index_are_dropped(Config) ->
cache_size := 0,
cache_range := undefined,
last_written_index_term := {499, 1}}, OverviewAfter),
%% restart the app to test recovery with a "gappy" wal
application:stop(ra),
start_ra(Config),
erlang:monitor(process, whereis(ra_log_segment_writer)),
exit(whereis(ra_log_wal), kill),
receive
{'DOWN', _, _, _, _} = D ->
ct:fail("DOWN received ~p", [D])
Expand Down Expand Up @@ -589,7 +598,7 @@ recovery(Config) ->
Log4 = assert_log_events(Log3, Pred, 2000),
ra_log:close(Log4),
application:stop(ra),
ra:start(),
start_ra(Config),

Log5 = ra_log_init(Config),
{20, 3} = ra_log:last_index_term(Log5),
Expand All @@ -610,7 +619,7 @@ recover_bigly(Config) ->
Log2 = assert_log_events(Log1, Pred, 2000),
ra_log:close(Log2),
application:stop(ra),
ra:start(),
start_ra(Config),
Log = ra_log_init(Config),
{9999, 1} = ra_log:last_written(Log),
{9999, 1} = ra_log:last_index_term(Log),
Expand Down Expand Up @@ -1150,21 +1159,23 @@ transient_writer_is_handled(Config) ->
Self = self(),
UId2 = <<(?config(uid, Config))/binary, "sub_proc">>,
_Pid = spawn(fun () ->
ra_directory:register_name(default, <<"sub_proc">>,
ra_directory:register_name(default, UId2,
self(), undefined,
sub_proc, sub_proc),
Log0 = ra_log_init(Config, #{uid => UId2}),
Log1 = append_n(1, 10, 2, Log0),
% ignore events
Log2 = deliver_all_log_events(Log1, 500),
ra_log:close(Log2),
Self ! done
Self ! done,
ok
end),
receive done -> ok
after 2000 -> exit(timeout)
end,
ra:start(),
UId2 = ra_directory:unregister_name(default, UId2),
_ = ra_log_init(Config),
ct:pal("~p", [ra_directory:list_registered(default)]),
ok.

open_segments_limit(Config) ->
Expand Down Expand Up @@ -1499,8 +1510,8 @@ meta(Idx, Term, Cluster) ->
machine_version => 1}.

create_snapshot_chunk(Config, #{index := Idx} = Meta, Context) ->
OthDir = filename:join(?config(priv_dir, Config), "snapshot_installation"),
CPDir = filename:join(?config(priv_dir, Config), "checkpoints"),
OthDir = filename:join(?config(work_dir, Config), "snapshot_installation"),
CPDir = filename:join(?config(work_dir, Config), "checkpoints"),
ok = ra_lib:make_dir(OthDir),
ok = ra_lib:make_dir(CPDir),
Sn0 = ra_snapshot:init(<<"someotheruid_adsfasdf">>, ra_log_snapshot,
Expand Down Expand Up @@ -1538,3 +1549,8 @@ restart_wal() ->
ok = supervisor:terminate_child(SupPid, ra_log_wal),
{ok, _} = supervisor:restart_child(SupPid, ra_log_wal),
ok.

start_ra(Config) ->
{ok, _} = ra:start([{data_dir, ?config(work_dir, Config)},
{segment_max_entries, 128}]),
ok.
Loading
Loading