Skip to content

Commit

Permalink
improvements to data safety when log infra crashes.
Browse files Browse the repository at this point in the history
Crashes to the segment writer and/or wal could cause
ra members to crash and/or corrupt their logs. This
change fixes a number of issues in this area and improves
handling for resending commands to the wal and coping better
with unavailability of infrastructure.
  • Loading branch information
kjnilsson committed Apr 11, 2024
1 parent 001c723 commit 5beb361
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 31 deletions.
87 changes: 75 additions & 12 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
sparse_read/2,
last_index_term/1,
set_last_index/2,
reset_to_last_known_written/1,
handle_event/2,
last_written/1,
fetch/2,
fetch_term/2,
fetch_term_no_cache/2,
next_index/1,
snapshot_state/1,
set_snapshot_state/2,
Expand Down Expand Up @@ -104,7 +106,7 @@
% if this is set a snapshot write is in progress for the
% index specified
cache = ra_log_cache:init() :: ra_log_cache:state(),
last_resend_time :: option(integer()),
last_resend_time :: option({integer(), WalPid :: pid() | undefined}),
reader :: ra_log_reader:state(),
readers = [] :: [pid()]
}).
Expand Down Expand Up @@ -431,6 +433,26 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
last_written_index_term = {LWIdx, LWTerm}}}
end.

%% this function forces both last_index and last_written_index_term to
%% the last know index to be written to the wal.
%% This is only used after the wal has been detected down
%% to try to avoid ever having to resend data to the wal
-spec reset_to_last_known_written(state()) -> state().
reset_to_last_known_written(#?MODULE{cfg = Cfg,
cache = Cache0,
last_index = LastIdx,
last_written_index_term = LW} = State0) ->
{Idx, Term, State} = last_index_term_in_wal(LastIdx, State0),
?DEBUG("~ts ~s: index: ~b term: ~b: previous ~w",
[Cfg#cfg.log_id, ?FUNCTION_NAME, Idx, Term, LW]),
Cache = ra_log_cache:set_last(Idx, Cache0),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, Idx),
State#?MODULE{last_index = Idx,
last_term = Term,
cache = Cache,
last_written_index_term = {Idx, Term}}.

-spec handle_event(event_body(), state()) ->
{state(), [effect()]}.
handle_event({written, {FromIdx, _ToIdx, _Term}},
Expand Down Expand Up @@ -480,15 +502,15 @@ handle_event({written, {FromIdx, ToIdx0, Term}},
[State#?MODULE.cfg#cfg.log_id, Term, ToIdx, OtherTerm]),
{State, []}
end;
handle_event({written, {FromIdx, _, _}},
handle_event({written, {FromIdx, _, _Term}},
#?MODULE{cfg = #cfg{log_id = LogId},
last_written_index_term = {LastWrittenIdx, _}} = State0)
last_written_index_term = {LastWrittenIdx, _}} = State)
when FromIdx > LastWrittenIdx + 1 ->
% leaving a gap is not ok - resend from cache
% leaving a gap is not ok - may need to resend from cache
Expected = LastWrittenIdx + 1,
?DEBUG("~ts: ra_log: written gap detected at ~b expected ~b!",
[LogId, FromIdx, Expected]),
{resend_from(Expected, State0), []};
?INFO("~ts: ra_log: written gap detected at ~b expected ~b!",
[LogId, FromIdx, Expected]),
{resend_from(Expected, State), []};
handle_event({truncate_cache, FromIdx, ToIdx}, State) ->
truncate_cache(FromIdx, ToIdx, State, []);
handle_event(flush_cache, State) ->
Expand Down Expand Up @@ -623,6 +645,16 @@ fetch_term(Idx, #?MODULE{cache = Cache, reader = Reader0} = State0) ->
{Term, State0}
end.

-spec fetch_term_no_cache(ra_index(), state()) ->
{option(ra_term()), state()}.
fetch_term_no_cache(Idx, #?MODULE{last_index = LastIdx,
first_index = FirstIdx} = State0)
when Idx < FirstIdx orelse Idx > LastIdx ->
{undefined, State0};
fetch_term_no_cache(Idx, #?MODULE{reader = Reader0} = State0) ->
{Term, Reader} = ra_log_reader:fetch_term(Idx, Reader0),
{Term, State0#?MODULE{reader = Reader}}.

-spec snapshot_state(State :: state()) -> ra_snapshot:state().
snapshot_state(State) ->
State#?MODULE.?FUNCTION_NAME.
Expand Down Expand Up @@ -958,6 +990,21 @@ wal_write(#?MODULE{cfg = #cfg{uid = UId,
exit(wal_down)
end.

%% unly used by resend to wal functionality and doesn't set the cache as it
%% is already set
wal_rewrite(#?MODULE{cfg = #cfg{uid = UId,
wal = Wal} = Cfg} = State,
{Idx, Term, Cmd}) ->
case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of
ok ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx,
last_term = Term};
{error, wal_down} ->
exit(wal_down)
end.

wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
wal = Wal} = Cfg,
cache = Cache0} = State,
Expand Down Expand Up @@ -1017,15 +1064,18 @@ resend_from0(Idx, #?MODULE{cfg = Cfg,
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_RESENDS, LastIdx - Idx + 1),
lists:foldl(fun (I, Acc) ->
{I, T, C} = ra_log_cache:fetch(I, Cache),
wal_write(Acc, {I, T, C})
wal_rewrite(Acc, {I, T, C})
end,
State#?MODULE{last_resend_time = erlang:system_time(seconds)},
State#?MODULE{last_resend_time = {erlang:system_time(seconds),
whereis(Cfg#cfg.wal)}},
lists:seq(Idx, LastIdx));
resend_from0(Idx, #?MODULE{last_resend_time = LastResend,
resend_from0(Idx, #?MODULE{last_resend_time = {LastResend, WalPid},
cfg = #cfg{resend_window_seconds = ResendWindow}} = State) ->
case erlang:system_time(seconds) > LastResend + ResendWindow of
case erlang:system_time(seconds) > LastResend + ResendWindow orelse
(is_pid(WalPid) andalso not is_process_alive(WalPid)) of
true ->
% it has been more than a minute since last resend
% it has been more than the resend window since last resend
% _or_ the wal has been restarted since then
% ok to try again
resend_from(Idx, State#?MODULE{last_resend_time = undefined});
false ->
Expand Down Expand Up @@ -1161,6 +1211,19 @@ maps_with_values(Keys, Map) ->
end
end, [], Keys).

last_index_term_in_wal(Idx, #?MODULE{last_written_index_term = {Idx, Term}} = State) ->
% we reached the lower limit which is the last known written index
{Idx, Term, State};
last_index_term_in_wal(Idx, #?MODULE{reader = Reader0} = State) ->
case ra_log_reader:fetch_term(Idx, Reader0) of
{undefined, Reader} ->
last_index_term_in_wal(Idx-1, State#?MODULE{reader = Reader});
{Term, Reader} ->
%% if it can be read when bypassing the local cache it is in the
%% wal
{Idx, Term, State#?MODULE{reader = Reader}}
end.

%%%% TESTS

-ifdef(TEST).
Expand Down
7 changes: 6 additions & 1 deletion src/ra_log_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
flush/1,
needs_flush/1,
size/1,
range/1
range/1,
dump_keys/1
]).

%% holds static or rarely changing fields
Expand Down Expand Up @@ -169,6 +170,10 @@ size(#?MODULE{tbl = Tid, cache = Cache}) ->
range(#?MODULE{range = Range}) ->
Range.

dump_keys(#?MODULE{tbl = Tid, cache = Cache}) ->
EtsKeys = [K || {K, _, _} <- ets:tab2list(Tid)],
{maps:keys(Cache), EtsKeys}.

%% INTERNAL

cache_without(FromIdx, Idx, Cache, _Tid)
Expand Down
8 changes: 5 additions & 3 deletions src/ra_log_segment.erl
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,7 @@ fold0(Cfg, Cache0, Idx, FinalIdx, Index, Fun, AccFun, Acc0) ->
Cfg#cfg.filename})
end;
_ ->
exit({ra_log_segment_unexpected_eof, Idx,
Cfg#cfg.filename})
exit({missing_key, Idx, Cfg#cfg.filename})
end.

-spec range(state()) -> option({ra_index(), ra_index()}).
Expand Down Expand Up @@ -450,9 +449,12 @@ dump_index(File) ->
end.

dump(File) ->
dump(File, fun (B) -> B end).

dump(File, Fun) ->
{ok, S0} = open(File, #{mode => read}),
{Idx, Last} = range(S0),
L = fold(S0, Idx, Last, fun erlang:binary_to_term/1,
L = fold(S0, Idx, Last, Fun,
fun (E, A) -> [E | A] end, []),
close(S0),
lists:reverse(L).
Expand Down
5 changes: 3 additions & 2 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@
-type wal_op() :: {cast, wal_command()} |
{call, from(), wal_command()}.

-spec write(writer_id(), atom(), ra_index(), ra_term(), term()) ->
-spec write(writer_id(), atom() | pid(), ra_index(), ra_term(), term()) ->
ok | {error, wal_down}.
write(From, Wal, Idx, Term, Cmd) ->
named_cast(Wal, {append, From, Idx, Term, Cmd}).
Expand Down Expand Up @@ -294,7 +294,8 @@ handle_batch(Ops, #state{conf = #conf{explicit_gc = Gc}} = State0) ->
end,
{ok, Actions, complete_batch(State)}.

terminate(_Reason, State) ->
terminate(Reason, State) ->
?DEBUG("wal: terminating with ~W", [Reason, 20]),
_ = cleanup(State),
ok.

Expand Down
20 changes: 18 additions & 2 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1048,9 +1048,24 @@ handle_follower(#append_entries_rpc{term = Term,
{NextState, State,
[{next_event, {ra_log_event, flush_cache}} | Effects]};
{error, wal_down} ->
%% at this point we know the wal process exited
%% but we dont know exactly which in flight messages
%% made it to the wal before it crashed.
%% we can check which entries actually made it to the
%% wal / mem_tables and revert the last_index and last written
%% index that should avoid the need to resend entries
%% after writing a gap into the log
%% Note that the wal writes and syncs to disk _before_
%% updating the ETS tables so this is perfectly ok
%% TODO: check this doesn't affect state machine
%% application as applied index could be higher
%% than written (if consensus has already been acheived from
%% other members)
Log = ra_log:reset_to_last_known_written(Log1),
{await_condition,
State1#{log => Log1,
condition => fun wal_down_condition/2},
State1#{log => Log,
condition => fun wal_down_condition/2
},
Effects0};
{error, _} = Err ->
exit(Err)
Expand Down Expand Up @@ -1380,6 +1395,7 @@ handle_await_condition(Msg, #{condition := Cond} = State0) ->
{true, State} ->
{follower, State, [{next_event, Msg}]};
{false, State} ->
%% do not log unhandled messages as they are often expected
% log_unhandled_msg(await_condition, Msg, State),
{await_condition, State, []}
end.
Expand Down
110 changes: 108 additions & 2 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ all_tests() ->
bench,
disconnected_node_catches_up,
key_metrics,
recover_from_checkpoint
recover_from_checkpoint,
segment_writer_crash
].

groups() ->
Expand Down Expand Up @@ -785,6 +786,106 @@ recover_from_checkpoint(Config) ->
[ok = slave:stop(S) || {_, S} <- ServerIds],
ok.

segment_writer_crash(Config) ->
%% this test crashes the segment writer for a follower node whilst the
%% ra cluster is active and receiving and replicating commands.
%% it tests the segment writer and wal is able to recover without the
%% follower crashing.
%% Finally we stop and restart the follower to make sure it can recover
%% correactly and that the log data contains no missing entries
PrivDir = ?config(data_dir, Config),
ClusterName = ?config(cluster_name, Config),
ServerNames = [s1, s2, s3],
ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- ServerNames],
Configs = [begin
UId = atom_to_binary(Name, utf8),
#{cluster_name => ClusterName,
id => NodeId,
uid => UId,
initial_members => ServerIds,
machine => {module, ?MODULE, #{}},
log_init_args => #{uid => UId,
min_snapshot_interval => 5}}
end || {Name, _Node} = NodeId <- ServerIds],
{ok, Started, []} = ra:start_cluster(?SYS, Configs),


{ok, _, Leader} = ra:members(hd(Started)),
[{FollowerName, FollowerNode} = Follower, _] = lists:delete(Leader, Started),

Data = crypto:strong_rand_bytes(10_000),
WriterFun = fun Recur() ->
{ok, _, _} = ra:process_command(Leader, {?FUNCTION_NAME, Data}),
receive
stop ->
ok
after 1 ->
Recur()
end
end,
FollowerPid = ct_rpc:call(FollowerNode, erlang, whereis, [FollowerName]),
?assert(is_pid(FollowerPid)),

AwaitReplicated = fun () ->
LastIdxs =
[begin
{ok, #{current_term := T,
log := #{last_index := L}}, _} =
ra:member_overview(S),
{T, L}
end || {_, _N} = S <- ServerIds],
1 == length(lists:usort(LastIdxs))
end,
[begin
ct:pal("running iteration ~b", [I]),

WriterPid = spawn(WriterFun),
timer:sleep(rand:uniform(500) + 5_000),

case I rem 2 == 0 of
true ->
ct:pal("killing segment writer"),
_ = ct_rpc:call(FollowerNode, ra_log_wal, force_rollover, [ra_log_wal]),
timer:sleep(10),
Pid = ct_rpc:call(FollowerNode, erlang, whereis, [ra_log_segment_writer]),
true = ct_rpc:call(FollowerNode, erlang, exit, [Pid, kill]);
false ->
ct:pal("killing wal"),
Pid = ct_rpc:call(FollowerNode, erlang, whereis, [ra_log_wal]),
true = ct_rpc:call(FollowerNode, erlang, exit, [Pid, kill])
end,


timer:sleep(1000),
WriterPid ! stop,
await_condition(fun () -> not is_process_alive(WriterPid) end, 1000),

%% assert stuff
await_condition(AwaitReplicated, 100),
?assertMatch({ok, #{log := #{cache_size := 0}}, _},
ra:member_overview(Follower)),
%% follower hasn't crashed
?assertEqual(FollowerPid, ct_rpc:call(FollowerNode, erlang, whereis,
[FollowerName]))
end || I <- lists:seq(1, 10)],

%% stop and restart the follower
ok = ra:stop_server(Follower),
ok = ra:restart_server(Follower),

await_condition(AwaitReplicated, 100),

_ = ct_rpc:call(FollowerNode, ra_log_wal, force_rollover, [ra_log_wal]),

ok = ra:stop_server(Follower),
ok = ra:restart_server(Follower),

await_condition(AwaitReplicated, 100),


[ok = slave:stop(S) || {_, S} <- ServerIds],
ok.

%% Utility

test_local_msg(Leader, ReceiverNode, ExpectedSenderNode, CmdTag, Opts0) ->
Expand Down Expand Up @@ -855,14 +956,17 @@ search_paths() ->
code:get_path()).

start_follower(N, PrivDir) ->
start_follower(N, PrivDir, []).

start_follower(N, PrivDir, Env) ->
Dir0 = filename:join(PrivDir, N),
Dir = "'\"" ++ Dir0 ++ "\"'",
Host = get_current_host(),
Pa = string:join(["-pa" | search_paths()] ++ ["-s ra -ra data_dir", Dir], " "),
ct:pal("starting secondary node with ~ts on host ~ts for node ~ts", [Pa, Host, node()]),
{ok, S} = slave:start_link(Host, N, Pa),
ok = ct_rpc:call(S, ?MODULE, node_setup, [PrivDir]),
ok = erpc:call(S, ra, start, []),
{ok, _} = erpc:call(S, ra, start, [Env]),
ok = ct_rpc:call(S, logger, set_primary_config,
[level, all]),
S.
Expand Down Expand Up @@ -901,6 +1005,8 @@ apply(#{index := Idx}, checkpoint, State) ->
{State, ok, [{checkpoint, Idx, CheckpointState}]};
apply(#{index := Idx}, promote_checkpoint, State) ->
{State, ok, [{release_cursor, Idx}]};
apply(#{index := _Idx}, {segment_writer_crash, _}, State) ->
{State, ok, []};
apply(#{index := Idx}, _Cmd, State) ->
{State, ok, [{release_cursor, Idx, State}]}.

Expand Down
Loading

0 comments on commit 5beb361

Please sign in to comment.