Skip to content

Commit

Permalink
Merge pull request #476 from rabbitmq/log-init-bug
Browse files Browse the repository at this point in the history
Log init bug
  • Loading branch information
kjnilsson authored Nov 14, 2024
2 parents c538a41 + fc94a68 commit 9e4cbb0
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 33 deletions.
53 changes: 33 additions & 20 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,13 @@
-type overview() ::
#{type := ra_log,
last_index := ra_index(),
last_term := ra_term(),
first_index := ra_index(),
last_written_index_term := ra_idxterm(),
num_segments := non_neg_integer(),
open_segments => non_neg_integer(),
snapshot_index => undefined | ra_index(),
snapshot_term => undefined | ra_index(),
mem_table_size => non_neg_integer(),
latest_checkpoint_index => undefined | ra_index(),
atom() => term()}.
Expand Down Expand Up @@ -213,10 +215,8 @@ init(#{uid := UId,
{DeleteSpecs, Mt} = ra_mt:set_first(FirstIdx, Mt0),

ok = exec_mem_table_delete(Names, UId, DeleteSpecs),
Reader0 = ra_log_reader:init(UId, Dir, FirstIdx, MaxOpen, AccessPattern, SegRefs,
Names, Counter),
%% TODO: can there be obsolete segments returned here?
{Reader, []} = ra_log_reader:update_first_index(FirstIdx, Reader0),
Reader = ra_log_reader:init(UId, Dir, MaxOpen, AccessPattern, SegRefs,
Names, Counter),
%% assert there is no gap between the snapshot
%% and the first index in the log
case (FirstIdx - SnapIdx) > 1 of
Expand All @@ -237,13 +237,13 @@ init(#{uid := UId,
counter = Counter,
names = Names},
State0 = #?MODULE{cfg = Cfg,
first_index = max(SnapIdx + 1, FirstIdx),
last_index = max(SnapIdx, LastIdx0),
reader = Reader,
mem_table = Mt,
snapshot_state = SnapshotState,
last_wal_write = {whereis(Wal), now_ms()}
},
first_index = max(SnapIdx + 1, FirstIdx),
last_index = max(SnapIdx, LastIdx0),
reader = Reader,
mem_table = Mt,
snapshot_state = SnapshotState,
last_wal_write = {whereis(Wal), now_ms()}
},
put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
LastIdx = State0#?MODULE.last_index,
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
Expand Down Expand Up @@ -273,15 +273,20 @@ init(#{uid := UId,
LastWrittenIdx = case ra_log_wal:last_writer_seq(Wal, UId) of
{ok, undefined} ->
%% take last segref index
LastSegRefIdx;
max(SnapIdx, LastSegRefIdx);
{ok, Idx} ->
max(Idx, LastSegRefIdx);
{error, wal_down} ->
?ERROR("~ts: ra_log:init/1 cannot complete as wal process is down.",
[State2#?MODULE.cfg#cfg.log_id]),
exit(wal_down)
end,
{LastWrittenTerm, State3} = fetch_term(LastWrittenIdx, State2),
{LastWrittenTerm, State3} = case LastWrittenIdx of
SnapIdx ->
{SnapTerm, State2};
_ ->
fetch_term(LastWrittenIdx, State2)
end,

LastTerm = ra_lib:default(LastTerm0, -1),
State4 = State3#?MODULE{last_term = LastTerm,
Expand All @@ -292,10 +297,13 @@ init(#{uid := UId,
% and an empty meta data map
State = maybe_append_first_entry(State4),
?DEBUG("~ts: ra_log:init recovered last_index_term ~w"
" first index ~b",
" snapshot_index_term ~w, last_written_index_term ~w",
[State#?MODULE.cfg#cfg.log_id,
last_index_term(State),
State#?MODULE.first_index]),
{SnapIdx, SnapTerm},
State#?MODULE.last_written_index_term
]),
?DEBUG("~ts: ra_log:init overview ~p", [overview(State)]),
element(1, delete_segments(SnapIdx, State)).

-spec close(state()) -> ok.
Expand Down Expand Up @@ -936,23 +944,30 @@ exists({Idx, Term}, Log0) ->

-spec overview(state()) -> overview().
overview(#?MODULE{last_index = LastIndex,
last_term = LastTerm,
first_index = FirstIndex,
last_written_index_term = LWIT,
snapshot_state = SnapshotState,
reader = Reader,
last_wal_write = {_LastPid, LastMs},
mem_table = Mt
}) ->
CurrSnap = ra_snapshot:current(SnapshotState),
#{type => ?MODULE,
last_index => LastIndex,
last_term => LastTerm,
first_index => FirstIndex,
last_written_index_term => LWIT,
num_segments => length(ra_log_reader:segment_refs(Reader)),
open_segments => ra_log_reader:num_open_segments(Reader),
snapshot_index => case ra_snapshot:current(SnapshotState) of
snapshot_index => case CurrSnap of
undefined -> undefined;
{I, _} -> I
end,
snapshot_term => case CurrSnap of
undefined -> undefined;
{_, T} -> T
end,
latest_checkpoint_index =>
case ra_snapshot:latest_checkpoint(SnapshotState) of
undefined -> undefined;
Expand Down Expand Up @@ -1026,14 +1041,13 @@ release_resources(MaxOpenSegments,
directory = Dir,
counter = Counter,
names = Names},
first_index = FstIdx,
reader = Reader} = State) ->
ActiveSegs = ra_log_reader:segment_refs(Reader),
% close all open segments
% deliberately ignoring return value
_ = ra_log_reader:close(Reader),
%% open a new segment with the new max open segment value
State#?MODULE{reader = ra_log_reader:init(UId, Dir, FstIdx, MaxOpenSegments,
State#?MODULE{reader = ra_log_reader:init(UId, Dir, MaxOpenSegments,
AccessPattern,
ActiveSegs, Names, Counter)}.

Expand All @@ -1042,11 +1056,10 @@ release_resources(MaxOpenSegments,
register_reader(Pid, #?MODULE{cfg = #cfg{uid = UId,
directory = Dir,
names = Names},
first_index = Idx,
reader = Reader,
readers = Readers} = State) ->
SegRefs = ra_log_reader:segment_refs(Reader),
NewReader = ra_log_reader:init(UId, Dir, Idx, 1, SegRefs, Names),
NewReader = ra_log_reader:init(UId, Dir, 1, SegRefs, Names),
{State#?MODULE{readers = [Pid | Readers]},
[{reply, {ok, NewReader}},
{monitor, process, log, Pid}]}.
Expand Down
24 changes: 12 additions & 12 deletions src/ra_log_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
-compile(inline_list_funcs).

-export([
init/6,
init/8,
init/5,
init/7,
close/1,
update_segments/2,
handle_log_update/2,
Expand All @@ -37,7 +37,7 @@
-type segment_ref() :: {From :: ra_index(), To :: ra_index(),
File :: string()}.
-record(?STATE, {cfg :: #cfg{},
first_index = 0 :: ra_index(),
% first_index = 0 :: ra_index(),
last_index = 0 :: ra:index(),
segment_refs = [] :: [segment_ref()],
open_segments :: ra_flru:state()
Expand All @@ -52,16 +52,16 @@

%% PUBLIC

-spec init(ra_uid(), file:filename(), ra_index(), non_neg_integer(),
-spec init(ra_uid(), file:filename(), non_neg_integer(),
[segment_ref()], ra_system:names()) -> state().
init(UId, Dir, FirstIdx, MaxOpen, SegRefs, Names) ->
init(UId, Dir, FirstIdx, MaxOpen, random, SegRefs, Names, undefined).
init(UId, Dir, MaxOpen, SegRefs, Names) ->
init(UId, Dir, MaxOpen, random, SegRefs, Names, undefined).

-spec init(ra_uid(), file:filename(), ra_index(), non_neg_integer(),
-spec init(ra_uid(), file:filename(), non_neg_integer(),
access_pattern(),
[segment_ref()], ra_system:names(),
undefined | counters:counters_ref()) -> state().
init(UId, Dir, FirstIdx, MaxOpen, AccessPattern, SegRefs, #{}, Counter)
init(UId, Dir, MaxOpen, AccessPattern, SegRefs, #{}, Counter)
when is_binary(UId) ->
Cfg = #cfg{uid = UId,
counter = Counter,
Expand All @@ -79,7 +79,7 @@ init(UId, Dir, FirstIdx, MaxOpen, AccessPattern, SegRefs, #{}, Counter)
end,
#?STATE{cfg = Cfg,
open_segments = ra_flru:new(MaxOpen, FlruHandler),
first_index = FirstIdx,
% first_index = FirstIdx,
last_index = LastIdx,
segment_refs = SegRefs}.

Expand Down Expand Up @@ -107,7 +107,7 @@ update_segments(NewSegmentRefs,

-spec handle_log_update({ra_log_update, undefined | pid(), ra_index(),
[segment_ref()]}, state()) -> state().
handle_log_update({ra_log_update, From, FstIdx, SegRefs},
handle_log_update({ra_log_update, From, _FstIdx, SegRefs},
#?STATE{open_segments = Open0} = State) ->
Open = ra_flru:evict_all(Open0),
case From of
Expand All @@ -117,7 +117,7 @@ handle_log_update({ra_log_update, From, FstIdx, SegRefs},
From ! ra_log_update_processed
end,
State#?MODULE{segment_refs = SegRefs,
first_index = FstIdx,
% first_index = FstIdx,
open_segments = Open}.

-spec update_first_index(ra_index(), state()) ->
Expand All @@ -140,7 +140,7 @@ update_first_index(FstIdx, #?STATE{segment_refs = SegRefs0,
end
end, OpenSegs0, ObsoleteKeys),
{State#?STATE{open_segments = OpenSegs,
first_index = FstIdx,
% first_index = FstIdx,
segment_refs = Active},
Obsolete}
end.
Expand Down
11 changes: 10 additions & 1 deletion src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,16 @@ recover_records(#conf{names = Names} = Conf, Fd,
recover_records(Conf, Fd, Chunk, Cache, State)
end;
ok ->
recover_records(Conf, Fd, Rest, Cache, State0);
%% best the the snapshot index as the last
%% writer index
Writers = case State0#recovery.writers of
#{UId := {in_seq, SnapIdx}} = W ->
W;
W ->
W#{UId => {in_seq, SnapIdx}}
end,
recover_records(Conf, Fd, Rest, Cache,
State0#recovery{writers = Writers});
error ->
?DEBUG("WAL: record failed CRC check. If this is the last record"
" recovery can resume", []),
Expand Down
21 changes: 21 additions & 0 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ all_tests() ->
sparse_read_out_of_range_2,
written_event_after_snapshot,
writes_lower_than_snapshot_index_are_dropped,
recover_after_snapshot,
updated_segment_can_be_read,
open_segments_limit,
%% TODO mt: do or deprecate in current minor
Expand Down Expand Up @@ -476,6 +477,26 @@ written_event_after_snapshot(Config) ->
% false = filelib:is_file(Snap1),
ok.


recover_after_snapshot(Config) ->
Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}),
Log1 = ra_log:append({1, 1, <<"one">>}, Log0),
Log2 = ra_log:append({2, 1, <<"two">>}, Log1),
{Log3, _} = ra_log:update_release_cursor(2, #{}, 1,
<<"one+two">>, Log2),
Log4 = deliver_all_log_events(Log3, 100),
ra_log:close(Log4),
restart_wal(),
timer:sleep(1000),
Log = ra_log_init(Config, #{min_snapshot_interval => 1}),
Overview = ra_log:overview(Log),
ra_log:close(Log),
?assertMatch(#{last_index := 2,
last_term := 1,
snapshot_index := 2,
last_written_index_term := {2, 1}}, Overview),
ok.

writes_lower_than_snapshot_index_are_dropped(Config) ->
logger:set_primary_config(level, debug),
Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}),
Expand Down

0 comments on commit 9e4cbb0

Please sign in to comment.