diff --git a/src/ra_log.erl b/src/ra_log.erl
index 60658262..ff08f838 100644
--- a/src/ra_log.erl
+++ b/src/ra_log.erl
@@ -20,10 +20,12 @@
          sparse_read/2,
          last_index_term/1,
          set_last_index/2,
+         reset_to_last_known_written/1,
          handle_event/2,
          last_written/1,
          fetch/2,
          fetch_term/2,
+         fetch_term_no_cache/2,
          next_index/1,
          snapshot_state/1,
          set_snapshot_state/2,
@@ -104,7 +106,7 @@
          % if this is set a snapshot write is in progress for the
          % index specified
          cache = ra_log_cache:init() :: ra_log_cache:state(),
-         last_resend_time :: option(integer()),
+         last_resend_time :: option({integer(), WalPid :: pid() | undefined}),
          reader :: ra_log_reader:state(),
          readers = [] :: [pid()]
         }).
@@ -431,6 +433,26 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
                   last_written_index_term = {LWIdx, LWTerm}}}
     end.
 
+%% this function forces both last_index and last_written_index_term to
+%% the last known index to be written to the wal.
+%% This is only used after the wal has been detected as down,
+%% to try to avoid ever having to resend data to the wal.
+-spec reset_to_last_known_written(state()) -> state().
+reset_to_last_known_written(#?MODULE{cfg = Cfg,
+                                     cache = Cache0,
+                                     last_index = LastIdx,
+                                     last_written_index_term = LW} = State0) ->
+    {Idx, Term, State} = last_index_term_in_wal(LastIdx, State0),
+    ?DEBUG("~ts ~s: index: ~b term: ~b: previous ~w",
+           [Cfg#cfg.log_id, ?FUNCTION_NAME, Idx, Term, LW]),
+    Cache = ra_log_cache:set_last(Idx, Cache0),
+    put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
+    put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, Idx),
+    State#?MODULE{last_index = Idx,
+                  last_term = Term,
+                  cache = Cache,
+                  last_written_index_term = {Idx, Term}}.
+
 -spec handle_event(event_body(), state()) ->
     {state(), [effect()]}.
 handle_event({written, {FromIdx, _ToIdx, _Term}},
@@ -480,15 +502,15 @@ handle_event({written, {FromIdx, ToIdx0, Term}},
                    [State#?MODULE.cfg#cfg.log_id, Term, ToIdx, OtherTerm]),
             {State, []}
     end;
-handle_event({written, {FromIdx, _, _}},
+handle_event({written, {FromIdx, _, _Term}},
              #?MODULE{cfg = #cfg{log_id = LogId},
-                      last_written_index_term = {LastWrittenIdx, _}} = State0)
+                      last_written_index_term = {LastWrittenIdx, _}} = State)
   when FromIdx > LastWrittenIdx + 1 ->
-    % leaving a gap is not ok - resend from cache
+    % leaving a gap is not ok - may need to resend from cache
     Expected = LastWrittenIdx + 1,
-    ?DEBUG("~ts: ra_log: written gap detected at ~b expected ~b!",
-           [LogId, FromIdx, Expected]),
-    {resend_from(Expected, State0), []};
+    ?INFO("~ts: ra_log: written gap detected at ~b expected ~b!",
+          [LogId, FromIdx, Expected]),
+    {resend_from(Expected, State), []};
 handle_event({truncate_cache, FromIdx, ToIdx}, State) ->
     truncate_cache(FromIdx, ToIdx, State, []);
 handle_event(flush_cache, State) ->
@@ -623,6 +645,16 @@ fetch_term(Idx, #?MODULE{cache = Cache, reader = Reader0} = State0) ->
             {Term, State0}
     end.
 
+-spec fetch_term_no_cache(ra_index(), state()) ->
+    {option(ra_term()), state()}.
+fetch_term_no_cache(Idx, #?MODULE{last_index = LastIdx,
+                                  first_index = FirstIdx} = State0)
+  when Idx < FirstIdx orelse Idx > LastIdx ->
+    {undefined, State0};
+fetch_term_no_cache(Idx, #?MODULE{reader = Reader0} = State0) ->
+    {Term, Reader} = ra_log_reader:fetch_term(Idx, Reader0),
+    {Term, State0#?MODULE{reader = Reader}}.
+
 -spec snapshot_state(State :: state()) -> ra_snapshot:state().
 snapshot_state(State) ->
     State#?MODULE.?FUNCTION_NAME.
@@ -958,6 +990,21 @@ wal_write(#?MODULE{cfg = #cfg{uid = UId,
             exit(wal_down)
     end.
 
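+%% wal_rewrite/2 below is only called from resend_from0/2, which folds over
+%% lists:seq(Idx, LastIdx), fetches each {Idx, Term, Cmd} back out of
+%% ra_log_cache and re-submits it to the wal; for example, a detected gap of
+%% entries 101..105 results in five wal_rewrite/2 calls. Unlike wal_write/2
+%% it does not touch the cache or the last written index, only last_index,
+%% last_term and the last-index counter.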
+%% only used by the resend to wal functionality and doesn't set the cache as
+%% it is already set
+wal_rewrite(#?MODULE{cfg = #cfg{uid = UId,
+                                wal = Wal} = Cfg} = State,
+            {Idx, Term, Cmd}) ->
+    case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of
+        ok ->
+            ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
+            put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
+            State#?MODULE{last_index = Idx,
+                          last_term = Term};
+        {error, wal_down} ->
+            exit(wal_down)
+    end.
+
 wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
                                     wal = Wal} = Cfg,
                          cache = Cache0} = State,
@@ -1017,15 +1064,18 @@ resend_from0(Idx, #?MODULE{cfg = Cfg,
     ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_RESENDS, LastIdx - Idx + 1),
     lists:foldl(fun (I, Acc) ->
                         {I, T, C} = ra_log_cache:fetch(I, Cache),
-                        wal_write(Acc, {I, T, C})
+                        wal_rewrite(Acc, {I, T, C})
                 end,
-                State#?MODULE{last_resend_time = erlang:system_time(seconds)},
+                State#?MODULE{last_resend_time = {erlang:system_time(seconds),
+                                                  whereis(Cfg#cfg.wal)}},
                 lists:seq(Idx, LastIdx));
-resend_from0(Idx, #?MODULE{last_resend_time = LastResend,
+resend_from0(Idx, #?MODULE{last_resend_time = {LastResend, WalPid},
                            cfg = #cfg{resend_window_seconds = ResendWindow}} = State) ->
-    case erlang:system_time(seconds) > LastResend + ResendWindow of
+    case erlang:system_time(seconds) > LastResend + ResendWindow orelse
+         (is_pid(WalPid) andalso not is_process_alive(WalPid)) of
         true ->
-            % it has been more than a minute since last resend
+            % it has been more than the resend window since last resend
+            % _or_ the wal has been restarted since then
             % ok to try again
             resend_from(Idx, State#?MODULE{last_resend_time = undefined});
         false ->
@@ -1161,6 +1211,19 @@ maps_with_values(Keys, Map) ->
                 end
         end, [], Keys).
 
+last_index_term_in_wal(Idx, #?MODULE{last_written_index_term = {Idx, Term}} = State) ->
+    % we reached the lower limit which is the last known written index
+    {Idx, Term, State};
+last_index_term_in_wal(Idx, #?MODULE{reader = Reader0} = State) ->
+    case ra_log_reader:fetch_term(Idx, Reader0) of
+        {undefined, Reader} ->
+            last_index_term_in_wal(Idx-1, State#?MODULE{reader = Reader});
+        {Term, Reader} ->
+            %% if it can be read when bypassing the local cache it is in the
+            %% wal
+            {Idx, Term, State#?MODULE{reader = Reader}}
+    end.
+
 %%%% TESTS
 
 -ifdef(TEST).
diff --git a/src/ra_log_cache.erl b/src/ra_log_cache.erl
index 545893eb..bf961fb3 100644
--- a/src/ra_log_cache.erl
+++ b/src/ra_log_cache.erl
@@ -17,7 +17,8 @@
          flush/1,
          needs_flush/1,
          size/1,
-         range/1
+         range/1,
+         dump_keys/1
         ]).
 
 %% holds static or rarely changing fields
@@ -169,6 +170,10 @@ size(#?MODULE{tbl = Tid, cache = Cache}) ->
 range(#?MODULE{range = Range}) ->
     Range.
 
+dump_keys(#?MODULE{tbl = Tid, cache = Cache}) ->
+    EtsKeys = [K || {K, _, _} <- ets:tab2list(Tid)],
+    {maps:keys(Cache), EtsKeys}.
+
 %% INTERNAL
 
 cache_without(FromIdx, Idx, Cache, _Tid)
diff --git a/src/ra_log_segment.erl b/src/ra_log_segment.erl
index acab93ae..ec9076b0 100644
--- a/src/ra_log_segment.erl
+++ b/src/ra_log_segment.erl
@@ -363,8 +363,7 @@ fold0(Cfg, Cache0, Idx, FinalIdx, Index, Fun, AccFun, Acc0) ->
                               Cfg#cfg.filename})
             end;
         _ ->
-            exit({ra_log_segment_unexpected_eof, Idx,
-                  Cfg#cfg.filename})
+            exit({missing_key, Idx, Cfg#cfg.filename})
     end.
 
 -spec range(state()) -> option({ra_index(), ra_index()}).
@@ -450,9 +449,12 @@ dump_index(File) ->
     end.
 
 dump(File) ->
+    dump(File, fun (B) -> B end).
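+
+%% dump/2 applies Fun to each entry's binary payload; for example
+%% ra_log_segment:dump(File, fun erlang:binary_to_term/1) decodes every
+%% entry, which is what dump/1 did before this change.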
+
+dump(File, Fun) ->
     {ok, S0} = open(File, #{mode => read}),
     {Idx, Last} = range(S0),
-    L = fold(S0, Idx, Last, fun erlang:binary_to_term/1,
+    L = fold(S0, Idx, Last, Fun,
              fun (E, A) -> [E | A] end, []),
     close(S0),
     lists:reverse(L).
diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl
index 29448ddb..2908727d 100644
--- a/src/ra_log_wal.erl
+++ b/src/ra_log_wal.erl
@@ -148,7 +148,7 @@
 -type wal_op() :: {cast, wal_command()} |
                   {call, from(), wal_command()}.
 
--spec write(writer_id(), atom(), ra_index(), ra_term(), term()) ->
+-spec write(writer_id(), atom() | pid(), ra_index(), ra_term(), term()) ->
     ok | {error, wal_down}.
 write(From, Wal, Idx, Term, Cmd) ->
     named_cast(Wal, {append, From, Idx, Term, Cmd}).
@@ -294,7 +294,8 @@ handle_batch(Ops, #state{conf = #conf{explicit_gc = Gc}} = State0) ->
     end,
     {ok, Actions, complete_batch(State)}.
 
-terminate(_Reason, State) ->
+terminate(Reason, State) ->
+    ?DEBUG("wal: terminating with ~W", [Reason, 20]),
     _ = cleanup(State),
     ok.
 
diff --git a/src/ra_server.erl b/src/ra_server.erl
index ac02ba19..1c15cb4c 100644
--- a/src/ra_server.erl
+++ b/src/ra_server.erl
@@ -1047,9 +1047,24 @@ handle_follower(#append_entries_rpc{term = Term,
             {NextState, State,
              [{next_event, {ra_log_event, flush_cache}} | Effects]};
         {error, wal_down} ->
+            %% at this point we know the wal process exited
+            %% but we don't know exactly which in-flight messages
+            %% made it to the wal before it crashed.
+            %% we can check which entries actually made it to the
+            %% wal / mem_tables and revert the last_index and last written
+            %% index, which should avoid the need to resend entries
+            %% after writing a gap into the log.
+            %% Note that the wal writes and syncs to disk _before_
+            %% updating the ETS tables so this is perfectly ok
+            %% TODO: check this doesn't affect state machine
+            %% application as applied index could be higher
+            %% than written (if consensus has already been achieved from
+            %% other members)
+            Log = ra_log:reset_to_last_known_written(Log1),
             {await_condition,
-             State1#{log => Log1,
-                     condition => fun wal_down_condition/2},
+             State1#{log => Log,
+                     condition => fun wal_down_condition/2
+                    },
              Effects0};
         {error, _} = Err ->
             exit(Err)
@@ -1378,6 +1393,7 @@ handle_await_condition(Msg, #{condition := Cond} = State0) ->
         {true, State} ->
             {follower, State, [{next_event, Msg}]};
         {false, State} ->
+            %% do not log unhandled messages as they are often expected
            % log_unhandled_msg(await_condition, Msg, State),
             {await_condition, State, []}
     end.
diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl
index aa88d262..c13ea080 100644
--- a/test/coordination_SUITE.erl
+++ b/test/coordination_SUITE.erl
@@ -48,7 +48,8 @@ all_tests() ->
      bench,
      disconnected_node_catches_up,
      key_metrics,
-     recover_from_checkpoint
+     recover_from_checkpoint,
+     segment_writer_crash
     ].
 
 groups() ->
@@ -785,6 +786,106 @@ recover_from_checkpoint(Config) ->
     [ok = slave:stop(S) || {_, S} <- ServerIds],
     ok.
 
+segment_writer_crash(Config) ->
+    %% this test crashes the segment writer for a follower node whilst the
+    %% ra cluster is active and receiving and replicating commands.
+    %% it tests that the segment writer and wal are able to recover without
+    %% the follower crashing.
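+    %% When the wal itself is killed the follower is expected to take the
+    %% {error, wal_down} branch in ra_server:handle_follower/2, reset its log
+    %% with ra_log:reset_to_last_known_written/1 and sit in await_condition
+    %% (wal_down_condition/2) until writes are accepted again.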
+    %% Finally we stop and restart the follower to make sure it can recover
+    %% correctly and that the log data contains no missing entries
+    PrivDir = ?config(data_dir, Config),
+    ClusterName = ?config(cluster_name, Config),
+    ServerNames = [s1, s2, s3],
+    ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- ServerNames],
+    Configs = [begin
+                   UId = atom_to_binary(Name, utf8),
+                   #{cluster_name => ClusterName,
+                     id => NodeId,
+                     uid => UId,
+                     initial_members => ServerIds,
+                     machine => {module, ?MODULE, #{}},
+                     log_init_args => #{uid => UId,
+                                        min_snapshot_interval => 5}}
+               end || {Name, _Node} = NodeId <- ServerIds],
+    {ok, Started, []} = ra:start_cluster(?SYS, Configs),
+
+
+    {ok, _, Leader} = ra:members(hd(Started)),
+    [{FollowerName, FollowerNode} = Follower, _] = lists:delete(Leader, Started),
+
+    Data = crypto:strong_rand_bytes(10_000),
+    WriterFun = fun Recur() ->
+                        {ok, _, _} = ra:process_command(Leader, {?FUNCTION_NAME, Data}),
+                        receive
+                            stop ->
+                                ok
+                        after 1 ->
+                                Recur()
+                        end
+                end,
+    FollowerPid = ct_rpc:call(FollowerNode, erlang, whereis, [FollowerName]),
+    ?assert(is_pid(FollowerPid)),
+
+    AwaitReplicated = fun () ->
+                              LastIdxs =
+                                  [begin
+                                       {ok, #{current_term := T,
+                                              log := #{last_index := L}}, _} =
+                                           ra:member_overview(S),
+                                       {T, L}
+                                   end || {_, _N} = S <- ServerIds],
+                              1 == length(lists:usort(LastIdxs))
+                      end,
+    [begin
+         ct:pal("running iteration ~b", [I]),
+
+         WriterPid = spawn(WriterFun),
+         timer:sleep(rand:uniform(500) + 5_000),
+
+         case I rem 2 == 0 of
+             true ->
+                 ct:pal("killing segment writer"),
+                 _ = ct_rpc:call(FollowerNode, ra_log_wal, force_rollover, [ra_log_wal]),
+                 timer:sleep(10),
+                 Pid = ct_rpc:call(FollowerNode, erlang, whereis, [ra_log_segment_writer]),
+                 true = ct_rpc:call(FollowerNode, erlang, exit, [Pid, kill]);
+             false ->
+                 ct:pal("killing wal"),
+                 Pid = ct_rpc:call(FollowerNode, erlang, whereis, [ra_log_wal]),
+                 true = ct_rpc:call(FollowerNode, erlang, exit, [Pid, kill])
+         end,
+
+
+         timer:sleep(1000),
+         WriterPid ! stop,
+         await_condition(fun () -> not is_process_alive(WriterPid) end, 1000),
+
+         %% assert stuff
+         await_condition(AwaitReplicated, 100),
+         ?assertMatch({ok, #{log := #{cache_size := 0}}, _},
+                      ra:member_overview(Follower)),
+         %% follower hasn't crashed
+         ?assertEqual(FollowerPid, ct_rpc:call(FollowerNode, erlang, whereis,
+                                               [FollowerName]))
+     end || I <- lists:seq(1, 10)],
+
+    %% stop and restart the follower
+    ok = ra:stop_server(Follower),
+    ok = ra:restart_server(Follower),
+
+    await_condition(AwaitReplicated, 100),
+
+    _ = ct_rpc:call(FollowerNode, ra_log_wal, force_rollover, [ra_log_wal]),
+
+    ok = ra:stop_server(Follower),
+    ok = ra:restart_server(Follower),
+
+    await_condition(AwaitReplicated, 100),
+
+
+    [ok = slave:stop(S) || {_, S} <- ServerIds],
+    ok.
+
 %% Utility
 
 test_local_msg(Leader, ReceiverNode, ExpectedSenderNode, CmdTag, Opts0) ->
@@ -855,6 +956,9 @@ search_paths() ->
                  code:get_path()).
 
 start_follower(N, PrivDir) ->
+    start_follower(N, PrivDir, []).
+
+start_follower(N, PrivDir, Env) ->
     Dir0 = filename:join(PrivDir, N),
     Dir = "'\"" ++ Dir0 ++ "\"'",
     Host = get_current_host(),
@@ -862,7 +966,7 @@ start_follower(N, PrivDir) ->
     ct:pal("starting secondary node with ~ts on host ~ts for node ~ts",
            [Pa, Host, node()]),
     {ok, S} = slave:start_link(Host, N, Pa),
     ok = ct_rpc:call(S, ?MODULE, node_setup, [PrivDir]),
-    ok = erpc:call(S, ra, start, []),
+    {ok, _} = erpc:call(S, ra, start, [Env]),
     ok = ct_rpc:call(S, logger, set_primary_config, [level, all]),
     S.
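A usage sketch of the new start_follower/3 (illustrative only: s4 and the
wal_max_size_bytes override are example values, and this assumes ra:start/1
treats the list as {Key, Value} ra application environment settings, as the
[] default above suggests):

    %% start an extra secondary node with a non-default ra setting
    S4 = start_follower(s4, PrivDir, [{wal_max_size_bytes, 64_000_000}]),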
@@ -901,6 +1005,8 @@ apply(#{index := Idx}, checkpoint, State) ->
     {State, ok, [{checkpoint, Idx, CheckpointState}]};
 apply(#{index := Idx}, promote_checkpoint, State) ->
     {State, ok, [{release_cursor, Idx}]};
+apply(#{index := _Idx}, {segment_writer_crash, _}, State) ->
+    {State, ok, []};
 apply(#{index := Idx}, _Cmd, State) ->
     {State, ok, [{release_cursor, Idx, State}]}.
 
diff --git a/test/ra_log_segment_SUITE.erl b/test/ra_log_segment_SUITE.erl
index 6734e6e8..9106dc4a 100644
--- a/test/ra_log_segment_SUITE.erl
+++ b/test/ra_log_segment_SUITE.erl
@@ -71,25 +71,57 @@ open_close_persists_max_count(Config) ->
     ok.
 
 corrupted_segment(Config) ->
-    % tests items are bing persisted and index can be recovered
     Dir = ?config(data_dir, Config),
     Fn = filename:join(Dir, "seg1.seg"),
-    Data = make_data(1024),
+    % Data = make_data(1024),
+    Data = <<"banana">>,
     ok = open_write_close(1, 2, Data, Fn),
     %% truncate file a bit to simulate lost bytes
-    {ok, Fd} = file:open(Fn, [read, write, raw, binary]),
-    {ok, Pos} = file:position(Fd, eof),
-    {ok, _} = file:position(Fd, Pos -2),
-    ok = file:truncate(Fd),
-    ok = file:close(Fd),
+    truncate(Fn, {eof, -2}),
+    % ct:pal("DUMP PRE ~p", [ra_log_segment:dump_index(Fn)]),
+    %% check that the current state throws a missing key
+    {ok, SegR0} = ra_log_segment:open(Fn, #{mode => read}),
+    ?assertExit({missing_key, 2},
+                read_sparse(SegR0, [1, 2])),
+
+    %% rewrite items as would happen if error was encountered
+    ok = open_write_close(1, 2, Data, Fn),
+
+    % ct:pal("DUMP ~p", [ra_log_segment:dump(Fn)]),
+    % {ok, SegR} = ra_log_segment:open(Fn, #{mode => read}),
+    % write_trunc_until_full(Fn),
     {ok, SegR} = ra_log_segment:open(Fn, #{mode => read}),
+    ct:pal("Range ~p", [ra_log_segment:segref(SegR)]),
+    ct:pal("SegR ~p", [SegR]),
+    [{1, 2, Data}] =
+        ra_log_segment:fold(SegR, 1, 1,
+                            fun ra_lib:id/1,
+                            fun (E, A) -> [E | A] end,
+                            []),
     %% for now we are just going to exit when reaching this point
     %% in the future we can find a strategy for handling this case
-    ?assertExit({missing_key, 2},
-                read_sparse(SegR, [1, 2])),
     ok.
 
+truncate(Fn, Pos) ->
+    {ok, Fd} = file:open(Fn, [read, write, raw, binary]),
+    {ok, _Pos} = file:position(Fd, Pos),
+    ok = file:truncate(Fd),
+    ok = file:close(Fd),
+    ok.
+
+write_trunc_until_full(Fn) ->
+    {ok, Seg0} = ra_log_segment:open(Fn),
+    case ra_log_segment:append(Seg0, 1, 2, <<"banana">>) of
+        {ok, Seg1} ->
+            ok = ra_log_segment:close(Seg1),
+            truncate(Fn, {eof, -2}),
+            write_trunc_until_full(Fn);
+        {error, full} ->
+            ok = ra_log_segment:close(Seg0),
+            ok
+    end.
+
 large_segment(Config) ->
     % tests items are bing persisted and index can be recovered
diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl
index 64685dbf..b5bffff2 100644
--- a/test/ra_server_SUITE.erl
+++ b/test/ra_server_SUITE.erl
@@ -767,6 +767,7 @@ wal_down_condition(_Config) ->
     % meck:new(ra_log, [passthrough]),
     meck:expect(ra_log, write, fun (_Es, _L) -> {error, wal_down} end),
     meck:expect(ra_log, can_write, fun (_L) -> false end),
+    meck:expect(ra_log, reset_to_last_known_written, fun (L) -> L end),
     % ra log fails
     {await_condition, State = #{condition := _}, [{record_leader_msg, _}]}
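To make the resend changes in ra_log:resend_from0/2 easier to follow, below is
a minimal, self-contained sketch of the new resend check -- an illustration
only, not part of the patch; the module and function names are invented:

    -module(resend_window_sketch).
    -export([can_resend/2]).

    %% LastResendInfo mirrors ra_log's last_resend_time field after this
    %% change: undefined, or {TimestampInSeconds, WalPidAtThatTime}.
    -spec can_resend(undefined | {integer(), pid() | undefined},
                     non_neg_integer()) -> boolean().
    can_resend(undefined, _WindowSeconds) ->
        %% nothing has been resent yet
        true;
    can_resend({LastResend, WalPid}, WindowSeconds) ->
        %% resend once the window has elapsed, or straight away if the wal
        %% process that was alive at the last resend has since exited
        erlang:system_time(seconds) > LastResend + WindowSeconds orelse
            (is_pid(WalPid) andalso not is_process_alive(WalPid)).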