From b527abed171734d5dd8607e635bbc7df338c3549 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 2 Oct 2023 16:30:53 +0100 Subject: [PATCH] Fixing test flappers (#141) * Fixing test flappers * Fixing test that potentially hangs * Fix consume pending msgs to stay positive In test sometimes we're receiving batch requests more than the requested maxMsgs. To eliminate the possibility of having batch size going over the max now we can make sure the pending messages doesn't go below zero, os that there is no way for a pull request batch to go over max msgs. * Fixed request reply test timeout * Fixed extraneous pull request In some cases in test we send an unnecessary pull request with batch=0 the only place this could happen is in CheckPending() so we add an additional check for pending messages there. * Avoid pending update races * Hunting consume heartbeat test failure There is an extraneous pull request sent at the very beginning of the consume. * Fixed format and warnings * Fix initial pull request race In rare cases we were getting two pull requests issued back-to-back, one for the initialization then right after that with an incoming message triggering another request because the pending numbers were nor reset soon enough after the initial request. initialization | message loop ------------------------------------------ pull() | | incoming_message() | check_pending() (*)--> reset_pending() | | pull() | reset_pending() (*) Too late. Check thinks we don't have enough pending messages so issues another pull request. By setting setting the pending requests in constructor to full values, assuming a pull request will be done right after subscription solves the race condition where reset_pending() might happen just after the first message. initialization | message loop ------------------------------------------ reset_pending() | pull() | | incoming_message() | check_pending() (*)--> | | (*) Now check thinks we have enough pending messages so doesn't issues a pull request. --- .github/workflows/test.yml | 3 +- .../Internal/NatsJSConsume.cs | 163 +++++++++++------- src/NATS.Client.JetStream/NatsJSConsumer.cs | 5 +- src/NATS.Client.JetStream/NatsJSMsg.cs | 2 +- .../RequestReplyTest.cs | 19 +- .../ManageStreamTest.cs | 23 +-- 6 files changed, 133 insertions(+), 82 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index eea223ad3..25aad012a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -48,7 +48,8 @@ jobs: run: dotnet test -c Debug --no-build tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj - name: Test JetStream - run: dotnet test -c Debug --no-build tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj + # This test is hanging sometimes. Find out where! + run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj memory_test: name: memory test diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 1a9a59e0d..64b7e2df6 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -8,12 +8,19 @@ namespace NATS.Client.JetStream.Internal; +internal struct PullRequest +{ + public ConsumerGetnextRequest Request { get; init; } + + public string Origin { get; init; } +} + internal class NatsJSConsume : NatsSubBase, INatsJSConsume { private readonly ILogger _logger; private readonly bool _debug; private readonly Channel> _userMsgs; - private readonly Channel _pullRequests; + private readonly Channel _pullRequests; private readonly NatsJSContext _context; private readonly string _stream; private readonly string _consumer; @@ -29,6 +36,7 @@ internal class NatsJSConsume : NatsSubBase, INatsJSConsume private readonly long _maxBytes; private readonly long _thresholdBytes; + private readonly object _pendingGate = new(); private long _pendingMsgs; private long _pendingBytes; @@ -83,13 +91,13 @@ public NatsJSConsume( static state => { var self = (NatsJSConsume)state!; - self.Pull(self._maxMsgs, self._maxBytes); + self.Pull("heartbeat-timeout", self._maxMsgs, self._maxBytes); self.ResetPending(); if (self._debug) { self._logger.LogDebug( NatsJSLogEvents.IdleTimeout, - "Idle heartbeat timed-out after {Timeout}ns", + "Idle heartbeat timeout after {Timeout}ns", self._idle); } }, @@ -100,27 +108,30 @@ public NatsJSConsume( _userMsgs = Channel.CreateBounded>(NatsSub.GetChannelOpts(opts?.ChannelOpts)); Msgs = _userMsgs.Reader; - _pullRequests = Channel.CreateBounded(NatsSub.GetChannelOpts(opts?.ChannelOpts)); + _pullRequests = Channel.CreateBounded(NatsSub.GetChannelOpts(opts?.ChannelOpts)); _pullTask = Task.Run(PullLoop); + + ResetPending(); } public ChannelReader> Msgs { get; } public void Stop() => EndSubscription(NatsSubEndReason.None); - public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationToken cancellationToken = default) => - Connection.PubModelAsync( + public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, CancellationToken cancellationToken = default) + { + if (_debug) + { + _logger.LogDebug("Sending pull request for {Origin} {Msgs}, {Bytes}", origin, request.Batch, request.MaxBytes); + } + + return Connection.PubModelAsync( subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", data: request, serializer: NatsJsonSerializer.Default, replyTo: Subject, headers: default, cancellationToken); - - public void ResetPending() - { - _pendingMsgs = _maxMsgs; - _pendingBytes = _maxBytes; } public void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite); @@ -176,25 +187,28 @@ protected override async ValueTask ReceiveInternalAsync( { if (long.TryParse(natsPendingMsgs, out var pendingMsgs)) { - if (_debug) - { - _logger.LogDebug( - NatsJSLogEvents.PendingCount, - "Header pending messages current {Pending}", - _pendingMsgs); - } - - _pendingMsgs -= pendingMsgs; - if (_pendingMsgs < 0) - _pendingMsgs = 0; - - if (_debug) + lock (_pendingGate) { - _logger.LogDebug( - NatsJSLogEvents.PendingCount, - "Header pending messages {Header} {Pending}", - natsPendingMsgs, - _pendingMsgs); + if (_debug) + { + _logger.LogDebug( + NatsJSLogEvents.PendingCount, + "Header pending messages current {Pending}", + _pendingMsgs); + } + + _pendingMsgs -= pendingMsgs; + if (_pendingMsgs < 0) + _pendingMsgs = 0; + + if (_debug) + { + _logger.LogDebug( + NatsJSLogEvents.PendingCount, + "Header pending messages {Header} {Pending}", + natsPendingMsgs, + _pendingMsgs); + } } } else @@ -207,18 +221,21 @@ protected override async ValueTask ReceiveInternalAsync( { if (long.TryParse(natsPendingBytes, out var pendingBytes)) { - if (_debug) + lock (_pendingGate) { - _logger.LogDebug(NatsJSLogEvents.PendingCount, "Header pending bytes current {Pending}", _pendingBytes); - } - - _pendingBytes -= pendingBytes; - if (_pendingBytes < 0) - _pendingBytes = 0; - - if (_debug) - { - _logger.LogDebug(NatsJSLogEvents.PendingCount, "Header pending bytes {Header} {Pending}", natsPendingBytes, _pendingBytes); + if (_debug) + { + _logger.LogDebug(NatsJSLogEvents.PendingCount, "Header pending bytes current {Pending}", _pendingBytes); + } + + _pendingBytes -= pendingBytes; + if (_pendingBytes < 0) + _pendingBytes = 0; + + if (_debug) + { + _logger.LogDebug(NatsJSLogEvents.PendingCount, "Header pending bytes {Header} {Pending}", natsPendingBytes, _pendingBytes); + } } } else @@ -276,14 +293,21 @@ protected override async ValueTask ReceiveInternalAsync( _serializer), _context); - _pendingMsgs--; + lock (_pendingGate) + { + if (_pendingMsgs > 0) + _pendingMsgs--; + } if (_maxBytes > 0) { if (_debug) _logger.LogDebug(NatsJSLogEvents.MessageProperty, "Message size {Size}", msg.Size); - _pendingBytes -= msg.Size; + lock (_pendingGate) + { + _pendingBytes -= msg.Size; + } } await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false); @@ -298,42 +322,59 @@ protected override void TryComplete() _userMsgs.Writer.TryComplete(); } - private void CheckPending() + private void ResetPending() { - if (_maxBytes > 0 && _pendingBytes <= _thresholdBytes) + lock (_pendingGate) { - if (_debug) - _logger.LogDebug(NatsJSLogEvents.PendingCount, "Check pending bytes {Pending}", _pendingBytes); - - Pull(_maxMsgs, _maxBytes - _pendingBytes); - ResetPending(); + _pendingMsgs = _maxMsgs; + _pendingBytes = _maxBytes; } - else if (_maxBytes == 0 && _pendingMsgs <= _thresholdMsgs) + } + + private void CheckPending() + { + lock (_pendingGate) { - if (_debug) - _logger.LogDebug(NatsJSLogEvents.PendingCount, "Check pending messages {Pending}", _pendingMsgs); + if (_maxBytes > 0 && _pendingBytes <= _thresholdBytes) + { + if (_debug) + _logger.LogDebug(NatsJSLogEvents.PendingCount, "Check pending bytes {Pending}, {MaxBytes}", _pendingBytes, _maxBytes); + + Pull("chk-bytes", _maxMsgs, _maxBytes - _pendingBytes); + ResetPending(); + } + else if (_maxBytes == 0 && _pendingMsgs <= _thresholdMsgs && _pendingMsgs < _maxMsgs) + { + if (_debug) + _logger.LogDebug(NatsJSLogEvents.PendingCount, "Check pending messages {Pending}, {MaxMsgs}", _pendingMsgs, _maxMsgs); - Pull(_maxMsgs - _pendingMsgs, 0); - ResetPending(); + Pull("chk-msgs", _maxMsgs - _pendingMsgs, 0); + ResetPending(); + } } } - private void Pull(long batch, long maxBytes) => _pullRequests.Writer.TryWrite(new ConsumerGetnextRequest + private void Pull(string origin, long batch, long maxBytes) => _pullRequests.Writer.TryWrite(new PullRequest { - Batch = batch, - MaxBytes = maxBytes, - IdleHeartbeat = _idle, - Expires = _expires, + Request = new ConsumerGetnextRequest + { + Batch = batch, + MaxBytes = maxBytes, + IdleHeartbeat = _idle, + Expires = _expires, + }, + Origin = origin, }); private async Task PullLoop() { await foreach (var pr in _pullRequests.Reader.ReadAllAsync()) { - await CallMsgNextAsync(pr).ConfigureAwait(false); + var origin = $"pull-loop({pr.Origin})"; + await CallMsgNextAsync(origin, pr.Request).ConfigureAwait(false); if (_debug) { - _logger.LogDebug(NatsJSLogEvents.PullRequest, "Pull request issued for {Batch}, {MaxBytes}", pr.Batch, pr.MaxBytes); + _logger.LogDebug(NatsJSLogEvents.PullRequest, "Pull request issued for {Origin} {Batch}, {MaxBytes}", origin, pr.Request.Batch, pr.Request.MaxBytes); } } } diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 86d8a357c..800435288 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -78,6 +78,8 @@ public async ValueTask> ConsumeAsync(NatsJSConsumeOpts? opt { ThrowIfDeleted(); + opts ??= new NatsJSConsumeOpts(); + var inbox = _context.NewInbox(); var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); @@ -117,7 +119,9 @@ await _context.Connection.SubAsync( sub: sub, cancellationToken); + // Start consuming with the first Pull Request await sub.CallMsgNextAsync( + "init", new ConsumerGetnextRequest { Batch = max.MaxMsgs, @@ -127,7 +131,6 @@ await sub.CallMsgNextAsync( }, cancellationToken); - sub.ResetPending(); sub.ResetHeartbeatTimer(); return sub; diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index ed4e49149..5b5d040ec 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -109,7 +109,7 @@ private ValueTask SendAckAsync(ReadOnlySequence payload, AckOpts opts = de return _msg.ReplyAsync( payload: payload, - opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,}, + opts: new NatsPubOpts { WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent }, cancellationToken: cancellationToken); } } diff --git a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs index 699c7efc1..f60b4993e 100644 --- a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs +++ b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs @@ -12,21 +12,26 @@ public class RequestReplyTest [Fact] public async Task Simple_request_reply_test() { - // Trace to hunt flapper! - await using var server = NatsServer.StartWithTrace(_output); - + await using var server = NatsServer.Start(); await using var nats = server.CreateClientConnection(); - var sub = await nats.SubscribeAsync("foo"); + const string subject = "foo"; + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var cancellationToken = cts.Token; + + var sub = await nats.SubscribeAsync(subject, cancellationToken: cancellationToken); var reg = sub.Register(async msg => { - await msg.ReplyAsync(msg.Data * 2); + await msg.ReplyAsync(msg.Data * 2, cancellationToken: cancellationToken); }); + var natsSubOpts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(10) }; + for (var i = 0; i < 10; i++) { - var rep = await nats.RequestAsync("foo", i); - Assert.Equal(i * 2, rep?.Data); + var rep = await nats.RequestAsync(subject, i, replyOpts: natsSubOpts, cancellationToken: cancellationToken) ?? throw new TimeoutException("Request timeout"); + + Assert.Equal(i * 2, rep.Data); } await sub.DisposeAsync(); diff --git a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs index 3f00bdbfd..12c9f9dfe 100644 --- a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs @@ -12,45 +12,46 @@ public class ManageStreamTest [Fact] public async Task Account_info_create_get_update_stream() { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + await using var server = NatsServer.StartJS(); var nats = server.CreateClientConnection(); var js = new NatsJSContext(nats); // Account Info { - var accountInfo = await js.GetAccountInfoAsync(); + var accountInfo = await js.GetAccountInfoAsync(cancellationToken); Assert.Equal(0, accountInfo.Streams); } // Create { - var stream = await js.CreateStreamAsync(request: new StreamConfiguration - { - Name = "events", - Subjects = new[] { "events.*" }, - }); + var stream = await js.CreateStreamAsync( + request: new StreamConfiguration { Name = "events", Subjects = new[] { "events.*" } }, + cancellationToken: cancellationToken); Assert.Equal("events", stream.Info.Config.Name); - var accountInfo = await js.GetAccountInfoAsync(); + var accountInfo = await js.GetAccountInfoAsync(cancellationToken); Assert.Equal(1, accountInfo.Streams); } // Get { - var stream = await js.GetStreamAsync("events"); + var stream = await js.GetStreamAsync("events", cancellationToken); Assert.Equal("events", stream.Info.Config.Name); Assert.Equal(new[] { "events.*" }, stream.Info.Config.Subjects); } // Update { - var stream1 = await js.GetStreamAsync("events"); + var stream1 = await js.GetStreamAsync("events", cancellationToken); Assert.Equal(-1, stream1.Info.Config.MaxMsgs); - var stream2 = await js.UpdateStreamAsync(new StreamUpdateRequest { Name = "events", MaxMsgs = 10 }); + var stream2 = await js.UpdateStreamAsync(new StreamUpdateRequest { Name = "events", MaxMsgs = 10 }, cancellationToken); Assert.Equal(10, stream2.Info.Config.MaxMsgs); - var stream3 = await js.GetStreamAsync("events"); + var stream3 = await js.GetStreamAsync("events", cancellationToken); Assert.Equal(10, stream3.Info.Config.MaxMsgs); } }