Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Oct 25, 2024
1 parent a6d9d4e commit 3a6d5a4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ tick(Now, #?MODULE{cfg = #cfg{wal = Wal},
case Now > Ms + ?WAL_RESEND_TIMEOUT andalso
is_pid(CurWalPid) andalso
CurWalPid =/= WalPid andalso
ra_range:in(LastWrittenIdx, MtRange)
ra_range:in(LastWrittenIdx + 1, MtRange)
of
true ->
%% the wal has restarted, it has been at least 5s and there are
Expand Down
26 changes: 11 additions & 15 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,22 +137,25 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
ok = counters:add(State#state.counter, ?C_MEM_TABLES, map_size(Ranges)),
#{names := Names} = ra_system:fetch(System),
Degree = erlang:system_info(schedulers),
%% TODO: run each "chunk" in a single parallel function to make better use
%% of time when flushing for more then num schedulers writers
%% TODO: refactor to make better use of time where each uid has an
%% uneven amount of work to do.
RangesList = maps:fold(
fun (UId, TidRanges, Acc) ->
case ra_directory:is_registered_uid(Names, UId) of
true ->
[{UId, TidRanges} | Acc];
false ->
%% TODO: log
%% delete all tids as the uid is not
%% registered
?DEBUG("segment_writer in '~w': deleting memtable "
"for ~w as not a registered uid",
[System, UId]),
ok = ra_log_ets:delete_mem_tables(Names, UId),
Acc
end
end, [], Ranges),

T1 = erlang:monotonic_time(),
_ = [begin
{_, Failures} = ra_lib:partition_parallel(
fun (E) ->
Expand All @@ -169,19 +172,12 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
exit({segment_writer_segment_write_failure, Failures})
end
end || Tabs <- ra_lib:lists_chunk(Degree, RangesList)],
% delete wal file once done
% TODO: test scenario when server crashes after segments but before
% deleting walfile
% can we make segment writer idempotent somehow
?DEBUG("segment_writer: deleting wal file: ~ts",
[filename:basename(WalFile)]),
%% temporarily disable wal deletion
%% TODO: this should be a debug option config?
% Base = filename:basename(WalFile),
% BkFile = filename:join([State0#state.data_dir, "wals", Base]),
% filelib:ensure_dir(BkFile),
% file:copy(WalFile, BkFile),
%% delete wal file once done
ok = prim_file:delete(filename:join(Dir, WalFile)),
T2 = erlang:monotonic_time(),
Diff = erlang:convert_time_unit(T2 - T1, native, millisecond),
?DEBUG("segment_writer in '~w': completed flush of ~b writers from wal file ~s in ~bms",
[System, length(RangesList), WalFile, Diff]),
{noreply, State};
handle_cast({truncate_segments, Who, {_From, _To, Name} = SegRef},
#state{segment_conf = SegConf} = State0) ->
Expand Down
11 changes: 7 additions & 4 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ segment_writer_or_wal_crash_follower(Config) ->
ct:pal("running iteration ~b", [I]),

WriterPid = spawn(WriterFun),
timer:sleep(rand:uniform(500) + 5_000),
timer:sleep(rand:uniform(500) + 1_000),

case I rem 2 == 0 of
true ->
Expand Down Expand Up @@ -1022,7 +1022,7 @@ segment_writer_or_wal_crash_leader(Config) ->
ct:pal("running iteration ~b", [I]),

WriterPid = spawn_link(fun () -> WriterFun(Leader) end),
timer:sleep(rand:uniform(500) + 5_000),
timer:sleep(rand:uniform(500) + 1_000),

case I rem 2 == 0 of
true ->
Expand All @@ -1037,7 +1037,7 @@ segment_writer_or_wal_crash_leader(Config) ->
true = ct_rpc:call(LeaderNode, erlang, exit, [Pid, kill])
end,

timer:sleep(1000),
timer:sleep(rand:uniform(500) + 500),
WriterPid ! stop,
await_condition(fun () -> not is_process_alive(WriterPid) end, 1000),

Expand Down Expand Up @@ -1368,8 +1368,11 @@ await_condition(_Fun, 0) ->
await_condition(Fun, Attempts) ->
case catch Fun() of
true -> ok;
false ->
timer:sleep(100),
await_condition(Fun, Attempts - 1);
_Reason ->
% ct:pal("await_condition retry with ~p", [Reason]),
% ct:pal("await_condition retry with ~p", [_Reason]),
timer:sleep(100),
await_condition(Fun, Attempts - 1)
end.
Expand Down

0 comments on commit 3a6d5a4

Please sign in to comment.