Skip to content

Commit

Permalink
Backoff after snapshot catchup is complete to allow peer time to load
Browse files Browse the repository at this point in the history
Summary:
Add a configurable backoff time after a snapshot catchup is complete
before considering rerunning the snapshot catchup to allow time for
peers to load the transferred snapshot before starting a potentially
duplicate catchup.

Differential Revision: D61145064

fbshipit-source-id: 269cb824df7b8439fdfbd6462d301817c5dbe17d
  • Loading branch information
hsun324 authored and facebook-github-bot committed Aug 13, 2024
1 parent f412631 commit 17f5896
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 33 deletions.
6 changes: 6 additions & 0 deletions include/wa_raft.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@
% Time to wait before retrying snapshot transport to a overloaded peer.
-define(RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS, snapshot_catchup_overloaded_backoff_ms).
-define(RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS(App), ?RAFT_APP_CONFIG(App, ?RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS, 1000)).
% Time to wait before allowing a rerun of a completed snapshot transport.
-define(RAFT_SNAPSHOT_CATCHUP_COMPLETED_BACKOFF_MS, raft_snapshot_catchup_completed_backoff_ms).
-define(RAFT_SNAPSHOT_CATCHUP_COMPLETED_BACKOFF_MS(App), ?RAFT_APP_CONFIG(App, ?RAFT_SNAPSHOT_CATCHUP_COMPLETED_BACKOFF_MS, 20 * 1000)).
% Time to wait before allowing a rerun of a failed snapshot transport.
-define(RAFT_SNAPSHOT_CATCHUP_FAILED_BACKOFF_MS, raft_snapshot_catchup_failed_backoff_ms).
-define(RAFT_SNAPSHOT_CATCHUP_FAILED_BACKOFF_MS(App), ?RAFT_APP_CONFIG(App, ?RAFT_SNAPSHOT_CATCHUP_FAILED_BACKOFF_MS, 10 * 1000)).

%% Time in seconds to retain transport destination directories after use
-define(RAFT_TRANSPORT_RETAIN_INTERVAL, transport_retain_min_secs).
Expand Down
93 changes: 60 additions & 33 deletions src/wa_raft_snapshot_catchup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
-type snapshot_key() :: {wa_raft:table(), wa_raft:partition(), wa_raft_log:log_pos()}.

-record(transport, {
app :: atom(),
id :: wa_raft_transport:transport_id(),
snapshot :: wa_raft_log:log_pos()
}).
Expand All @@ -48,8 +49,10 @@
transports = #{} :: #{key() => #transport{}},
% counts of active transports that are using a particular snapshot
snapshots = #{} :: #{snapshot_key() => pos_integer()},
% backoff windows for nodes that previously reported being overloaded
backoff_windows = #{} :: #{node() => pos_integer()}
% timestamps (ms) after which transports to previously overloaded nodes can be retried
overload_backoffs = #{} :: #{node() => integer()},
% timestamps (ms) after which repeat transports can be retried
retry_backoffs = #{} :: #{key() => integer()}
}).

-spec child_spec() -> supervisor:child_spec().
Expand Down Expand Up @@ -88,14 +91,16 @@ handle_call(Request, From, #state{} = State) ->
{noreply, State}.

-spec handle_cast({request_snapshot_transport, atom(), node(), wa_raft:table(), wa_raft:partition()}, State :: #state{}) -> {noreply, #state{}}.
handle_cast({request_snapshot_transport, App, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots, backoff_windows = BackoffWindows} = State) ->
NowMillis = erlang:monotonic_time(millisecond),
case {Transports, BackoffWindows} of
{#{{Peer, Table, Partition} := _}, _} ->
{noreply, State};
{_, #{Peer := RetryAfterTs}} when RetryAfterTs > NowMillis ->
handle_cast({request_snapshot_transport, App, Peer, Table, Partition}, #state{transports = Transports, snapshots = Snapshots, overload_backoffs = OverloadBackoffs, retry_backoffs = RetryBackoffs} = State) ->
Now = erlang:monotonic_time(millisecond),
Key = {Peer, Table, Partition},
Exists = maps:is_key(Key, Transports),
Overloaded = maps:get(Peer, OverloadBackoffs, Now) > Now,
Blocked = maps:get(Key, RetryBackoffs, Now) > Now,
case Exists orelse Overloaded orelse Blocked of
true ->
{noreply, State};
{_, _} ->
false ->
case maps:size(Transports) < ?RAFT_MAX_CONCURRENT_SNAPSHOT_CATCHUP() of
true ->
try
Expand All @@ -106,15 +111,28 @@ handle_cast({request_snapshot_transport, App, Peer, Table, Partition}, #state{tr
{error, receiver_overloaded} ->
?LOG_NOTICE("Peer ~0p reported being overloaded. Not sending snapshot for ~0p:~0p. Will try again later",
[Peer, Table, Partition], #{domain => [whatsapp, wa_raft]}),
NewRetryAfterTs = NowMillis + ?RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS(App),
{noreply, State#state{backoff_windows = BackoffWindows#{Peer => NewRetryAfterTs}}};
NewOverloadBackoff = Now + ?RAFT_SNAPSHOT_CATCHUP_OVERLOADED_BACKOFF_MS(App),
NewOverloadBackoffs = OverloadBackoffs#{Peer => NewOverloadBackoff},
NewRetryBackoffs = maps:remove(Key, RetryBackoffs),
NewState = State#state{
overload_backoffs = NewOverloadBackoffs,
retry_backoffs = NewRetryBackoffs
},
{noreply, NewState};
{ok, ID} ->
?LOG_NOTICE("started sending snapshot for ~0p:~0p at ~0p:~0p over transport ~0p",
[Table, Partition, Index, Term, ID], #{domain => [whatsapp, wa_raft]}),
NewTransports = Transports#{{Peer, Table, Partition} => #transport{id = ID, snapshot = LogPos}},
NewTransports = Transports#{{Peer, Table, Partition} => #transport{app = App, id = ID, snapshot = LogPos}},
NewSnapshots = maps:update_with({Table, Partition, LogPos}, fun(V) -> V + 1 end, 1, Snapshots),
NewBackoffWindows = maps:remove(Peer, BackoffWindows),
{noreply, State#state{transports = NewTransports, snapshots = NewSnapshots, backoff_windows = NewBackoffWindows}}
NewOverloadBackoffs = maps:remove(Peer, OverloadBackoffs),
NewRetryBackoffs = maps:remove(Key, RetryBackoffs),
NewState = State#state{
transports = NewTransports,
snapshots = NewSnapshots,
overload_backoffs = NewOverloadBackoffs,
retry_backoffs = NewRetryBackoffs
},
{noreply, NewState}
end
catch
_T:_E:S ->
Expand Down Expand Up @@ -151,32 +169,41 @@ terminate(_Reason, #state{transports = Transports, snapshots = Snapshots}) ->
end, Snapshots).

-spec scan_transport(Key :: key(), Transport :: #transport{}, #state{}) -> #state{}.
scan_transport({_Peer, Table, Partition} = Key, #transport{id = ID, snapshot = LogPos},
#state{transports = Transports, snapshots = Snapshots} = State) ->
scan_transport(Key, #transport{app = App, id = ID} = Transport, State) ->
Status = case wa_raft_transport:transport_info(ID) of
{ok, #{status := S}} -> S;
_ -> undefined
end,
case Status =:= requested orelse Status =:= running of
true ->
case Status of
requested ->
State;
false ->
SnapshotKey = {Table, Partition, LogPos},
NewSnapshots = case Snapshots of
#{SnapshotKey := 1} ->
% try to delete a snapshot if it is the last transport using it
delete_snapshot(Table, Partition, LogPos),
maps:remove(SnapshotKey, Snapshots);
#{SnapshotKey := Count} ->
% otherwise decrement the reference count for the snapshot
Snapshots#{SnapshotKey => Count - 1};
#{} ->
% unexpected that the snapshot is missing, but just ignore
Snapshots
end,
State#state{transports = maps:remove(Key, Transports), snapshots = NewSnapshots}
running ->
State;
completed ->
finish_transport(Key, Transport, ?RAFT_SNAPSHOT_CATCHUP_COMPLETED_BACKOFF_MS(App), State);
_Other ->
finish_transport(Key, Transport, ?RAFT_SNAPSHOT_CATCHUP_FAILED_BACKOFF_MS(App), State)
end.

-spec finish_transport(key(), #transport{}, pos_integer(), #state{}) -> #state{}.
finish_transport({_Peer, Table, Partition} = Key, #transport{snapshot = LogPos}, Backoff, #state{transports = Transports, snapshots = Snapshots, retry_backoffs = RetryBackoffs} = State) ->
Now = erlang:monotonic_time(millisecond),
SnapshotKey = {Table, Partition, LogPos},
NewSnapshots = case Snapshots of
#{SnapshotKey := 1} ->
% try to delete a snapshot if it is the last transport using it
delete_snapshot(Table, Partition, LogPos),
maps:remove(SnapshotKey, Snapshots);
#{SnapshotKey := Count} ->
% otherwise decrement the reference count for the snapshot
Snapshots#{SnapshotKey => Count - 1};
#{} ->
% unexpected that the snapshot is missing, but just ignore
Snapshots
end,
NewRetryBackoffs = RetryBackoffs#{Key => Now + Backoff},
State#state{transports = maps:remove(Key, Transports), snapshots = NewSnapshots, retry_backoffs = NewRetryBackoffs}.

-spec delete_snapshot(Table :: wa_raft:table(), Partition :: wa_raft:partition(),
Position :: wa_raft_log:log_pos()) -> ok.
delete_snapshot(Table, Partition, #raft_log_pos{index = Index, term = Term}) ->
Expand Down

0 comments on commit 17f5896

Please sign in to comment.