Skip to content

Commit

Permalink
Various improvements to data safety when log infrastructure processes…
Browse files Browse the repository at this point in the history
… encounter faults.

In particular there are many improvements and fixes relating to the server -> wal resend protocol including:

Bug fix to ra_log_cache that would cause most triggered resends to result in a ra process crash.
Dropping fewer messages by using the gen_statem postpone feature.
Ra leaders would previously just exit with wal_down - now they enter the same await_condition state, although with a shorter timeout, after which they begin a leader transfer process.
Improved detection and availability when a command is lost on the way to the wal and no further commands are sent.
Also there is a new feature to configure on a per system basis what kind of server recovery should take place when a ra system starts/restarts. There are 3 options:

undefined : do not restart any ra server
registered: restart all locally registered servers for the system
mfa: call a custom function that performs the restart.
This feature will allow dynamically started ra servers to be restarted should the ra system crash and restart.

Also improvements to code coverage and refactoring.
improvements to data safety when log infra crashes.
  • Loading branch information
kjnilsson committed Apr 24, 2024
1 parent 41beec4 commit 80d041c
Show file tree
Hide file tree
Showing 25 changed files with 1,326 additions and 328 deletions.
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ PLT_APPS += eunit proper syntax_tools erts kernel stdlib common_test inets aten
EDOC_OUTPUT = docs
EDOC_OPTS = {pretty_printer, erl_pp}, {sort_functions, false}

COVER_EXCLUDE_MODS = ra_server_meck_original \
ra_server_proc_meck_original \
ra_log_wal_meck_original \
ra_log_segment_writer_meck_original \
ra_log_meck_original \
ra_snapshot_meck_original \
ra_machine_meck_original \
ra_log_meta_meck_original

all::

escript-zip::
Expand Down
35 changes: 35 additions & 0 deletions src/ra_file.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
%%
%% @hidden
-module(ra_file).

-include("ra.hrl").

%% Evaluates the file operation `Op' and returns its result.
%% If the first evaluation returns `{error, eagain}' an info message is
%% logged, the calling process sleeps for 10ms and `Op' is evaluated exactly
%% one more time; whatever the second evaluation returns (including another
%% `{error, eagain}', which is logged) is the result.
%%
%% NB: `Op' is expanded twice, so it must be safe to evaluate twice.
%%
%% The body is wrapped in an immediately invoked fun so that the variables
%% bound by the `case' clauses stay local to the expansion. Without this
%% they would leak into the calling function and a second use of the macro
%% in the same function body would clash with the already bound names.
-define(HANDLE_EAGAIN(Op),
        (fun () ->
                 case Op of
                     {error, eagain} ->
                         ?INFO("EAGAIN during file operation, retrying once in 10ms...", []),
                         timer:sleep(10),
                         case Op of
                             {error, eagain} = HandleEagainErr__ ->
                                 ?INFO("EAGAIN again during file operation", []),
                                 HandleEagainErr__;
                             HandleEagainRes__ ->
                                 HandleEagainRes__
                         end;
                     HandleEagainRes__ ->
                         HandleEagainRes__
                 end
         end)()).

-export([sync/1]).

%% Flushes any buffered data for `Fd' to disk via `file:sync/1'.
%% The call goes through ?HANDLE_EAGAIN, so an `{error, eagain}' result is
%% retried once before it is returned to the caller.
%% Returns `ok' on success or the `{error, Reason}' tuple from `file:sync/1'.
-spec sync(file:io_device()) ->
    ok | {error, file:posix() | badarg | terminated}.
sync(Fd) ->
    ?HANDLE_EAGAIN(file:sync(Fd)).
19 changes: 0 additions & 19 deletions src/ra_leaderboard.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,3 @@ lookup(ClusterName) ->
error:badarg ->
undefined
end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

lookup_leader_test() ->
ClusterName = <<"mah-cluster">>,
?assertEqual(undefined, lookup_leader(ClusterName)),
init(),
?assertEqual(undefined, lookup_leader(ClusterName)),
Me = {me, node()},
record(ClusterName, Me, [Me]),
?assertEqual(Me, lookup_leader(ClusterName)),
?assertEqual([Me], lookup_members(ClusterName)),
You = {you, node()},
record(ClusterName, You, [Me, You]),
?assertEqual(You, lookup_leader(ClusterName)),
?assertEqual([Me, You], lookup_members(ClusterName)),

ok.
-endif.
2 changes: 1 addition & 1 deletion src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ sync_file(Name) ->
-spec sync_and_close_fd(file:fd()) ->
ok | file_err().
sync_and_close_fd(Fd) ->
case file:sync(Fd) of
case ra_file:sync(Fd) of
ok ->
file:close(Fd);
Err ->
Expand Down
152 changes: 122 additions & 30 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
sparse_read/2,
last_index_term/1,
set_last_index/2,
reset_to_last_known_written/1,
handle_event/2,
last_written/1,
fetch/2,
Expand Down Expand Up @@ -47,7 +48,8 @@

% external reader
register_reader/2,
readers/1
readers/1,
tick/2
]).

-include("ra.hrl").
Expand All @@ -56,6 +58,7 @@
-define(MIN_SNAPSHOT_INTERVAL, 4096).
-define(MIN_CHECKPOINT_INTERVAL, 16384).
-define(LOG_APPEND_TIMEOUT, 5000).
-define(WAL_RESEND_TIMEOUT, 5000).

-type ra_meta_key() :: atom().
-type segment_ref() :: {From :: ra_index(), To :: ra_index(),
Expand Down Expand Up @@ -104,7 +107,8 @@
% if this is set a snapshot write is in progress for the
% index specified
cache = ra_log_cache:init() :: ra_log_cache:state(),
last_resend_time :: option(integer()),
last_resend_time :: option({integer(), WalPid :: pid() | undefined}),
last_wal_write :: {pid(), Ms :: integer()},
reader :: ra_log_reader:state(),
readers = [] :: [pid()]
}).
Expand Down Expand Up @@ -233,7 +237,8 @@ init(#{uid := UId,
first_index = max(SnapIdx + 1, FirstIdx),
last_index = max(SnapIdx, LastIdx0),
reader = Reader,
snapshot_state = SnapshotState
snapshot_state = SnapshotState,
last_wal_write = {whereis(Wal), now_ms()}
},
put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
LastIdx = State000#?MODULE.last_index,
Expand Down Expand Up @@ -307,9 +312,13 @@ write([{FstIdx, _, _} = First | Rest] = Entries,
% it is the next entry after a snapshot
% we need to tell the wal to truncate as we
% are not going to receive any entries prior to the snapshot
State0 = wal_truncate_write(State00, First),
% write the rest normally
write_entries(Rest, State0);
try wal_truncate_write(State00, First) of
State0 ->
% write the rest normally
write_entries(Rest, State0)
catch error:wal_down ->
{error, wal_down}
end;
_ ->
write_entries(Entries, State00)
end;
Expand Down Expand Up @@ -431,6 +440,26 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
last_written_index_term = {LWIdx, LWTerm}}}
end.

%% Rolls the log state back to the last index known to have been written
%% to the wal: `last_index', `last_term' and `last_written_index_term' are
%% all forced to that entry, the cache is updated with
%% `ra_log_cache:set_last/2' and both index counters are set to match.
%% Only used once the wal has been detected as down, to try to avoid ever
%% having to resend data to the wal.
-spec reset_to_last_known_written(state()) -> state().
reset_to_last_known_written(#?MODULE{cfg = Cfg,
                                     cache = OldCache,
                                     last_index = CurLastIdx,
                                     last_written_index_term = PrevWritten} = Log0) ->
    %% walk back from the current last index to find the entry in the wal
    {WalIdx, WalTerm, Log} = last_index_term_in_wal(CurLastIdx, Log0),
    ?DEBUG("~ts ~s: index: ~b term: ~b: previous ~w",
           [Cfg#cfg.log_id, ?FUNCTION_NAME, WalIdx, WalTerm, PrevWritten]),
    NewCache = ra_log_cache:set_last(WalIdx, OldCache),
    lists:foreach(fun (Counter) ->
                          put_counter(Cfg, Counter, WalIdx)
                  end,
                  [?C_RA_SVR_METRIC_LAST_INDEX,
                   ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX]),
    Log#?MODULE{cache = NewCache,
                last_index = WalIdx,
                last_term = WalTerm,
                last_written_index_term = {WalIdx, WalTerm}}.

-spec handle_event(event_body(), state()) ->
{state(), [effect()]}.
handle_event({written, {FromIdx, _ToIdx, _Term}},
Expand Down Expand Up @@ -480,15 +509,15 @@ handle_event({written, {FromIdx, ToIdx0, Term}},
[State#?MODULE.cfg#cfg.log_id, Term, ToIdx, OtherTerm]),
{State, []}
end;
handle_event({written, {FromIdx, _, _}},
handle_event({written, {FromIdx, _, _Term}},
#?MODULE{cfg = #cfg{log_id = LogId},
last_written_index_term = {LastWrittenIdx, _}} = State0)
last_written_index_term = {LastWrittenIdx, _}} = State)
when FromIdx > LastWrittenIdx + 1 ->
% leaving a gap is not ok - resend from cache
% leaving a gap is not ok - may need to resend from cache
Expected = LastWrittenIdx + 1,
?DEBUG("~ts: ra_log: written gap detected at ~b expected ~b!",
[LogId, FromIdx, Expected]),
{resend_from(Expected, State0), []};
?INFO("~ts: ra_log: written gap detected at ~b expected ~b!",
[LogId, FromIdx, Expected]),
{resend_from(Expected, State), []};
handle_event({truncate_cache, FromIdx, ToIdx}, State) ->
truncate_cache(FromIdx, ToIdx, State, []);
handle_event(flush_cache, State) ->
Expand Down Expand Up @@ -711,6 +740,24 @@ flush_cache(#?MODULE{cache = Cache} = State) ->
needs_cache_flush(#?MODULE{cache = Cache}) ->
ra_log_cache:needs_flush(Cache).

%% Periodic check for writes that may have been lost on the way to the wal.
%% `Now' is compared against the millisecond timestamp recorded at the last
%% wal write, so it is expected to be in milliseconds too.
%% `resend_from/2' is invoked, starting at the index after the last written
%% one, only when all of the following hold:
%%   * more than ?WAL_RESEND_TIMEOUT has passed since the last wal write
%%   * the wal name is currently registered
%%   * it is registered to a different pid than the one last written to
%%   * the cache reports a size greater than zero
%% Otherwise the state is returned unchanged.
-spec tick(Now :: integer(), state()) -> state().
tick(Now, #?MODULE{cfg = #cfg{wal = WalName},
                   cache = Cache,
                   last_written_index_term = {LastWritten, _},
                   last_wal_write = {LastWalPid, LastWriteMs}} = State) ->
    WalNow = whereis(WalName),
    Overdue = Now > LastWriteMs + ?WAL_RESEND_TIMEOUT,
    Restarted = WalNow =/= undefined andalso WalNow =/= LastWalPid,
    %% the cache size is only consulted when the cheaper checks have passed
    case Overdue andalso Restarted andalso ra_log_cache:size(Cache) > 0 of
        false ->
            State;
        true ->
            resend_from(LastWritten + 1, State)
    end.

suggest_snapshot0(SnapKind, Idx, Cluster, MacVersion, MacState, State0) ->
ClusterServerIds = maps:map(fun (_, V) ->
maps:with([voter_status], V)
Expand Down Expand Up @@ -805,6 +852,7 @@ overview(#?MODULE{last_index = LastIndex,
last_written_index_term = LWIT,
snapshot_state = SnapshotState,
reader = Reader,
last_wal_write = {_LastPid, LastMs},
cache = Cache}) ->
#{type => ?MODULE,
last_index => LastIndex,
Expand All @@ -821,7 +869,8 @@ overview(#?MODULE{last_index = LastIndex,
undefined -> undefined;
{I, _} -> I
end,
cache_size => ra_log_cache:size(Cache)
cache_size => ra_log_cache:size(Cache),
last_wal_write => LastMs
}.

-spec write_config(ra_server:config(), state()) -> ok.
Expand Down Expand Up @@ -925,7 +974,7 @@ delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId,
UId, Pivot)
end),
Active = ra_log_reader:segment_refs(Reader),
?DEBUG("~ts: ~b obsolete segments at ~b - remaining: ~b, pivot ~w",
?DEBUG("~ts: ~b obsolete segments at ~b - remaining: ~b, pivot ~0p",
[LogId, length(Obsolete), SnapIdx, length(Active), Pivot]),
State = State0#?MODULE{reader = Reader},
{State, log_update_effects(Readers, Pid, State)}
Expand All @@ -938,24 +987,47 @@ wal_truncate_write(#?MODULE{cfg = #cfg{uid = UId,
% this is the next write after a snapshot was taken or received
% we need to indicate to the WAL that this may be a non-contiguous write
% and that prior entries should be considered stale
ok = ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd),
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
ok = put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx, last_term = Term,
cache = ra_log_cache:add(Entry, Cache)}.
case ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd) of
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx, last_term = Term,
last_wal_write = {Pid, now_ms()},
cache = ra_log_cache:add(Entry, Cache)};
{error, wal_down} ->
error(wal_down)
end.

wal_write(#?MODULE{cfg = #cfg{uid = UId,
wal = Wal} = Cfg,
cache = Cache} = State,
{Idx, Term, Cmd} = Entry) ->
case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of
ok ->
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx, last_term = Term,
last_wal_write = {Pid, now_ms()},
cache = ra_log_cache:add(Entry, Cache)};
{error, wal_down} ->
exit(wal_down)
error(wal_down)
end.

%% Only used by the resend-to-wal functionality. Unlike wal_write/2 it does
%% not add the entry to the cache as the entry is already in there.
%% Raises `error:wal_down' if the wal reports it is down.
wal_rewrite(#?MODULE{cfg = #cfg{uid = UId, wal = Wal} = Cfg} = State,
            {Idx, Term, Cmd}) ->
    WriterId = {UId, self()},
    case ra_log_wal:write(WriterId, Wal, Idx, Term, Cmd) of
        {error, wal_down} ->
            error(wal_down);
        {ok, WalPid} ->
            ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
            put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
            %% remember which wal process accepted the write and when
            State#?MODULE{last_wal_write = {WalPid, now_ms()},
                          last_index = Idx,
                          last_term = Term}
    end.

wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
Expand All @@ -972,14 +1044,15 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,

[{_, _, LastIdx, LastTerm, _} | _] = WalCommands,
case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of
ok ->
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
State#?MODULE{last_index = LastIdx,
last_term = LastTerm,
last_wal_write = {Pid, now_ms()},
cache = Cache};
{error, wal_down} ->
exit(wal_down)
error(wal_down)
end.

truncate_cache(_FromIdx, ToIdx,
Expand All @@ -1002,7 +1075,7 @@ resend_from(Idx, #?MODULE{cfg = #cfg{uid = UId}} = State0) ->
try resend_from0(Idx, State0) of
State -> State
catch
exit:wal_down ->
error:wal_down ->
?WARN("~ts: ra_log: resending from ~b failed with wal_down",
[UId, Idx]),
State0
Expand All @@ -1017,15 +1090,18 @@ resend_from0(Idx, #?MODULE{cfg = Cfg,
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_RESENDS, LastIdx - Idx + 1),
lists:foldl(fun (I, Acc) ->
{I, T, C} = ra_log_cache:fetch(I, Cache),
wal_write(Acc, {I, T, C})
wal_rewrite(Acc, {I, T, C})
end,
State#?MODULE{last_resend_time = erlang:system_time(seconds)},
State#?MODULE{last_resend_time = {erlang:system_time(seconds),
whereis(Cfg#cfg.wal)}},
lists:seq(Idx, LastIdx));
resend_from0(Idx, #?MODULE{last_resend_time = LastResend,
resend_from0(Idx, #?MODULE{last_resend_time = {LastResend, WalPid},
cfg = #cfg{resend_window_seconds = ResendWindow}} = State) ->
case erlang:system_time(seconds) > LastResend + ResendWindow of
case erlang:system_time(seconds) > LastResend + ResendWindow orelse
(is_pid(WalPid) andalso not is_process_alive(WalPid)) of
true ->
% it has been more than a minute since last resend
% it has been more than the resend window since last resend
% _or_ the wal has been restarted since then
% ok to try again
resend_from(Idx, State#?MODULE{last_resend_time = undefined});
false ->
Expand All @@ -1051,7 +1127,7 @@ write_entries([{FstIdx, _, _} | Rest] = Entries, State0) ->
try
{ok, wal_write_batch(State0, Entries)}
catch
exit:wal_down ->
error:wal_down ->
{error, wal_down}
end;
Error ->
Expand Down Expand Up @@ -1161,6 +1237,22 @@ maps_with_values(Keys, Map) ->
end
end, [], Keys).

%% Walks backwards from `Idx' looking for the highest index whose term can
%% be fetched through the log reader, stopping at the last known written
%% index. Returns `{Index, Term, State}' with the reader state threaded
%% through.
last_index_term_in_wal(Idx, #?MODULE{last_written_index_term = {Idx, Term}} = State) ->
    %% lower bound reached: this is the last known written index
    {Idx, Term, State};
last_index_term_in_wal(Idx, #?MODULE{reader = Reader0} = State0) ->
    {MaybeTerm, Reader} = ra_log_reader:fetch_term(Idx, Reader0),
    State = State0#?MODULE{reader = Reader},
    case MaybeTerm of
        undefined ->
            %% not readable at this index, try the previous one
            last_index_term_in_wal(Idx - 1, State);
        Term ->
            %% readable when bypassing the local cache, hence it is in
            %% the wal
            {Idx, Term, State}
    end.

%% Returns the current Erlang system time in milliseconds.
%% This is the timestamp stored in `last_wal_write'.
now_ms() ->
erlang:system_time(millisecond).

%%%% TESTS

-ifdef(TEST).
Expand Down
Loading

0 comments on commit 80d041c

Please sign in to comment.