From 5b7a26555b6aeefda143297d92e543e276046d07 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 24 May 2024 11:48:21 +0100 Subject: [PATCH] WAL: drop entries with indexes lower or equal to last snapshot In situations where the WAL lags behind but entries are still committed the WAL will still write entries with indexes lower than the last snapshot index for a given server. To avoid this unnessary work we no check the ra_log_snapshot_state table for each write and do not write them if they are lower than the snapshot index. This allows the wal a better chance to catch up on it's backlog. Some changes needed to be made to the ra_log also to handle this new behaviour. --- src/ra_log.erl | 25 ++++++++++-- src/ra_log_cache.erl | 6 ++- src/ra_log_wal.erl | 80 +++++++++++++++++++++++++-------------- src/ra_snapshot.erl | 9 ++++- test/ra_log_2_SUITE.erl | 57 ++++++++++++++++++++++++++++ test/ra_log_wal_SUITE.erl | 25 +++++++++++- 6 files changed, 164 insertions(+), 38 deletions(-) diff --git a/src/ra_log.erl b/src/ra_log.erl index b4a2bf0d..2c16673c 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -566,6 +566,8 @@ handle_event({segments, Tid, NewSegs}, handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind}, #?MODULE{cfg = Cfg, first_index = FstIdx, + last_index = LstIdx, + last_written_index_term = {LastWrittenIdx, _} = LWIdxTerm0, snapshot_state = SnapState0} = State0) %% only update snapshot if it is newer than the last snapshot when SnapIdx >= FstIdx -> @@ -587,10 +589,23 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind}, CPEffects = [{delete_snapshot, ra_snapshot:directory(SnapState, checkpoint), Checkpoint} || Checkpoint <- Checkpoints], - Effects = [DeleteCurrentSnap | CPEffects] ++ Effects0, - %% do not set last written index here as the snapshot may - %% be for a past index + Effects1 = [DeleteCurrentSnap | CPEffects] ++ Effects0, + + {LWIdxTerm, Effects} = + case LastWrittenIdx > SnapIdx of + true -> + {LWIdxTerm0, Effects1}; + false -> + {Snap, + [{next_event, + {ra_log_event, + {truncate_cache, LastWrittenIdx, SnapIdx}}} + | Effects1]} + end, + {State#?MODULE{first_index = SnapIdx + 1, + last_index = max(LstIdx, SnapIdx), + last_written_index_term = LWIdxTerm, snapshot_state = SnapState}, Effects}; checkpoint -> put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, SnapIdx), @@ -870,6 +885,7 @@ overview(#?MODULE{last_index = LastIndex, {I, _} -> I end, cache_size => ra_log_cache:size(Cache), + cache_range => ra_log_cache:range(Cache), last_wal_write => LastMs }. @@ -1058,7 +1074,8 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId, truncate_cache(_FromIdx, ToIdx, #?MODULE{cache = Cache} = State, Effects) -> - {State#?MODULE{cache = ra_log_cache:trim(ToIdx, Cache)}, Effects}. + CacheAfter = ra_log_cache:trim(ToIdx, Cache), + {State#?MODULE{cache = CacheAfter}, Effects}. maybe_append_first_entry(State0 = #?MODULE{last_index = -1}) -> State = append({0, 0, undefined}, State0), diff --git a/src/ra_log_cache.erl b/src/ra_log_cache.erl index bf961fb3..1eeec3f0 100644 --- a/src/ra_log_cache.erl +++ b/src/ra_log_cache.erl @@ -42,8 +42,7 @@ init() -> -spec reset(state()) -> state(). reset(#?MODULE{range = undefined} = State) -> State; -reset(#?MODULE{tbl = Tid, - cache = _Cache} = State) -> +reset(#?MODULE{tbl = Tid} = State) -> true = ets:delete_all_objects(Tid), State#?MODULE{cache = #{}, range = undefined}. @@ -133,6 +132,9 @@ trim(To, #?MODULE{tbl = Tid, NewRange = {To + 1, RangeTo}, State#?MODULE{range = NewRange, cache = cache_without(From, To, Cache, Tid)}; +trim(To, #?MODULE{range = {From, _RangeTo}} = State) + when To < From -> + State; trim(_To, State) -> reset(State). diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index 3affd577..bd7b7635 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -91,7 +91,8 @@ names :: ra_system:names(), explicit_gc = false :: boolean(), pre_allocate = false :: boolean(), - compress_mem_tables = false :: boolean() + compress_mem_tables = false :: boolean(), + ra_log_snapshot_state_tid :: ets:tid() }). -record(wal, {fd :: option(file:io_device()), @@ -280,7 +281,8 @@ init(#{dir := Dir} = Conf0) -> names = Names, explicit_gc = Gc, pre_allocate = PreAllocate, - compress_mem_tables = CompressMemTables}, + compress_mem_tables = CompressMemTables, + ra_log_snapshot_state_tid = ets:whereis(ra_log_snapshot_state)}, try recover_wal(Dir, Conf) of Result -> {ok, Result} @@ -418,7 +420,7 @@ serialize_header(UId, Trunc, {Next, Cache} = WriterCache) -> {Next + 1, Cache#{UId => BinId}}} end. -write_data({UId, _} = Id, Idx, Term, Data0, Trunc, +write_data({UId, _} = Id, Idx, Term, Data0, Trunc, SnapIdx, #state{conf = #conf{compute_checksums = ComputeChecksum}, wal = #wal{writer_name_cache = Cache0, entry_count = Count} = Wal} = State00) -> @@ -427,7 +429,7 @@ write_data({UId, _} = Id, Idx, Term, Data0, Trunc, case should_roll_wal(State00) of true -> State = roll_over(State00), - write_data(Id, Idx, Term, Data0, Trunc, State); + write_data(Id, Idx, Term, Data0, Trunc, SnapIdx, State); false -> EntryData = to_binary(Data0), EntryDataLen = iolist_size(EntryData), @@ -448,17 +450,22 @@ write_data({UId, _} = Id, Idx, Term, Data0, Trunc, <> | Entry], append_data(State0, Id, Idx, Term, Data0, - DataSize, Record, Trunc) + DataSize, Record, Trunc, SnapIdx) end. handle_msg({append, {UId, Pid} = Id, Idx, Term, Entry}, - #state{writers = Writers} = State0) -> + #state{conf = Conf, + writers = Writers} = State0) -> + SnapIdx = snap_idx(Conf, UId), case maps:find(UId, Writers) of + _ when Idx =< SnapIdx -> + %% a snapshot already exists that is higher - just drop the write + State0#state{writers = Writers#{UId => {in_seq, SnapIdx}}}; {ok, {_, PrevIdx}} when Idx =< PrevIdx + 1 -> - write_data(Id, Idx, Term, Entry, false, State0); + write_data(Id, Idx, Term, Entry, false, SnapIdx, State0); error -> - write_data(Id, Idx, Term, Entry, false, State0); + write_data(Id, Idx, Term, Entry, false, SnapIdx, State0); {ok, {out_of_seq, _}} -> % writer is out of seq simply ignore drop the write % TODO: capture metric for dropped writes @@ -472,8 +479,9 @@ handle_msg({append, {UId, Pid} = Id, Idx, Term, Entry}, Pid ! {ra_log_event, {resend_write, PrevIdx + 1}}, State0#state{writers = Writers#{UId => {out_of_seq, PrevIdx}}} end; -handle_msg({truncate, Id, Idx, Term, Entry}, State0) -> - write_data(Id, Idx, Term, Entry, true, State0); +handle_msg({truncate, Id, Idx, Term, Entry}, #state{conf = Conf} = State0) -> + SnapIdx = snap_idx(Conf, Id), + write_data(Id, Idx, Term, Entry, true, SnapIdx, State0); handle_msg(rollover, State) -> roll_over(State). @@ -481,9 +489,9 @@ append_data(#state{conf = #conf{counter = C} = Cfg, file_size = FileSize, batch = Batch0, writers = Writers} = State, - {UId, Pid}, Idx, Term, Entry, DataSize, Data, Truncate) -> + {UId, Pid}, Idx, Term, Entry, DataSize, Data, Truncate, SnapIdx) -> Batch = incr_batch(Cfg, Batch0, UId, Pid, - {Idx, Term, Entry}, Data, Truncate), + {Idx, Term, Entry}, Data, Truncate, SnapIdx), counters:add(C, ?C_BYTES_WRITTEN, DataSize), State#state{file_size = FileSize + DataSize, batch = Batch, @@ -493,13 +501,13 @@ incr_batch(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg, #batch{writes = Writes, waiting = Waiting0, pending = Pend} = Batch, - UId, Pid, {Idx, Term, _} = Record, Data, Truncate) -> + UId, Pid, {Idx, Term, _} = Record, Data, Truncate, SnapIdx) -> Waiting = case Waiting0 of #{Pid := #batch_writer{tbl_start = TblStart0, tid = _Tid, from = From, inserts = Inserts0} = W} -> - TblStart = table_start(Truncate, Idx, TblStart0), + TblStart = max(SnapIdx, table_start(Truncate, Idx, TblStart0)), Inserts = case Inserts0 of [] -> [Record]; @@ -526,14 +534,16 @@ incr_batch(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg, {Tid, TblStart} = case ets:lookup(OpnMemTbl, UId) of [{_UId, TblStart0, _TblEnd, T}] -> - {T, table_start(Truncate, Idx, TblStart0)}; + {T, max(SnapIdx, + table_start(Truncate, Idx, TblStart0))}; _ -> %% there is no table so need %% to open one + TS = max(SnapIdx, Idx), T = open_mem_table(Cfg, UId), true = ets:insert_new(OpnMemTbl, - {UId, Idx, Idx - 1, T}), - {T, Idx} + {UId, TS, Idx - 1, T}), + {T, TS} end, Writer = #batch_writer{tbl_start = TblStart, from = Idx, @@ -553,23 +563,27 @@ update_mem_table(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg, UId, Idx, Term, Entry, Truncate) -> % TODO: if Idx =< First we could truncate the entire table and save % some disk space when it later is flushed to disk + SnapIdx = snap_idx(Cfg, UId), case ets:lookup(OpnMemTbl, UId) of [{_UId, From0, _To, Tid}] -> - true = ets:insert(Tid, {Idx, Term, Entry}), - From = table_start(Truncate, Idx, From0), - % update Last idx for current tbl - % this is how followers overwrite previously seen entries - % TODO: OPTIMISATION - % Writers don't need this updated for every entry. As they keep - % a local cache of unflushed entries it is sufficient to update - % ra_log_open_mem_tables before completing the batch. - % Instead the `From` and `To` could be kept in the batch. - _ = ets:update_element(OpnMemTbl, UId, [{2, From}, {3, Idx}]); - [] -> + case Idx > SnapIdx of + true -> + true = ets:insert(Tid, {Idx, Term, Entry}), + From = table_start(Truncate, Idx, From0), + % update Last idx for current tbl + % this is how followers overwrite previously seen entries + _ = ets:update_element(OpnMemTbl, UId, [{2, From}, {3, Idx}]); + false -> + From = max(SnapIdx, table_start(Truncate, Idx, From0)), + _ = ets:update_element(OpnMemTbl, UId, [{2, From}]) + end; + [] when Idx > SnapIdx -> % open new ets table Tid = open_mem_table(Cfg, UId), true = ets:insert_new(OpnMemTbl, {UId, Idx, Idx, Tid}), - true = ets:insert(Tid, {Idx, Term, Entry}) + true = ets:insert(Tid, {Idx, Term, Entry}); + _ -> + true end. roll_over(#state{conf = #conf{open_mem_tbls_tid = Tbl}} = State0) -> @@ -1004,3 +1018,11 @@ table_start(false, Idx, TblStart) -> min(TblStart, Idx); table_start(true, Idx, _TblStart) -> Idx. + +snap_idx(#conf{ra_log_snapshot_state_tid = Tid}, ServerUId) -> + try ets:lookup_element(Tid, ServerUId, 2) of + Idx -> + Idx + catch _:badarg -> + -1 + end. diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index 98d4ae08..01138228 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -190,6 +190,9 @@ find_snapshots(#?MODULE{uid = UId, case pick_first_valid(UId, Module, SnapshotsDir, Snaps) of undefined -> ok = delete_snapshots(SnapshotsDir, Snaps), + %% initialise snapshots table even if no snapshots have been taken + %% this ensure these is an entry when the WAL queries it + true = ets:insert(?ETSTBL, {UId, -1}), State; Current0 -> Current = filename:join(SnapshotsDir, Current0), @@ -293,8 +296,10 @@ directory(#?MODULE{checkpoint_directory = Dir}, checkpoint) -> Dir. -spec last_index_for(ra_uid()) -> option(ra_index()). last_index_for(UId) -> case ets:lookup(?ETSTBL, UId) of - [] -> undefined; - [{_, Index}] -> Index + [{_, Index}] when Index >= 0 -> + Index; + _ -> + undefined end. -spec begin_snapshot(meta(), ReleaseCursorRef :: term(), kind(), state()) -> diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 572537ca..6d39eb18 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -52,6 +52,7 @@ all_tests() -> sparse_read_out_of_range, sparse_read_out_of_range_2, written_event_after_snapshot, + writes_lower_than_snapshot_index_are_dropped, updated_segment_can_be_read, open_segments_limit, external_reader, @@ -426,6 +427,62 @@ written_event_after_snapshot(Config) -> % false = filelib:is_file(Snap1), ok. +writes_lower_than_snapshot_index_are_dropped(Config) -> + Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}), + Log1 = ra_log:append({1, 1, <<"one">>}, Log0), + Log1b = deliver_all_log_events(ra_log:append({2, 1, <<"two">>}, Log1), 500), + true = erlang:suspend_process(whereis(ra_log_wal)), + Log2 = write_n(3, 500, 1, Log1b), + {Log3, _} = ra_log:update_release_cursor(100, #{}, 1, + <<"100">>, Log2), + Log4 = deliver_all_log_events(Log3, 500), + + Overview = ra_log:overview(Log4), + ?assertMatch(#{last_index := 499, + first_index := 101, + cache_range := {101, 499}, + last_written_index_term := {100, 1}}, Overview), + + true = erlang:resume_process(whereis(ra_log_wal)), + + %% no written notifications for anything lower than the snapshot should + %% be received + Log5 = receive + {ra_log_event, {written, {From, _To, _Term}} = E} + when From == 101 -> + {Log4b, Effs} = ra_log:handle_event(E, Log4), + Log4c = lists:foldl( + fun ({next_event, {ra_log_event, Evt}}, Acc0) -> + {Acc, _} = ra_log:handle_event(Evt, Acc0), + Acc; + (_, Acc) -> + Acc + end, Log4b, Effs), + deliver_all_log_events(Log4c, 200); + {ra_log_event, E} -> + ct:fail("unexpected log event ~p", [E]) + after 500 -> + flush(), + ct:fail("expected log event not received") + end, + OverviewAfter = ra_log:overview(Log5), + ?assertMatch(#{last_index := 499, + first_index := 101, + snapshot_index := 100, + cache_size := 0, + cache_range := undefined, + last_written_index_term := {499, 1}}, OverviewAfter), + erlang:monitor(process, whereis(ra_log_segment_writer)), + exit(whereis(ra_log_wal), kill), + receive + {'DOWN', _, _, _, _} = D -> + ct:fail("DOWN received ~p", [D]) + after 500 -> + ok + end, + flush(), + ok. + updated_segment_can_be_read(Config) -> ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS), Log0 = ra_log_init(Config, diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl index 86ce5481..c00c14e1 100644 --- a/test/ra_log_wal_SUITE.erl +++ b/test/ra_log_wal_SUITE.erl @@ -47,7 +47,8 @@ all_tests() -> recover_with_last_entry_corruption_pre_allocate, checksum_failure_in_middle_of_file_should_fail, recover_with_partial_last_entry, - sys_get_status + sys_get_status, + drop_writes_if_snapshot_has_higher_index ]. groups() -> @@ -108,6 +109,7 @@ init_per_testcase(TestCase, Config) -> max_size_bytes => ?MAX_SIZE_BYTES}, _ = ets:new(ra_open_file_metrics, [named_table, public, {write_concurrency, true}]), _ = ets:new(ra_io_metrics, [named_table, public, {write_concurrency, true}]), + _ = ets:new(ra_log_snapshot_state, [named_table, public, {write_concurrency, true}]), [{ra_log_ets, Ets}, {writer_id, {UId, self()}}, {test_case, TestCase}, @@ -872,6 +874,27 @@ checksum_failure_in_middle_of_file_should_fail(Config) -> meck:unload(), ok. +drop_writes_if_snapshot_has_higher_index(Config) -> + ok = logger:set_primary_config(level, all), + Conf = ?config(wal_conf, Config), + {UId, _} = WriterId = ?config(writer_id, Config), + {ok, Pid} = ra_log_wal:start_link(Conf), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"), + {12, 1, "value"} = await_written(WriterId, {12, 12, 1}), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 13, 1, "value2"), + {13, 1, "value2"} = await_written(WriterId, {13, 13, 1}), + + ets:insert(ra_log_snapshot_state, {UId, 20}), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 14, 1, "value2"), + timer:sleep(500), + + undefined = mem_tbl_read(UId, 14), + ra_lib:dump(ets:tab2list(ra_log_open_mem_tables)), + proc_lib:stop(Pid), + [{_, _, _, Tid}] = ets:lookup(ra_log_open_mem_tables, UId), + ?assert(not ets:info(Tid, compressed)), + ok. + empty_mailbox() -> receive _ ->