Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Nov 11, 2024
1 parent 3376ab0 commit 6a3a148
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 103 deletions.
95 changes: 42 additions & 53 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ init(#{uid := UId,
%% but can't hurt as it may trigger some cleanup
{DeleteSpecs, Mt} = ra_mt:set_first(FirstIdx, Mt0),

ok = exec_mem_table_delete(Names, DeleteSpecs, Mt),
ok = exec_mem_table_delete(Names, UId, DeleteSpecs),
Reader0 = ra_log_reader:init(UId, Dir, FirstIdx, MaxOpen, AccessPattern, SegRefs,
Names, Counter),
%% TODO: can there be obsolete segments returned here?
Expand Down Expand Up @@ -316,7 +316,7 @@ commit_tx(#?MODULE{cfg = #cfg{uid = UId,
tx = true,
mem_table = Mt1} = State) ->
{Entries, Mt} = ra_mt:commit(Mt1),
Tid = ra_mt:tid(Mt1),
Tid = ra_mt:tid(Mt),
WriterId = {UId, self()},
{WalCommands, Num} =
lists:foldl(fun ({Idx, Term, Cmd0}, {WC, N}) ->
Expand Down Expand Up @@ -363,10 +363,10 @@ append({Idx, Term, Cmd0} = Entry,
{error, wal_down} ->
error(wal_down)
end;
{error, overwriting} ->
?DEBUG("~ts: mem table overwrite detected appending index ~b, "
{error, Reason} ->
?DEBUG("~ts: mem table ~s detected appending index ~b, "
"opening new mem table",
[Cfg#cfg.log_id, Idx]),
[Cfg#cfg.log_id, Reason, Idx]),
%% this function uses the infinity timeout
{ok, M0} = ra_log_ets:new_mem_table_please(Cfg#cfg.names,
Cfg#cfg.uid, Mt0),
Expand All @@ -378,22 +378,21 @@ append({Idx, Term, _Cmd} = Entry,
tx = true,
mem_table = Mt0} = State)
when Idx =:= LastIdx + 1 ->
Mt = case ra_mt:stage(Entry, Mt0) of
{ok, M} ->
M;
{error, overwriting} ->
?DEBUG("~ts: mem table overwrite detected appending index ~b, "
"opening new mem table",
[Cfg#cfg.log_id, Idx]),
%% this function uses the infinity timeout
{ok, M0} = ra_log_ets:new_mem_table_please(Cfg#cfg.names,
Cfg#cfg.uid, Mt0),
append(Entry, State#?MODULE{mem_table = M0})
end,
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
State#?MODULE{last_index = Idx,
last_term = Term,
mem_table = Mt};
case ra_mt:stage(Entry, Mt0) of
{ok, Mt} ->
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
State#?MODULE{last_index = Idx,
last_term = Term,
mem_table = Mt};
{error, Reason} ->
?DEBUG("~ts: mem table ~s detected appending index ~b, tx=true "
"opening new mem table",
[Cfg#cfg.log_id, Reason, Idx]),
%% this function uses the infinity timeout
{ok, M0} = ra_log_ets:new_mem_table_please(Cfg#cfg.names,
Cfg#cfg.uid, Mt0),
append(Entry, State#?MODULE{mem_table = M0})
end;
append({Idx, _, _}, #?MODULE{last_index = LastIdx}) ->
Msg = lists:flatten(io_lib:format("tried writing ~b - expected ~b",
[Idx, LastIdx+1])),
Expand Down Expand Up @@ -599,16 +598,18 @@ handle_event({written, _Term, {FromIdx, _}} = Evt,
State#?MODULE{last_written_index_term = {Expected, Term}})
end;
handle_event({segments, TidRanges, NewSegs},
#?MODULE{cfg = #cfg{log_id = _LogId, names = Names},
#?MODULE{cfg = #cfg{uid = UId, names = Names},
reader = Reader0,
mem_table = Mt0,
readers = Readers
} = State0) ->
Reader = ra_log_reader:update_segments(NewSegs, Reader0),
Mt = lists:foldl(
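%% lists:foldr/3 walks the list from the right, so processing the oldest
%% tid range first requires TidRanges to arrive newest-first (i.e. in the
%% reverse of the order they were written) -- TODO confirm with the sender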
Mt = lists:foldr(
fun ({Tid, Range}, Acc0) ->
{Spec, Acc} = ra_mt:record_flushed(Tid, Range, Acc0),
ok = ra_log_ets:execute_delete(Names, Spec, Acc),
ok = ra_log_ets:execute_delete(Names, UId, Spec),
Acc
end, Mt0, TidRanges),
State = State0#?MODULE{reader = Reader,
Expand All @@ -623,7 +624,8 @@ handle_event({segments, TidRanges, NewSegs},
{State, log_update_effects(Readers, Pid, State)}
end;
handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
#?MODULE{cfg = #cfg{names = Names} = Cfg,
#?MODULE{cfg = #cfg{uid = UId,
names = Names} = Cfg,
first_index = FstIdx,
last_index = LstIdx,
mem_table = Mt0,
Expand Down Expand Up @@ -662,7 +664,7 @@ handle_event({snapshot_written, {SnapIdx, _} = Snap, SnapKind},
%% segwriter detects a missing index it will query the snapshot
%% state and if that is higher it will resume flush
{Spec, Mt1} = ra_mt:set_first(SnapIdx + 1, Mt0),
ok = exec_mem_table_delete(Names, Spec, Mt1),
ok = exec_mem_table_delete(Names, UId, Spec),

{State#?MODULE{first_index = SnapIdx + 1,
last_index = max(LstIdx, SnapIdx),
Expand Down Expand Up @@ -742,7 +744,8 @@ set_snapshot_state(SnapState, State) ->
-spec install_snapshot(ra_idxterm(), ra_snapshot:state(), state()) ->
{state(), effects()}.
install_snapshot({SnapIdx, SnapTerm} = IdxTerm, SnapState0,
#?MODULE{cfg = #cfg{names = Names} = Cfg,
#?MODULE{cfg = #cfg{uid = UId,
names = Names} = Cfg,
mem_table = Mt0
} = State0) ->
ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_INSTALLED, 1),
Expand All @@ -756,7 +759,7 @@ install_snapshot({SnapIdx, SnapTerm} = IdxTerm, SnapState0,
ra_snapshot:directory(SnapState, checkpoint),
Checkpoint} || Checkpoint <- Checkpoints],
{Spec, Mt} = ra_mt:set_first(SnapIdx, Mt0),
ok = exec_mem_table_delete(Names, Spec, Mt),
ok = exec_mem_table_delete(Names, UId, Spec),
{State#?MODULE{snapshot_state = SnapState,
first_index = SnapIdx + 1,
last_index = SnapIdx,
Expand Down Expand Up @@ -1107,8 +1110,7 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
mem_table = Mt0} = State,
Entries) ->
WriterId = {UId, self()},
%% TODO: this isn't quite right, entries could theoretically be written to
%% different tids, although the way this is called currently prevents that
%% all entries in a transaction are written to the same tid
Tid = ra_mt:tid(Mt0),
{WalCommands, Num} =
lists:foldl(fun ({Idx, Term, Cmd0}, {WC, N}) ->
Expand Down Expand Up @@ -1188,25 +1190,25 @@ resend_from0(Idx, #?MODULE{last_resend_time = {LastResend, WalPid},
stage_entries(Cfg, [Entry | Rem] = Entries, Mt0) ->
case ra_mt:stage(Entry, Mt0) of
{ok, Mt} ->
stage_entries0(Rem, Mt);
{error, overwriting} ->
?DEBUG("~ts: mem table overwrite detected, opening new mem table",
[Cfg#cfg.log_id]),
stage_entries0(Cfg, Rem, Mt);
{error, OverwritingOrLimitReached} ->
?DEBUG("~ts: mem table ~s detected whilst staging entries, opening new mem table",
[Cfg#cfg.log_id, OverwritingOrLimitReached]),
%% TODO: do we need error handling here - this function uses the infinity
%% timeout
{ok, Mt} = ra_log_ets:new_mem_table_please(Cfg#cfg.names,
Cfg#cfg.uid, Mt0),
stage_entries(Cfg, Entries, Mt)
end.

stage_entries0([], Mt) ->
stage_entries0(_Cfg, [], Mt) ->
{ok, Mt};
stage_entries0([Entry | Rem], Mt0) ->
stage_entries0(Cfg, [Entry | Rem], Mt0) ->
case ra_mt:stage(Entry, Mt0) of
{ok, Mt} ->
stage_entries0(Rem, Mt);
stage_entries0(Cfg, Rem, Mt);
{error, overwriting} ->
Range = ra_mt:range(Mt0),
Range = ra_mt:range(Mt0),
Msg = io_lib:format("ra_log:verify_entries/2 "
"tried writing ~p - mem table range ~w",
[Rem, Range]),
Expand Down Expand Up @@ -1308,25 +1310,12 @@ maps_with_values(Keys, Map) ->
end
end, [], Keys).

% last_index_term_in_wal(Idx, #?MODULE{last_written_index_term = {Idx, Term}} = State) ->
% % we reached the lower limit which is the last known written index
% {Idx, Term, State};
% last_index_term_in_wal(Idx, #?MODULE{reader = Reader0} = State) ->
% case ra_log_reader:fetch_term(Idx, Reader0) of
% {undefined, Reader} ->
% last_index_term_in_wal(Idx-1, State#?MODULE{reader = Reader});
% {Term, Reader} ->
% %% if it can be read when bypassing the local cache it is in the
% %% wal
% {Idx, Term, State#?MODULE{reader = Reader}}
% end.

%% Returns the current Erlang system time in milliseconds, as reported by
%% erlang:system_time/1.
now_ms() ->
erlang:system_time(millisecond).

exec_mem_table_delete(#{} = Names, Specs, Mt)
exec_mem_table_delete(#{} = Names, UId, Specs)
when is_list(Specs) ->
[ra_log_ets:execute_delete(Names, Spec, Mt)
[ra_log_ets:execute_delete(Names, UId, Spec)
|| Spec <- Specs],
ok.

Expand Down
33 changes: 21 additions & 12 deletions src/ra_log_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
table_opts :: list()}).

-define(TABLE_OPTS, [set,
{write_concurrency, true},
{decentralized_counters, true},
{write_concurrency, auto},
public
]).

Expand Down Expand Up @@ -87,13 +86,13 @@ delete_mem_tables(#{log_ets := Name}, UId) ->
gen_server:cast(Name, {delete_mem_tables, UId}).

-spec execute_delete(ra_system:names(),
ra_mt:delete_spec(),
ra_mt:state()) ->
ra:uid(),
ra_mt:delete_spec()) ->
ok.
execute_delete(#{}, undefined, _Mt) ->
execute_delete(#{}, _UId, undefined) ->
ok;
execute_delete(#{log_ets := Name}, Spec, Mt) ->
gen_server:cast(Name, {exec_delete, Spec, Mt}).
execute_delete(#{log_ets := Name}, UId, Spec) ->
gen_server:cast(Name, {exec_delete, UId, Spec}).


%%%===================================================================
Expand All @@ -106,10 +105,11 @@ init([#{data_dir := DataDir,
open_mem_tbls := OpenMemTbls} = Names} = Config]) ->
process_flag(trap_exit, true),
CompressMemTbls = maps:get(compress_mem_tables, Config, false),
?INFO("~s: in system ~s initialising...", [LogEts, System]),
TblOpts = ?TABLE_OPTS ++ [{compressed, CompressMemTbls}],
?INFO("~s: in system ~s initialising. Mem table opts: ~0P",
[LogEts, System, TblOpts]),
_ = ets:new(OpenMemTbls, [bag, protected, named_table]),
ok = ra_directory:init(DataDir, Names),
TblOpts = ?TABLE_OPTS ++ [{compressed, CompressMemTbls}],
{ok, #state{names = Names,
table_opts = TblOpts}}.

Expand All @@ -135,10 +135,19 @@ handle_call(Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.

handle_cast({exec_delete, Spec, Mt}, State) ->
try timer:tc(fun () -> ra_mt:delete(Spec, Mt) end) of
handle_cast({exec_delete, UId, Spec},
#state{names = #{open_mem_tbls := MemTables}} = State) ->
case Spec of
{delete, Tid} ->
ets:delete_object(MemTables, {UId, Tid});
_ ->
ok
end,

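%% NOTE(review): the case expression above already removes the {UId, Tid}
%% object from the open mem tables table when the spec is {delete, tid()},
%% which appears to cover the former TODO here -- confirm and drop this note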
try timer:tc(fun () -> ra_mt:delete(Spec) end) of
{Time, Num} ->
?DEBUG("ra_log_ets: ets:delete/1 took ~bms to delete ~w ~b entries",
?DEBUG("ra_log_ets: ra_mt:delete/1 took ~bms to delete ~w ~b entries",
[Time div 1000, Spec, Num]),
ok
catch
Expand Down
3 changes: 1 addition & 2 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,7 @@ send_segments(System, ServerUId, TidRanges, SegRefs) ->
[begin
%% this looks a bit weird but
%% we don't need full init to run a delete
M = ra_mt:init(Tid, read),
_ = ra_mt:delete({range, Tid, Range}, M)
_ = ra_mt:delete({range, Tid, Range})
end || {Tid, Range} <- TidRanges],
ok;
Pid ->
Expand Down
2 changes: 1 addition & 1 deletion src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@ handle_trunc(true, UId, Idx, #recovery{mode = Mode,
#{UId := Mt0} when Mode == initial ->
%% only meddle with mem table data in initial mode
{Specs, Mt} = ra_mt:set_first(Idx-1, Mt0),
[_ = ra_mt:delete(Spec, Mt) || Spec <- Specs],
[_ = ra_mt:delete(Spec) || Spec <- Specs],
State#recovery{tables = Tbls#{UId => Mt}};
_ ->
State
Expand Down
Loading

0 comments on commit 6a3a148

Please sign in to comment.