Skip to content

Commit

Permalink
RavenDB-19585 - change updateTopologyTimer to fetch and update topolo…
Browse files Browse the repository at this point in the history
…gy from all nodes to encourage needed failover
  • Loading branch information
lastav5 authored and ppekrol committed Dec 14, 2023
1 parent 8f5ec5d commit 0d3ca12
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 30 deletions.
45 changes: 15 additions & 30 deletions src/Raven.Client/Http/RequestExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ public class RequestExecutor : IDisposable
private readonly string _databaseName;

private static readonly Logger Logger = LoggingSource.Instance.GetLogger<RequestExecutor>("Client");
private DateTime _lastReturnedResponse;


public readonly JsonContextPool ContextPool;

public readonly AsyncLocal<AggressiveCacheOptions> AggressiveCaching = new AsyncLocal<AggressiveCacheOptions>();
Expand Down Expand Up @@ -326,9 +325,7 @@ protected RequestExecutor(string databaseName, X509Certificate2 certificate, Doc

_databaseName = databaseName;
Certificate = certificate;

_lastReturnedResponse = DateTime.UtcNow;


Conventions = conventions.Clone();

var maxNumberOfContextsToKeepInGlobalStack = PlatformDetails.Is32Bits == false
Expand Down Expand Up @@ -674,40 +671,29 @@ private async Task WaitForTopologyUpdate(Task topologyUpdate)
}
}

/// <summary>
/// Callback of the topology-update timer: asks every member node of the currently known
/// topology for its view of the topology, one node after another.
/// </summary>
/// <remarks>
/// The method is <c>async void</c>, so nothing can await it and an exception that escapes it
/// cannot be observed by the caller. For that reason each per-node fetch is wrapped in its own
/// try/catch, which also means a failure against one node does not prevent the remaining nodes
/// from being queried.
/// </remarks>
/// <param name="_">Unused callback state.</param>
internal async void UpdateTopologyCallback(object _)
{
    // Read the topology exactly once so the null check and the enumeration below
    // are guaranteed to look at the same instance.
    var topology = _nodeSelector?.Topology;
    if (topology == null)
        return;

    // Fetch topologies from all nodes, the executor's topology will be updated to the most recent one
    foreach (var serverNode in topology.Nodes)
    {
        // Only nodes that are full members are queried; any other role is skipped.
        if (serverNode.ServerRole != ServerNode.Role.Member)
            continue;

        try
        {
            var parameters = new UpdateTopologyParameters(serverNode)
            {
                TimeoutInMs = 0,
                DebugTag = $"timer-callback-node-{serverNode.ClusterTag}"
            };

            await UpdateTopologyAsync(parameters).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            // Best effort: log and move on to the next node.
            if (Logger.IsInfoEnabled)
                Logger.Info($"Couldn't Update Topology from _updateTopologyTimer task when fetching from node {serverNode.ClusterTag}", e);
        }
    }
}

protected async Task SingleTopologyUpdateAsync(string[] initialUrls, Guid? applicationIdentifier = null)
Expand Down Expand Up @@ -995,7 +981,6 @@ public async Task ExecuteAsync<TResult>(

OnSucceedRequest?.Invoke(this, new SucceedRequestEventArgs(_databaseName, url, response, request, attemptNum));
responseDispose = await command.ProcessResponse(context, Cache, response, url).ConfigureAwait(false);
_lastReturnedResponse = DateTime.UtcNow;
}
finally
{
Expand Down
60 changes: 60 additions & 0 deletions test/RachisTests/DatabaseCluster/ClusterDatabaseMaintenance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,66 @@ public async Task ChangeUrlOfSingleNodeCluster()
}
}

/// <summary>
/// Verifies that a request executor bound to a node which has been cut off from the rest of
/// the cluster picks up the cluster's newer topology once the topology timer callback runs.
/// </summary>
[RavenFact(RavenTestCategory.ClientApi)]
public async Task ClientShouldFailoverWhenTalkingToLoneDisconnectedNode()
{
    var (nodes, _) = await CreateRaftCluster(3, leaderIndex: 0, shouldRunInMemory: false, watcherCluster: true);

    var storeOptions = new Options
    {
        Server = nodes[1],
        ReplicationFactor = 3
    };

    using (var store = GetDocumentStore(storeOptions))
    {
        store.Initialize();
        var requestExecutor = store.GetRequestExecutor(store.Database);

        // Asserts that the executor currently knows three nodes and regards all of them as members.
        void AssertExecutorSeesThreeMembers()
        {
            var knownNodes = requestExecutor._nodeSelector.Topology.Nodes;
            Assert.Equal(3, knownNodes.Count);
            Assert.True(knownNodes.All(node => node.ServerRole == ServerNode.Role.Member));
        }

        await store.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(store.Database));

        AssertExecutorSeesThreeMembers();

        // Restart node [1] on the same url and data directory, then cut it off from nodes [0] and [2].
        var disposedNode = await DisposeServerAndWaitForFinishOfDisposalAsync(nodes[1]);

        var restartOptions = new ServerCreationOptions
        {
            CustomSettings = new Dictionary<string, string>
            {
                [RavenConfiguration.GetKey(x => x.Core.ServerUrls)] = disposedNode.Url
            },
            RunInMemory = false,
            DeletePrevious = false,
            DataDirectory = disposedNode.DataDirectory
        };
        nodes[1] = GetNewServer(restartOptions);
        nodes[1].ServerStore.Engine.ForTestingPurposesOnly().NodeTagsToDisconnect.Add(nodes[0].ServerStore.NodeTag);
        nodes[1].ServerStore.Engine.ForTestingPurposesOnly().NodeTagsToDisconnect.Add(nodes[2].ServerStore.NodeTag);
        Servers.Add(nodes[1]);

        // Wait until the database record seen by node [0] is down to two members.
        var leaderDatabase = await Databases.GetDocumentDatabaseInstanceFor(nodes[0], store);
        await WaitAndAssertForValueAsync(
            () => Task.FromResult(leaderDatabase.ReadDatabaseRecord().Topology.Members.Count),
            2);

        // The executor has not been told yet, so it still believes in three members.
        AssertExecutorSeesThreeMembers();

        // Invoke the timer callback by hand instead of waiting for the timer to fire.
        requestExecutor.UpdateTopologyCallback(null);

        // Eventually the executor's topology must show two members and one node in rehab.
        await WaitAndAssertForValueAsync(() =>
        {
            var currentNodes = requestExecutor._nodeSelector.Topology.Nodes;
            var topologyCaughtUp =
                currentNodes.Count(node => node.ServerRole == ServerNode.Role.Member) == 2 &&
                currentNodes.Count(node => node.ServerRole == ServerNode.Role.Rehab) == 1;
            return Task.FromResult(topologyCaughtUp);
        }, true);
    }
}

[Fact]
public async Task ChangeUrlOfMultiNodeCluster()
{
Expand Down

0 comments on commit 0d3ca12

Please sign in to comment.