Skip to content

Commit

Permalink
Increased logging on flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Jul 13, 2023
1 parent 7cfb5b6 commit 514149c
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 35 deletions.
4 changes: 2 additions & 2 deletions tests/NATS.Client.Core.Tests/NatsConnectionTest.Auth.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public async Task UserCredentialAuthTest(string name, string serverConfig, NatsO
.AddServerConfig(serverConfig)
.Build();

await using var server = new NatsServer(_output, _transportType, serverOptions);
await using var server = new NatsServer(_output, serverOptions);

var subject = Guid.NewGuid().ToString("N");

Expand Down Expand Up @@ -141,7 +141,7 @@ public async Task UserCredentialAuthTest(string name, string serverConfig, NatsO
await disconnectSignal2;

_output.WriteLine("START NEW SERVER");
await using var newServer = new NatsServer(_output, _transportType, serverOptions);
await using var newServer = new NatsServer(_output, serverOptions);
await subConnection.ConnectAsync(); // wait open again
await pubConnection.ConnectAsync(); // wait open again

Expand Down
5 changes: 3 additions & 2 deletions tests/NATS.Client.Core.Tests/NatsConnectionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,11 @@ public async Task ReconnectSingleTest()
{
using var options = new NatsServerOptions
{
TransportType = _transportType,
EnableWebSocket = _transportType == TransportType.WebSocket,
ServerDisposeReturnsPorts = false,
};
await using var server = new NatsServer(_output, _transportType, options);
await using var server = new NatsServer(_output, options);
var subject = Guid.NewGuid().ToString();

await using var subConnection = server.CreateClientConnection();
Expand Down Expand Up @@ -211,7 +212,7 @@ await Retry.Until(

// start new nats server on same port
_output.WriteLine("START NEW SERVER");
await using var newServer = new NatsServer(_output, _transportType, options);
await using var newServer = new NatsServer(_output, options);
await subConnection.ConnectAsync(); // wait open again
await pubConnection.ConnectAsync(); // wait open again

Expand Down
42 changes: 24 additions & 18 deletions tests/NATS.Client.Core.Tests/ProtocolTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,22 @@ await Retry.Until(
[Fact]
public async Task Unsubscribe_max_msgs()
{
const int maxMsgs = 10;
const int pubMsgs = 5;
const int extraMsgs = 3;

void Log(string text)
{
_output.WriteLine($"[TESTS] {DateTime.Now:HH:mm:ss.fff} {text}");
}

// Use a single server to test multiple scenarios to make test runs more efficient
await using var server = new NatsServer();
await using var server = new NatsServer(_output, new NatsServerOptionsBuilder().UseTransport(TransportType.Tcp).Trace().Build());
var (nats, proxy) = server.CreateProxiedClientConnection();
var sid = 0;

// Auto-unsubscribe after consuming max-msgs
Log("### Auto-unsubscribe after consuming max-msgs");
{
const int maxMsgs = 99;
var opts = new NatsSubOpts { MaxMsgs = maxMsgs };
await using var sub = await nats.SubscribeAsync<int>("foo", opts);
sid++;
Expand All @@ -173,8 +181,8 @@ public async Task Unsubscribe_max_msgs()
Assert.Equal($"SUB foo {sid}", proxy.Frames[0].Message);
Assert.Equal($"UNSUB {sid} {maxMsgs}", proxy.Frames[1].Message);

// send more messages than max to check we only get max
for (var i = 0; i < maxMsgs + 10; i++)
Log("Send more messages than max to check we only get max");
for (var i = 0; i < maxMsgs + extraMsgs; i++)
{
await nats.PublishAsync("foo", i);
}
Expand All @@ -192,7 +200,7 @@ public async Task Unsubscribe_max_msgs()
Assert.Equal(NatsSubEndReason.MaxMsgs, sub.EndReason);
}

// Manual unsubscribe
Log("### Manual unsubscribe");
{
await proxy.FlushFramesAsync(nats);

Expand All @@ -205,13 +213,13 @@ public async Task Unsubscribe_max_msgs()
Assert.Equal($"SUB foo2 {sid}", proxy.ClientFrames[0].Message);
Assert.Equal($"UNSUB {sid}", proxy.ClientFrames[1].Message);

// send messages to check we receive none since we're already unsubscribed
for (var i = 0; i < 100; i++)
Log("Send messages to check we receive none since we're already unsubscribed");
for (var i = 0; i < pubMsgs; i++)
{
await nats.PublishAsync("foo2", i);
}

await Retry.Until("all pub frames arrived", () => proxy.Frames.Count(f => f.Message.StartsWith("PUB foo2")) == 100);
await Retry.Until("all pub frames arrived", () => proxy.Frames.Count(f => f.Message.StartsWith("PUB foo2")) == pubMsgs);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;
Expand All @@ -225,12 +233,10 @@ public async Task Unsubscribe_max_msgs()
Assert.Equal(NatsSubEndReason.None, sub.EndReason);
}

// Reconnect
Log("### Reconnect");
{
proxy.Reset();

const int maxMsgs = 100;
const int pubMsgs = 10;
var opts = new NatsSubOpts { MaxMsgs = maxMsgs };
var sub = await nats.SubscribeAsync<int>("foo3", opts);
sid++;
Expand All @@ -243,22 +249,22 @@ public async Task Unsubscribe_max_msgs()
await nats.PublishAsync("foo3", i);
}

await Retry.Until("published", () => proxy.Frames.Count(f => f.Message.StartsWith("PUB foo3")) == 10);
await Retry.Until("received", () => Volatile.Read(ref count) == 10);
await Retry.Until("published", () => proxy.Frames.Count(f => f.Message.StartsWith("PUB foo3")) == pubMsgs);
await Retry.Until("received", () => Volatile.Read(ref count) == pubMsgs);

var pending = maxMsgs - pubMsgs;
Assert.Equal(pending, ((INatsSub)sub).PendingMsgs);

proxy.Reset();

// SUB + UNSUB
Log("Expect SUB + UNSUB");
await Retry.Until("re-subscribed", () => proxy.ClientFrames.Count == 2);

// Make sure we're still using the same SID
Log("Make sure we're still using the same SID");
Assert.Equal($"SUB foo3 {sid}", proxy.ClientFrames[0].Message);
Assert.Equal($"UNSUB {sid} {pending}", proxy.ClientFrames[1].Message);

// We already published a few, this should exceed max-msgs
Log("We already published a few, this should exceed max-msgs");
for (var i = 0; i < maxMsgs; i++)
{
await nats.PublishAsync("foo3", i);
Expand All @@ -272,7 +278,7 @@ await Retry.Until(
"unsubscribed with max-msgs",
() => sub.EndReason == NatsSubEndReason.MaxMsgs);

Assert.Equal(Volatile.Read(ref count), maxMsgs);
Assert.Equal(maxMsgs, Volatile.Read(ref count));

await sub.DisposeAsync();
await reg;
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/JetStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class JetStreamTest
[Fact]
public async Task Create_stream_test()
{
await using var server = new NatsServer(new NullOutputHelper(), TransportType.Tcp, new NatsServerOptionsBuilder().UseTransport(TransportType.Tcp).UseJetStream().Build());
await using var server = new NatsServer(new NullOutputHelper(), new NatsServerOptionsBuilder().UseTransport(TransportType.Tcp).UseJetStream().Build());
await using var nats = server.CreateClientConnection();

// Create stream
Expand Down
26 changes: 16 additions & 10 deletions tests/NATS.Client.TestUtilities/NatsServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ public NatsServer()
}

public NatsServer(ITestOutputHelper outputHelper, TransportType transportType)
: this(outputHelper, transportType, new NatsServerOptionsBuilder().UseTransport(transportType).Build())
: this(outputHelper, new NatsServerOptionsBuilder().UseTransport(transportType).Build())
{
}

public NatsServer(ITestOutputHelper outputHelper, TransportType transportType, NatsServerOptions options)
public NatsServer(ITestOutputHelper outputHelper, NatsServerOptions options)
{
_outputHelper = outputHelper;
_transportType = transportType;
_transportType = options.TransportType;
Options = options;
_configFileName = Path.GetTempFileName();
var config = options.ConfigFileContents;
Expand Down Expand Up @@ -183,7 +183,7 @@ public async ValueTask DisposeAsync()
throw new Exception("Tapped mode doesn't work wit TLS");
}

var proxy = new NatsProxy(Options.ServerPort, _outputHelper);
var proxy = new NatsProxy(Options.ServerPort, _outputHelper, Options.Trace);

var client = new NatsConnection((options ?? NatsOptions.Default) with
{
Expand Down Expand Up @@ -254,16 +254,19 @@ public NatsCluster(ITestOutputHelper outputHelper, TransportType transportType)
{
var opts1 = new NatsServerOptions
{
TransportType = transportType,
EnableWebSocket = transportType == TransportType.WebSocket,
EnableClustering = true,
};
var opts2 = new NatsServerOptions
{
TransportType = transportType,
EnableWebSocket = transportType == TransportType.WebSocket,
EnableClustering = true,
};
var opts3 = new NatsServerOptions
{
TransportType = transportType,
EnableWebSocket = transportType == TransportType.WebSocket,
EnableClustering = true,
};
Expand All @@ -273,9 +276,9 @@ public NatsCluster(ITestOutputHelper outputHelper, TransportType transportType)
opt.SetRoutes(routes);
}

Server1 = new NatsServer(outputHelper, transportType, opts1);
Server2 = new NatsServer(outputHelper, transportType, opts2);
Server3 = new NatsServer(outputHelper, transportType, opts3);
Server1 = new NatsServer(outputHelper, opts1);
Server2 = new NatsServer(outputHelper, opts2);
Server3 = new NatsServer(outputHelper, opts3);
}

public NatsServer Server1 { get; }
Expand All @@ -295,15 +298,17 @@ public async ValueTask DisposeAsync()
public class NatsProxy : IDisposable
{
private readonly ITestOutputHelper _outputHelper;
private readonly bool _trace;
private readonly TcpListener _tcpListener;
private readonly List<TcpClient> _clients = new();
private readonly List<Frame> _frames = new();
private readonly Stopwatch _watch = new();
private int _syncCount;

public NatsProxy(int port, ITestOutputHelper outputHelper)
public NatsProxy(int port, ITestOutputHelper outputHelper, bool trace)
{
_outputHelper = outputHelper;
_trace = trace;
_tcpListener = new TcpListener(IPAddress.Loopback, 0);
_tcpListener.Start();
_watch.Restart();
Expand Down Expand Up @@ -522,12 +527,13 @@ private bool NatsProtoDump(int client, string origin, TextReader sr, TextWriter

private void AddFrame(Frame frame)
{
// Log($"Dump {frame}");
if (_trace)
Log($"TRACE {frame}");
lock (_frames)
_frames.Add(frame);
}

private void Log(string text) => _outputHelper.WriteLine($"{DateTime.Now:HH:mm:ss.fff} [PROXY] {text}");
private void Log(string text) => _outputHelper.WriteLine($"[PROXY] {DateTime.Now:HH:mm:ss.fff} {text}");

public record Frame(TimeSpan Timestamp, int Client, string Origin, string Message);
}
Expand Down
22 changes: 22 additions & 0 deletions tests/NATS.Client.TestUtilities/NatsServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public sealed class NatsServerOptionsBuilder
private string? _tlsServerCertFile;
private string? _tlsServerKeyFile;
private string? _tlsCaFile;
private TransportType? _transportType;
private bool _trace;

public NatsServerOptions Build()
{
Expand All @@ -32,11 +34,21 @@ public NatsServerOptions Build()
TlsServerKeyFile = _tlsServerKeyFile,
TlsCaFile = _tlsCaFile,
ExtraConfigs = _extraConfigs,
TransportType = _transportType ?? TransportType.Tcp,
Trace = _trace,
};
}

public NatsServerOptionsBuilder Trace()
{
_trace = true;
return this;
}

public NatsServerOptionsBuilder UseTransport(TransportType transportType)
{
_transportType = transportType;

if (transportType == TransportType.Tls)
{
_enableTls = true;
Expand Down Expand Up @@ -116,6 +128,10 @@ public NatsServerOptions()

public string? TlsCaFile { get; init; }

public TransportType TransportType { get; init; }

public bool Trace { get; init; }

public List<string> ExtraConfigs { get; init; } = new();

public int ServerPort => _lazyServerPort.Value;
Expand All @@ -130,6 +146,12 @@ public string ConfigFileContents
{
var sb = new StringBuilder();
sb.AppendLine($"port: {ServerPort}");

if (Trace)
{
sb.AppendLine($"trace: true");
}

if (EnableWebSocket)
{
sb.AppendLine("websocket {");
Expand Down
4 changes: 2 additions & 2 deletions tests/NATS.Client.TestUtilities/OutputHelperLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Except
{
try
{
_testOutputHelper.WriteLine(formatter(state, exception));
_testOutputHelper.WriteLine($"[NCLOG] {DateTime.Now:HH:mm:ss.fff} {logLevel}: {formatter(state, exception)}");
if (exception != null)
{
_testOutputHelper.WriteLine(exception.ToString());
_testOutputHelper.WriteLine($"[NCLOG] {DateTime.Now:HH:mm:ss.fff} Exception: {exception}");
}
}
catch
Expand Down

0 comments on commit 514149c

Please sign in to comment.