From c4ba6a38c9b9249734d8472c5fb9493d58a38551 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?=
Date: Wed, 30 Oct 2024 08:46:06 +0100
Subject: [PATCH] Start update slotmap timer after initial nodes are added
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Includes a new testcase simulating that all nodes are unreachable.

Signed-off-by: Björn Svensson
---
 src/ered_cluster.erl | 20 ++++++++-------
 test/ered_SUITE.erl  | 58 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 69 insertions(+), 9 deletions(-)

diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl
index 4910f73..8a81c8e 100644
--- a/src/ered_cluster.erl
+++ b/src/ered_cluster.erl
@@ -67,7 +67,6 @@
              convergence_check = nok :: convergence_check(),
 
              info_pid = [] :: [pid()],
-             update_delay = 1000, % 1s delay between slot map update requests
              client_opts = [],
              update_slot_wait = 500,
              min_replicas = 0,
@@ -486,20 +485,23 @@ start_periodic_slot_info_request(PreferredNodes, State) ->
                     %% see if they are available. If they are hostnames that map
                     %% to IP addresses and all IP addresses of the cluster have
                     %% changed, then this helps us rediscover the cluster.
-                    start_clients(State#st.initial_nodes, State);
+                    State1 = start_clients(State#st.initial_nodes, State),
+                    start_update_slot_timer([], State1);
                 Node ->
                     send_slot_info_request(Node, State),
-                    Tref = erlang:start_timer(
-                             State#st.update_slot_wait,
-                             self(),
-                             {time_to_update_slots,
-                              lists:delete(Node, PreferredNodes)}),
-                    State#st{slot_timer_ref = Tref}
+                    start_update_slot_timer(lists:delete(Node, PreferredNodes), State)
             end;
         _Else ->
             State
     end.
 
+start_update_slot_timer(PreferredNodes, State) ->
+    Tref = erlang:start_timer(
+             State#st.update_slot_wait,
+             self(),
+             {time_to_update_slots, PreferredNodes}),
+    State#st{slot_timer_ref = Tref}.
+
 stop_periodic_slot_info_request(State) ->
     case State#st.slot_timer_ref of
         none ->
@@ -664,7 +666,7 @@ start_clients(Addrs, State) ->
           {State#st.nodes, State#st.closing},
           Addrs),
 
-    State#st{nodes = maps:merge(State#st.nodes, NewNodes),
+    State#st{nodes = NewNodes,
             pending = sets:union(State#st.pending,
                                  sets:subtract(new_set(maps:keys(NewNodes)),
                                                State#st.up)),
diff --git a/test/ered_SUITE.erl b/test/ered_SUITE.erl
index 84741b4..ce0eb0b 100644
--- a/test/ered_SUITE.erl
+++ b/test/ered_SUITE.erl
@@ -16,6 +16,7 @@ all() ->
      t_manual_failover,
      t_manual_failover_then_old_master_down,
      t_blackhole,
+     t_blackhole_all_nodes,
      t_init_timeout,
      t_empty_slotmap,
      t_empty_initial_slotmap,
@@ -467,6 +468,63 @@ t_blackhole(_) ->
 
     no_more_msgs().
 
+t_blackhole_all_nodes(_) ->
+    %% Simulate that all nodes are unreachable, e.g. a network failure. We use
+    %% 'docker pause', similar to sending SIGSTOP to a process, to make the
+    %% nodes unresponsive. This makes TCP recv() and connect() time out.
+    CloseWait = 2000,                          % default is 10000
+    NodeDownTimeout = 2000,                    % default is 2000
+    ResponseTimeout = 10000,                   % default is 10000
+    R = start_cluster([{close_wait, CloseWait},
+                       %% Require replicas for 'cluster OK'.
+                       {min_replicas, 1},
+                       {client_opts,
+                        [{node_down_timeout, NodeDownTimeout},
+                         {connection_opts,
+                          [{response_timeout, ResponseTimeout}]}]}
+                      ]),
+
+    %% Pause all nodes
+    lists:foreach(fun(Port) ->
+                          Pod = get_pod_name_from_port(Port),
+                          ct:pal("Pausing container: " ++ os:cmd("docker pause " ++ Pod))
+                  end, ?PORTS),
+
+    %% Send PING to all nodes and expect closed sockets, error replies for sent requests,
+    %% and a report that the cluster is not ok.
+    TestPid = self(),
+    AddrToPid = ered:get_addr_to_client_map(R),
+    maps:foreach(fun(_ClientAddr, ClientPid) ->
+                         ered:command_client_async(ClientPid, [<<"PING">>],
+                                                   fun(Reply) -> TestPid ! {ping_reply, Reply} end)
+                 end, AddrToPid),
+
+    [?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}, addr := {"127.0.0.1", Port}},
+          ResponseTimeout + 1000) || Port <- ?PORTS],
+    ?MSG({ping_reply, {error, _Reason1}}, NodeDownTimeout + 1000),
+    ?MSG({ping_reply, {error, _Reason2}}, NodeDownTimeout + 1000),
+    ?MSG({ping_reply, {error, _Reason3}}, NodeDownTimeout + 1000),
+    ?MSG({ping_reply, {error, _Reason4}}, NodeDownTimeout + 1000),
+    ?MSG({ping_reply, {error, _Reason5}}, NodeDownTimeout + 1000),
+    ?MSG({ping_reply, {error, _Reason6}}, NodeDownTimeout + 1000),
+    [?MSG(#{msg_type := node_down_timeout, addr := {"127.0.0.1", Port}}) || Port <- ?PORTS],
+    ?MSG(#{msg_type := cluster_not_ok, reason := master_down}),
+
+    %% Unpause all nodes
+    lists:foreach(fun(Port) ->
+                          Pod = get_pod_name_from_port(Port),
+                          ct:pal("Unpausing container: " ++ os:cmd("docker unpause " ++ Pod))
+                  end, ?PORTS),
+    timer:sleep(500),
+
+    wait_for_consistent_cluster(),
+
+    %% Expect connects and a cluster ok.
+    [?MSG(#{msg_type := connected, addr := {"127.0.0.1", Port}}, 10000) || Port <- ?PORTS],
+    ?MSG(#{msg_type := cluster_ok}, 10000),
+
+    no_more_msgs().
+
 t_init_timeout(_) ->
     Opts = [