From f769c78f68a8a61384666c31af61691ecb193bf9 Mon Sep 17 00:00:00 2001 From: Henry Sun Date: Tue, 6 Aug 2024 14:16:55 -0700 Subject: [PATCH] Add limit on number of inactive transports to retain info for Summary: Split the transport tables into a table for transport info and a table for file info. Add the ability to limit the number of inactive transports to retain info for to avoid the transport info tables from growing without bound. Differential Revision: D60845349 fbshipit-source-id: dc7086493146d8d161e2f005aa66c6fa3f3fee4a --- include/wa_raft.hrl | 3 + src/wa_raft_transport.erl | 239 ++++++++++++++++++++++++++------------ 2 files changed, 165 insertions(+), 77 deletions(-) diff --git a/include/wa_raft.hrl b/include/wa_raft.hrl index 628bfa8..589fd8e 100644 --- a/include/wa_raft.hrl +++ b/include/wa_raft.hrl @@ -102,6 +102,9 @@ %% Time in seconds after which a transport that has not made progress should be considered failed -define(RAFT_TRANSPORT_IDLE_TIMEOUT(), ?RAFT_CONFIG(transport_idle_timeout_secs, 30)). +%% Maximum number of previous inactive transports to retain info for. +-define(RAFT_TRANSPORT_INACTIVE_INFO_LIMIT(), ?RAFT_CONFIG(raft_transport_inactive_info_limit, 30)). + %% Size in bytes of individual chunks (messages containing file data) to be sent during transports %% using the dist transport provider -define(RAFT_DIST_TRANSPORT_CHUNK_SIZE(), ?RAFT_CONFIG(dist_transport_chunk_size, 1 * 1024 * 1024)). diff --git a/src/wa_raft_transport.erl b/src/wa_raft_transport.erl index 761ce15..2372d9c 100644 --- a/src/wa_raft_transport.erl +++ b/src/wa_raft_transport.erl @@ -70,6 +70,11 @@ file_info/0 ]). +%% Name of the ETS table to keep records for transports +-define(TRANSPORT_TABLE, wa_raft_transport_transports). +%% Name of the ETS table to keep records for files +-define(FILE_TABLE, wa_raft_transport_files). + -define(RAFT_TRANSPORT_PARTITION_SUBDIRECTORY, "transport"). -define(RAFT_TRANSPORT_SCAN_INTERVAL_SECS, 30). @@ -82,9 +87,6 @@ %% Counter - inflight receives -define(RAFT_TRANSPORT_COUNTER_ACTIVE_RECEIVES, 1). --define(INFO_KEY(ID), {ID, info}). --define(FILE_KEY(ID, FileID), {ID, {file, FileID}}). - -type transport_id() :: pos_integer(). -type transport_info() :: #{ type := sender | receiver, @@ -247,18 +249,19 @@ complete(ID, FileID, Status, Pid) -> -spec setup_tables() -> ok. setup_tables() -> - ?MODULE = ets:new(?MODULE, [named_table, set, public]), + ?TRANSPORT_TABLE = ets:new(?TRANSPORT_TABLE, [named_table, set, public]), + ?FILE_TABLE = ets:new(?FILE_TABLE, [named_table, set, public]), ok. -spec transports() -> [transport_id()]. transports() -> - ets:select(?MODULE, [{{?INFO_KEY('$1'), '_'}, [], ['$1']}]). + ets:select(?TRANSPORT_TABLE, [{{'$1', '_'}, [], ['$1']}]). -spec transport_info(ID :: transport_id()) -> {ok, Info :: transport_info()} | not_found. transport_info(ID) -> - case ets:lookup(?MODULE, ?INFO_KEY(ID)) of - [{_, Info}] -> {ok, Info}; - [] -> not_found + case ets:lookup_element(?TRANSPORT_TABLE, ID, 2, not_found) of + not_found -> not_found; + Info -> {ok, Info} end. -spec transport_info(ID :: transport_id(), Item :: atom()) -> Info :: term() | undefined. @@ -272,33 +275,49 @@ transport_info(ID, Item) -> % provide any atomicity guarantees. -spec set_transport_info(ID :: transport_id(), Info :: transport_info(), Counters :: counters:counters_ref()) -> term(). set_transport_info(ID, #{atomics := TransportAtomics} = Info, Counters) -> - true = ets:insert(?MODULE, {?INFO_KEY(ID), Info}), + true = ets:insert(?TRANSPORT_TABLE, {ID, Info}), maybe_update_active_inbound_transport_counts(undefined, Info, Counters), ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, erlang:system_time(millisecond)). % This function should only be called from the "factory" process since it does not % provide any atomicity guarantees. --spec update_transport_info(ID :: transport_id(), Fun :: fun((Info :: transport_info()) -> NewInfo :: transport_info()), Counters :: counters:counters_ref()) -> ok | not_found. -update_transport_info(ID, Fun, Counters) -> +-spec update_and_get_transport_info( + ID :: transport_id(), + Fun :: fun((Info :: transport_info()) -> NewInfo :: transport_info()), + Counters :: counters:counters_ref() +) -> {ok, NewOrExistingInfo :: transport_info()} | not_found. +update_and_get_transport_info(ID, Fun, Counters) -> case transport_info(ID) of {ok, #{atomics := TransportAtomics} = Info} -> case Fun(Info) of Info -> - ok; + {ok, Info}; NewInfo -> - true = ets:insert(?MODULE, {?INFO_KEY(ID), NewInfo}), + true = ets:insert(?TRANSPORT_TABLE, {ID, NewInfo}), ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, erlang:system_time(millisecond)), - maybe_update_active_inbound_transport_counts(Info, NewInfo, Counters) + ok = maybe_update_active_inbound_transport_counts(Info, NewInfo, Counters), + {ok, NewInfo} end; not_found -> not_found end. +-spec delete_transport_info(ID :: transport_id()) -> ok | not_found. +delete_transport_info(ID) -> + case transport_info(ID) of + {ok, #{total_files := TotalFiles}} -> + lists:foreach(fun (FileID) -> delete_file_info(ID, FileID) end, lists:seq(1, TotalFiles)), + ets:delete(?TRANSPORT_TABLE, ID), + ok; + not_found -> + not_found + end. + -spec file_info(ID :: transport_id(), FileID :: file_id()) -> {ok, Info :: file_info()} | not_found. file_info(ID, FileID) -> - case ets:lookup(?MODULE, ?FILE_KEY(ID, FileID)) of - [{_, Info}] -> {ok, Info}; - [] -> not_found + case ets:lookup_element(?FILE_TABLE, {ID, FileID}, 2, not_found) of + not_found -> not_found; + Info -> {ok, Info} end. -spec maybe_update_active_inbound_transport_counts(OldInfo :: transport_info() | undefined, NewInfo :: transport_info(), Counters :: counters:counters_ref()) -> ok. @@ -315,7 +334,7 @@ maybe_update_active_inbound_transport_counts(_, _, _) -> % transport of the specified file since it does not provide any atomicity guarantees. -spec set_file_info(ID :: transport_id(), FileID :: file_id(), Info :: file_info()) -> term(). set_file_info(ID, FileID, #{atomics := {TransportAtomics, FileAtomics}} = Info) -> - true = ets:insert(?MODULE, {?FILE_KEY(ID, FileID), Info}), + true = ets:insert(?FILE_TABLE, {{ID, FileID}, Info}), NowMillis = erlang:system_time(millisecond), ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis), ok = atomics:put(FileAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis). @@ -330,7 +349,7 @@ update_file_info(ID, FileID, Fun) -> Info -> ok; NewInfo -> - true = ets:insert(?MODULE, {?FILE_KEY(ID, FileID), NewInfo}), + true = ets:insert(?FILE_TABLE, {{ID, FileID}, NewInfo}), NowMillis = erlang:system_time(millisecond), ok = atomics:put(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis), ok = atomics:put(FileAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS, NowMillis), @@ -340,6 +359,11 @@ update_file_info(ID, FileID, Fun) -> not_found end. +-spec delete_file_info(ID :: transport_id(), FileID :: file_id()) -> ok. +delete_file_info(ID, FileID) -> + ets:delete(?FILE_TABLE, {ID, FileID}), + ok. + %%------------------------------------------------------------------- %% Internal API %%------------------------------------------------------------------- @@ -443,14 +467,18 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters = ], TotalFiles =:= 0 andalso - update_transport_info(ID, fun (Info0) -> - Info1 = Info0#{status => completed, end_ts => NowMillis}, - Info2 = case maybe_notify_complete(ID, Info1, State) of - ok -> Info1; - {error, Reason} -> Info1#{status => failed, error => {notify_failed, Reason}} + update_and_get_transport_info( + ID, + fun (Info0) -> + Info1 = Info0#{status => completed, end_ts => NowMillis}, + Info2 = case maybe_notify_complete(ID, Info1, State) of + ok -> Info1; + {error, Reason} -> Info1#{status => failed, error => {notify_failed, Reason}} + end, + maybe_notify(ID, Info2) end, - maybe_notify(ID, Info2) - end, Counters), + Counters + ), {reply, ok, State} end @@ -459,26 +487,39 @@ handle_call({transport, ID, Peer, Module, Meta, Files}, From, #state{counters = ?RAFT_COUNT('raft.transport.receive.error'), ?LOG_WARNING("wa_raft_transport failed to accept transport ~p due to ~p ~p: ~n~p", [ID, T, E, S], #{domain => [whatsapp, wa_raft]}), - update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => erlang:system_time(millisecond), error => {receive_failed, {T, E, S}}} end, Counters), + update_and_get_transport_info( + ID, + fun (Info) -> + Info#{ + status => failed, + end_ts => erlang:system_time(millisecond), + error => {receive_failed, {T, E, S}} + } + end, + Counters + ), {reply, {error, failed}, State} end; handle_call({cancel, ID, Reason}, _From, #state{counters = Counters} = State) -> ?LOG_NOTICE("wa_raft_transport got cancellation request for ~p for reason ~p", [ID, Reason], #{domain => [whatsapp, wa_raft]}), - Result = - update_transport_info(ID, - fun - (#{status := running} = Info) -> - NowMillis = erlang:system_time(millisecond), - Info#{status => cancelled, end_ts => NowMillis, error => {cancelled, Reason}}; - (Info) -> - Info - end, - Counters), - Reply = case Result of - ok -> ok; - not_found -> {error, not_found} - end, + Reply = + case + update_and_get_transport_info( + ID, + fun + (#{status := running} = Info) -> + NowMillis = erlang:system_time(millisecond), + Info#{status => cancelled, end_ts => NowMillis, error => {cancelled, Reason}}; + (Info) -> + Info + end, + Counters + ) + of + {ok, _Info} -> ok; + not_found -> {error, not_found} + end, {reply, Reply, State}; handle_call(Request, _From, #state{} = State) -> ?LOG_WARNING("wa_raft_transport received unrecognized factory call ~p", @@ -500,31 +541,34 @@ handle_cast({complete, ID, FileID, Status, Pid}, #state{counters = Counters} = S Result0 =:= not_found andalso ?LOG_WARNING("wa_raft_transport got complete report for unknown file ~p:~p", [ID, FileID], #{domain => [whatsapp, wa_raft]}), - Result1 = update_transport_info(ID, - fun - (#{status := running, type := Type, completed_files := CompletedFiles, total_files := TotalFiles} = Info0) -> - Info1 = Info0#{completed_files => CompletedFiles + 1}, - Info2 = case CompletedFiles + 1 of - TotalFiles -> Info1#{status => completed, end_ts => NowMillis}; - _ -> Info1 - end, - Info3 = case Status of - ok -> Info2; - _ -> Info2#{status => failed, end_ts => NowMillis, error => {file, FileID, Status}} - end, - Info4 = case maybe_notify_complete(ID, Info3, State) of - ok -> Info3; - {error, Reason} -> Info3#{status => failed, error => {notify_failed, Reason}} - end, - Info5 = case Type of - sender -> maybe_submit_one(ID, Info4, Pid); - _ -> Info4 - end, - maybe_notify(ID, Info5); - (Info) -> - Info - end, - Counters), + Result1 = + update_and_get_transport_info( + ID, + fun + (#{status := running, type := Type, completed_files := CompletedFiles, total_files := TotalFiles} = Info0) -> + Info1 = Info0#{completed_files => CompletedFiles + 1}, + Info2 = case CompletedFiles + 1 of + TotalFiles -> Info1#{status => completed, end_ts => NowMillis}; + _ -> Info1 + end, + Info3 = case Status of + ok -> Info2; + _ -> Info2#{status => failed, end_ts => NowMillis, error => {file, FileID, Status}} + end, + Info4 = case maybe_notify_complete(ID, Info3, State) of + ok -> Info3; + {error, Reason} -> Info3#{status => failed, error => {notify_failed, Reason}} + end, + Info5 = case Type of + sender -> maybe_submit_one(ID, Info4, Pid); + _ -> Info4 + end, + maybe_notify(ID, Info5); + (Info) -> + Info + end, + Counters + ), Result1 =:= not_found andalso ?LOG_WARNING("wa_raft_transport got complete report for unknown transfer ~p", [ID], #{domain => [whatsapp, wa_raft]}), @@ -535,10 +579,19 @@ handle_cast(Request, State) -> -spec handle_info(Info :: term(), State :: #state{}) -> {noreply, NewState :: #state{}}. handle_info(scan, #state{counters = Counters} = State) -> - lists:foreach( - fun (ID) -> - update_transport_info(ID, fun (Info) -> scan_transport(ID, Info) end, Counters) - end, transports()), + InactiveTransports = + lists:filter( + fun (ID) -> + case update_and_get_transport_info(ID, fun (Info) -> scan_transport(ID, Info) end, Counters) of + {ok, #{status := Status}} -> Status =/= requested andalso Status =/= running; + not_found -> false + end + end, transports()), + ExcessTransports = length(InactiveTransports) - ?RAFT_TRANSPORT_INACTIVE_INFO_LIMIT(), + ExcessTransports > 0 andalso begin + ExcessTransportIDs = lists:sublist(lists:sort(InactiveTransports), ExcessTransports), + lists:foreach(fun delete_transport_info/1, ExcessTransportIDs) + end, schedule_scan(), {noreply, State}; handle_info(Info, State) -> @@ -605,7 +658,8 @@ handle_transport_start(From, Peer, Meta, Root, Counters) -> FileData = [{FileID, Filename, Size} || {FileID, Filename, _, _, Size} <- Files], case gen_server:call({?MODULE, Peer}, {transport, ID, node(), Module, Meta, FileData}) of ok -> - update_transport_info(ID, + update_and_get_transport_info( + ID, fun (Info0) -> Info1 = case From of undefined -> Info0; @@ -621,20 +675,41 @@ handle_transport_start(From, Peer, Meta, Root, Counters) -> Workers = [Pid || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(Sup), is_pid(Pid)], lists:foldl(fun (Pid, InfoN) -> maybe_submit_one(ID, InfoN, Pid) end, Info2, Workers) end - end, - Counters), + end, + Counters + ), {ok, ID}; {error, receiver_overloaded} -> ?RAFT_COUNT('raft.transport.rejected.receiver_overloaded'), ?LOG_WARNING("wa_raft_transport peer ~p rejected transport ~p because of overload", [Peer, ID], #{domain => [whatsapp, wa_raft]}), - update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => NowMillis, error => {rejected, receiver_overloaded}} end, Counters), + update_and_get_transport_info( + ID, + fun (Info) -> + Info#{ + status => failed, + end_ts => NowMillis, + error => {rejected, receiver_overloaded} + } + end, + Counters + ), {error, receiver_overloaded}; Error -> ?RAFT_COUNT('raft.transport.rejected'), ?LOG_WARNING("wa_raft_transport peer ~p rejected transport ~p with error ~p", [Peer, ID, Error], #{domain => [whatsapp, wa_raft]}), - update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => NowMillis, error => {rejected, Error}} end, Counters), + update_and_get_transport_info( + ID, + fun (Info) -> + Info#{ + status => failed, + end_ts => NowMillis, + error => {rejected, Error} + } + end, + Counters + ), {error, Error} end catch @@ -642,7 +717,17 @@ handle_transport_start(From, Peer, Meta, Root, Counters) -> ?RAFT_COUNT('raft.transport.start.error'), ?LOG_WARNING("wa_raft_transport failed to start transport ~p due to ~p ~p: ~n~p", [ID, T, E, S], #{domain => [whatsapp, wa_raft]}), - update_transport_info(ID, fun (Info) -> Info#{status => failed, end_ts => erlang:system_time(millisecond), error => {start, {T, E, S}}} end, Counters), + update_and_get_transport_info( + ID, + fun (Info) -> + Info#{ + status => failed, + end_ts => erlang:system_time(millisecond), + error => {start, {T, E, S}} + } + end, + Counters + ), {error, failed} end. @@ -746,7 +831,7 @@ maybe_notify(ID, #{status := Status, notify := Notify} = Info) when Status =/= r maybe_notify(_ID, Info) -> Info. --spec scan_transport(transport_id(), transport_info()) -> transport_info(). +-spec scan_transport(ID :: transport_id(), Info :: transport_info()) -> NewInfo :: transport_info(). scan_transport(ID, #{status := running, atomics := TransportAtomics} = Info) -> LastUpdateTs = atomics:get(TransportAtomics, ?RAFT_TRANSPORT_ATOMICS_UPDATED_TS), NowMillis = erlang:system_time(millisecond),