Skip to content

Commit

Permalink
Merge pull request #445 from rabbitmq/wal-drop-entries-lower-than-snap
Browse files Browse the repository at this point in the history
WAL: drop entries with indexes lower or equal to the last snapshot
  • Loading branch information
kjnilsson authored Jun 6, 2024
2 parents d128c81 + 5b7a265 commit 82329ec
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 38 deletions.
25 changes: 21 additions & 4 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ handle_event({segments, Tid, NewSegs},
handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
#?MODULE{cfg = Cfg,
first_index = FstIdx,
last_index = LstIdx,
last_written_index_term = {LastWrittenIdx, _} = LWIdxTerm0,
snapshot_state = SnapState0} = State0)
%% only update snapshot if it is newer than the last snapshot
when SnapIdx >= FstIdx ->
Expand All @@ -587,10 +589,23 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
CPEffects = [{delete_snapshot,
ra_snapshot:directory(SnapState, checkpoint),
Checkpoint} || Checkpoint <- Checkpoints],
Effects = [DeleteCurrentSnap | CPEffects] ++ Effects0,
%% do not set last written index here as the snapshot may
%% be for a past index
Effects1 = [DeleteCurrentSnap | CPEffects] ++ Effects0,

{LWIdxTerm, Effects} =
case LastWrittenIdx > SnapIdx of
true ->
{LWIdxTerm0, Effects1};
false ->
{Snap,
[{next_event,
{ra_log_event,
{truncate_cache, LastWrittenIdx, SnapIdx}}}
| Effects1]}
end,

{State#?MODULE{first_index = SnapIdx + 1,
last_index = max(LstIdx, SnapIdx),
last_written_index_term = LWIdxTerm,
snapshot_state = SnapState}, Effects};
checkpoint ->
put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, SnapIdx),
Expand Down Expand Up @@ -870,6 +885,7 @@ overview(#?MODULE{last_index = LastIndex,
{I, _} -> I
end,
cache_size => ra_log_cache:size(Cache),
cache_range => ra_log_cache:range(Cache),
last_wal_write => LastMs
}.

Expand Down Expand Up @@ -1058,7 +1074,8 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
truncate_cache(_FromIdx, ToIdx,
#?MODULE{cache = Cache} = State,
Effects) ->
{State#?MODULE{cache = ra_log_cache:trim(ToIdx, Cache)}, Effects}.
CacheAfter = ra_log_cache:trim(ToIdx, Cache),
{State#?MODULE{cache = CacheAfter}, Effects}.

maybe_append_first_entry(State0 = #?MODULE{last_index = -1}) ->
State = append({0, 0, undefined}, State0),
Expand Down
6 changes: 4 additions & 2 deletions src/ra_log_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ init() ->
-spec reset(state()) -> state().
reset(#?MODULE{range = undefined} = State) ->
State;
reset(#?MODULE{tbl = Tid,
cache = _Cache} = State) ->
reset(#?MODULE{tbl = Tid} = State) ->
true = ets:delete_all_objects(Tid),
State#?MODULE{cache = #{},
range = undefined}.
Expand Down Expand Up @@ -133,6 +132,9 @@ trim(To, #?MODULE{tbl = Tid,
NewRange = {To + 1, RangeTo},
State#?MODULE{range = NewRange,
cache = cache_without(From, To, Cache, Tid)};
trim(To, #?MODULE{range = {From, _RangeTo}} = State)
when To < From ->
State;
trim(_To, State) ->
reset(State).

Expand Down
80 changes: 51 additions & 29 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@
names :: ra_system:names(),
explicit_gc = false :: boolean(),
pre_allocate = false :: boolean(),
compress_mem_tables = false :: boolean()
compress_mem_tables = false :: boolean(),
ra_log_snapshot_state_tid :: ets:tid()
}).

-record(wal, {fd :: option(file:io_device()),
Expand Down Expand Up @@ -280,7 +281,8 @@ init(#{dir := Dir} = Conf0) ->
names = Names,
explicit_gc = Gc,
pre_allocate = PreAllocate,
compress_mem_tables = CompressMemTables},
compress_mem_tables = CompressMemTables,
ra_log_snapshot_state_tid = ets:whereis(ra_log_snapshot_state)},
try recover_wal(Dir, Conf) of
Result ->
{ok, Result}
Expand Down Expand Up @@ -418,7 +420,7 @@ serialize_header(UId, Trunc, {Next, Cache} = WriterCache) ->
{Next + 1, Cache#{UId => BinId}}}
end.

write_data({UId, _} = Id, Idx, Term, Data0, Trunc,
write_data({UId, _} = Id, Idx, Term, Data0, Trunc, SnapIdx,
#state{conf = #conf{compute_checksums = ComputeChecksum},
wal = #wal{writer_name_cache = Cache0,
entry_count = Count} = Wal} = State00) ->
Expand All @@ -427,7 +429,7 @@ write_data({UId, _} = Id, Idx, Term, Data0, Trunc,
case should_roll_wal(State00) of
true ->
State = roll_over(State00),
write_data(Id, Idx, Term, Data0, Trunc, State);
write_data(Id, Idx, Term, Data0, Trunc, SnapIdx, State);
false ->
EntryData = to_binary(Data0),
EntryDataLen = iolist_size(EntryData),
Expand All @@ -448,17 +450,22 @@ write_data({UId, _} = Id, Idx, Term, Data0, Trunc,
<<Checksum:32/integer, EntryDataLen:32/unsigned>> |
Entry],
append_data(State0, Id, Idx, Term, Data0,
DataSize, Record, Trunc)
DataSize, Record, Trunc, SnapIdx)
end.


handle_msg({append, {UId, Pid} = Id, Idx, Term, Entry},
#state{writers = Writers} = State0) ->
#state{conf = Conf,
writers = Writers} = State0) ->
SnapIdx = snap_idx(Conf, UId),
case maps:find(UId, Writers) of
_ when Idx =< SnapIdx ->
%% a snapshot already exists that is higher - just drop the write
State0#state{writers = Writers#{UId => {in_seq, SnapIdx}}};
{ok, {_, PrevIdx}} when Idx =< PrevIdx + 1 ->
write_data(Id, Idx, Term, Entry, false, State0);
write_data(Id, Idx, Term, Entry, false, SnapIdx, State0);
error ->
write_data(Id, Idx, Term, Entry, false, State0);
write_data(Id, Idx, Term, Entry, false, SnapIdx, State0);
{ok, {out_of_seq, _}} ->
% writer is out of seq simply ignore drop the write
% TODO: capture metric for dropped writes
Expand All @@ -472,18 +479,19 @@ handle_msg({append, {UId, Pid} = Id, Idx, Term, Entry},
Pid ! {ra_log_event, {resend_write, PrevIdx + 1}},
State0#state{writers = Writers#{UId => {out_of_seq, PrevIdx}}}
end;
handle_msg({truncate, Id, Idx, Term, Entry}, State0) ->
write_data(Id, Idx, Term, Entry, true, State0);
handle_msg({truncate, Id, Idx, Term, Entry}, #state{conf = Conf} = State0) ->
SnapIdx = snap_idx(Conf, Id),
write_data(Id, Idx, Term, Entry, true, SnapIdx, State0);
handle_msg(rollover, State) ->
roll_over(State).

append_data(#state{conf = #conf{counter = C} = Cfg,
file_size = FileSize,
batch = Batch0,
writers = Writers} = State,
{UId, Pid}, Idx, Term, Entry, DataSize, Data, Truncate) ->
{UId, Pid}, Idx, Term, Entry, DataSize, Data, Truncate, SnapIdx) ->
Batch = incr_batch(Cfg, Batch0, UId, Pid,
{Idx, Term, Entry}, Data, Truncate),
{Idx, Term, Entry}, Data, Truncate, SnapIdx),
counters:add(C, ?C_BYTES_WRITTEN, DataSize),
State#state{file_size = FileSize + DataSize,
batch = Batch,
Expand All @@ -493,13 +501,13 @@ incr_batch(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg,
#batch{writes = Writes,
waiting = Waiting0,
pending = Pend} = Batch,
UId, Pid, {Idx, Term, _} = Record, Data, Truncate) ->
UId, Pid, {Idx, Term, _} = Record, Data, Truncate, SnapIdx) ->
Waiting = case Waiting0 of
#{Pid := #batch_writer{tbl_start = TblStart0,
tid = _Tid,
from = From,
inserts = Inserts0} = W} ->
TblStart = table_start(Truncate, Idx, TblStart0),
TblStart = max(SnapIdx, table_start(Truncate, Idx, TblStart0)),
Inserts = case Inserts0 of
[] ->
[Record];
Expand All @@ -526,14 +534,16 @@ incr_batch(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg,
{Tid, TblStart} =
case ets:lookup(OpnMemTbl, UId) of
[{_UId, TblStart0, _TblEnd, T}] ->
{T, table_start(Truncate, Idx, TblStart0)};
{T, max(SnapIdx,
table_start(Truncate, Idx, TblStart0))};
_ ->
%% there is no table so need
%% to open one
TS = max(SnapIdx, Idx),
T = open_mem_table(Cfg, UId),
true = ets:insert_new(OpnMemTbl,
{UId, Idx, Idx - 1, T}),
{T, Idx}
{UId, TS, Idx - 1, T}),
{T, TS}
end,
Writer = #batch_writer{tbl_start = TblStart,
from = Idx,
Expand All @@ -553,23 +563,27 @@ update_mem_table(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg,
UId, Idx, Term, Entry, Truncate) ->
% TODO: if Idx =< First we could truncate the entire table and save
% some disk space when it later is flushed to disk
SnapIdx = snap_idx(Cfg, UId),
case ets:lookup(OpnMemTbl, UId) of
[{_UId, From0, _To, Tid}] ->
true = ets:insert(Tid, {Idx, Term, Entry}),
From = table_start(Truncate, Idx, From0),
% update Last idx for current tbl
% this is how followers overwrite previously seen entries
% TODO: OPTIMISATION
% Writers don't need this updated for every entry. As they keep
% a local cache of unflushed entries it is sufficient to update
% ra_log_open_mem_tables before completing the batch.
% Instead the `From` and `To` could be kept in the batch.
_ = ets:update_element(OpnMemTbl, UId, [{2, From}, {3, Idx}]);
[] ->
case Idx > SnapIdx of
true ->
true = ets:insert(Tid, {Idx, Term, Entry}),
From = table_start(Truncate, Idx, From0),
% update Last idx for current tbl
% this is how followers overwrite previously seen entries
_ = ets:update_element(OpnMemTbl, UId, [{2, From}, {3, Idx}]);
false ->
From = max(SnapIdx, table_start(Truncate, Idx, From0)),
_ = ets:update_element(OpnMemTbl, UId, [{2, From}])
end;
[] when Idx > SnapIdx ->
% open new ets table
Tid = open_mem_table(Cfg, UId),
true = ets:insert_new(OpnMemTbl, {UId, Idx, Idx, Tid}),
true = ets:insert(Tid, {Idx, Term, Entry})
true = ets:insert(Tid, {Idx, Term, Entry});
_ ->
true
end.

roll_over(#state{conf = #conf{open_mem_tbls_tid = Tbl}} = State0) ->
Expand Down Expand Up @@ -1004,3 +1018,11 @@ table_start(false, Idx, TblStart) ->
min(TblStart, Idx);
table_start(true, Idx, _TblStart) ->
Idx.

snap_idx(#conf{ra_log_snapshot_state_tid = Tid}, ServerUId) ->
try ets:lookup_element(Tid, ServerUId, 2) of
Idx ->
Idx
catch _:badarg ->
-1
end.
9 changes: 7 additions & 2 deletions src/ra_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ find_snapshots(#?MODULE{uid = UId,
case pick_first_valid(UId, Module, SnapshotsDir, Snaps) of
undefined ->
ok = delete_snapshots(SnapshotsDir, Snaps),
%% initialise snapshots table even if no snapshots have been taken
%% this ensure these is an entry when the WAL queries it
true = ets:insert(?ETSTBL, {UId, -1}),
State;
Current0 ->
Current = filename:join(SnapshotsDir, Current0),
Expand Down Expand Up @@ -293,8 +296,10 @@ directory(#?MODULE{checkpoint_directory = Dir}, checkpoint) -> Dir.
-spec last_index_for(ra_uid()) -> option(ra_index()).
last_index_for(UId) ->
case ets:lookup(?ETSTBL, UId) of
[] -> undefined;
[{_, Index}] -> Index
[{_, Index}] when Index >= 0 ->
Index;
_ ->
undefined
end.

-spec begin_snapshot(meta(), ReleaseCursorRef :: term(), kind(), state()) ->
Expand Down
57 changes: 57 additions & 0 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ all_tests() ->
sparse_read_out_of_range,
sparse_read_out_of_range_2,
written_event_after_snapshot,
writes_lower_than_snapshot_index_are_dropped,
updated_segment_can_be_read,
open_segments_limit,
external_reader,
Expand Down Expand Up @@ -426,6 +427,62 @@ written_event_after_snapshot(Config) ->
% false = filelib:is_file(Snap1),
ok.

writes_lower_than_snapshot_index_are_dropped(Config) ->
Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}),
Log1 = ra_log:append({1, 1, <<"one">>}, Log0),
Log1b = deliver_all_log_events(ra_log:append({2, 1, <<"two">>}, Log1), 500),
true = erlang:suspend_process(whereis(ra_log_wal)),
Log2 = write_n(3, 500, 1, Log1b),
{Log3, _} = ra_log:update_release_cursor(100, #{}, 1,
<<"100">>, Log2),
Log4 = deliver_all_log_events(Log3, 500),

Overview = ra_log:overview(Log4),
?assertMatch(#{last_index := 499,
first_index := 101,
cache_range := {101, 499},
last_written_index_term := {100, 1}}, Overview),

true = erlang:resume_process(whereis(ra_log_wal)),

%% no written notifications for anything lower than the snapshot should
%% be received
Log5 = receive
{ra_log_event, {written, {From, _To, _Term}} = E}
when From == 101 ->
{Log4b, Effs} = ra_log:handle_event(E, Log4),
Log4c = lists:foldl(
fun ({next_event, {ra_log_event, Evt}}, Acc0) ->
{Acc, _} = ra_log:handle_event(Evt, Acc0),
Acc;
(_, Acc) ->
Acc
end, Log4b, Effs),
deliver_all_log_events(Log4c, 200);
{ra_log_event, E} ->
ct:fail("unexpected log event ~p", [E])
after 500 ->
flush(),
ct:fail("expected log event not received")
end,
OverviewAfter = ra_log:overview(Log5),
?assertMatch(#{last_index := 499,
first_index := 101,
snapshot_index := 100,
cache_size := 0,
cache_range := undefined,
last_written_index_term := {499, 1}}, OverviewAfter),
erlang:monitor(process, whereis(ra_log_segment_writer)),
exit(whereis(ra_log_wal), kill),
receive
{'DOWN', _, _, _, _} = D ->
ct:fail("DOWN received ~p", [D])
after 500 ->
ok
end,
flush(),
ok.

updated_segment_can_be_read(Config) ->
ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
Log0 = ra_log_init(Config,
Expand Down
25 changes: 24 additions & 1 deletion test/ra_log_wal_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ all_tests() ->
recover_with_last_entry_corruption_pre_allocate,
checksum_failure_in_middle_of_file_should_fail,
recover_with_partial_last_entry,
sys_get_status
sys_get_status,
drop_writes_if_snapshot_has_higher_index
].

groups() ->
Expand Down Expand Up @@ -108,6 +109,7 @@ init_per_testcase(TestCase, Config) ->
max_size_bytes => ?MAX_SIZE_BYTES},
_ = ets:new(ra_open_file_metrics, [named_table, public, {write_concurrency, true}]),
_ = ets:new(ra_io_metrics, [named_table, public, {write_concurrency, true}]),
_ = ets:new(ra_log_snapshot_state, [named_table, public, {write_concurrency, true}]),
[{ra_log_ets, Ets},
{writer_id, {UId, self()}},
{test_case, TestCase},
Expand Down Expand Up @@ -872,6 +874,27 @@ checksum_failure_in_middle_of_file_should_fail(Config) ->
meck:unload(),
ok.

drop_writes_if_snapshot_has_higher_index(Config) ->
ok = logger:set_primary_config(level, all),
Conf = ?config(wal_conf, Config),
{UId, _} = WriterId = ?config(writer_id, Config),
{ok, Pid} = ra_log_wal:start_link(Conf),
{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"),
{12, 1, "value"} = await_written(WriterId, {12, 12, 1}),
{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 13, 1, "value2"),
{13, 1, "value2"} = await_written(WriterId, {13, 13, 1}),

ets:insert(ra_log_snapshot_state, {UId, 20}),
{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 14, 1, "value2"),
timer:sleep(500),

undefined = mem_tbl_read(UId, 14),
ra_lib:dump(ets:tab2list(ra_log_open_mem_tables)),
proc_lib:stop(Pid),
[{_, _, _, Tid}] = ets:lookup(ra_log_open_mem_tables, UId),
?assert(not ets:info(Tid, compressed)),
ok.

empty_mailbox() ->
receive
_ ->
Expand Down

0 comments on commit 82329ec

Please sign in to comment.