Fixing test flappers (#141)
* Fixing test flappers

* Fixing test that potentially hangs

* Fix consume pending msgs to stay positive

In tests we sometimes receive batches with more
messages than the requested maxMsgs. To eliminate
the possibility of the batch size going over the
max, we now make sure the pending message count
doesn't go below zero, so that there is no way
for a pull request batch to exceed max msgs.
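
A minimal sketch of the clamping idea, with illustrative names rather than the actual NatsJSConsume<TMsg> fields: the pending counter is decremented under a lock and never allowed to go negative, so a follow-up pull request can never ask for more than maxMsgs.

    // Illustrative sketch only; the real fields live in NatsJSConsume<TMsg>.
    internal sealed class PendingCounterSketch
    {
        private readonly object _gate = new();
        private readonly long _maxMsgs;
        private long _pendingMsgs;

        public PendingCounterSketch(long maxMsgs)
        {
            _maxMsgs = maxMsgs;
            _pendingMsgs = maxMsgs;
        }

        // Called as deliveries (or server-reported pending counts) come in.
        public void Deduct(long count)
        {
            lock (_gate)
            {
                _pendingMsgs -= count;
                if (_pendingMsgs < 0)
                    _pendingMsgs = 0; // clamp: never below zero
            }
        }

        // The next batch can never exceed maxMsgs because
        // _pendingMsgs always stays in the range [0, _maxMsgs].
        public long NextBatch()
        {
            lock (_gate)
                return _maxMsgs - _pendingMsgs;
        }
    }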

* Fixed request reply test timeout

* Fixed extraneous pull request

In some test cases we send an unnecessary pull request with batch=0.
The only place this could happen is in CheckPending(), so we add
an additional check for pending messages there.
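
A minimal, self-contained sketch of that extra guard (the field names mirror the diff below, but this is not the full consumer): a pull is only issued when the computed batch is positive, so a batch=0 request can never be sent.

    internal sealed class PendingCheckSketch
    {
        private readonly object _gate = new();
        private readonly long _maxMsgs;
        private readonly long _thresholdMsgs;
        private long _pendingMsgs;

        public PendingCheckSketch(long maxMsgs, long thresholdMsgs)
        {
            _maxMsgs = maxMsgs;
            _thresholdMsgs = thresholdMsgs;
            _pendingMsgs = maxMsgs;
        }

        // Returns the batch size to request, or 0 when no pull should be issued.
        public long CheckPending()
        {
            lock (_gate)
            {
                // The added "_pendingMsgs < _maxMsgs" condition makes a
                // batch=0 pull request impossible.
                if (_pendingMsgs <= _thresholdMsgs && _pendingMsgs < _maxMsgs)
                {
                    var batch = _maxMsgs - _pendingMsgs;
                    _pendingMsgs = _maxMsgs; // reset after requesting
                    return batch;
                }

                return 0;
            }
        }
    }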

* Avoid pending update races

* Hunting consume heartbeat test failure

There is an extraneous pull request sent at the very
beginning of the consume.

* Fixed format and warnings

* Fix initial pull request race

In rare cases we were getting two pull requests issued
back-to-back: one for the initialization, then right after
that an incoming message triggering another request,
because the pending numbers were not reset soon enough
after the initial request.

        initialization   |   message loop
      ------------------------------------------
       pull()            |
                         |  incoming_message()
                         |  check_pending()
(*)--> reset_pending()   |
                         |  pull()
                         |  reset_pending()

(*) Too late. The check thinks we don't have enough
    pending messages, so it issues another pull request.

Setting the pending counts to their full values in the
constructor, assuming a pull request will be issued
right after subscription, solves the race condition
where reset_pending() might happen just after the
first message.

        initialization   |   message loop
      ------------------------------------------
       reset_pending()   |
       pull()            |
                         |  incoming_message()
                         |  check_pending()
(*)-->                   |
                         |

(*) Now the check thinks we have enough pending messages,
    so it doesn't issue a pull request.
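
A minimal sketch of the ordering fix, using an illustrative class rather than the real subscription: the pending counters are set to their full values in the constructor, before the initial pull is sent, so a message arriving immediately after the first pull sees full pending counts and check_pending() does not issue a duplicate pull.

    internal sealed class InitOrderSketch
    {
        private readonly object _gate = new();
        private readonly long _maxMsgs;
        private readonly long _thresholdMsgs;
        private long _pendingMsgs;

        public InitOrderSketch(long maxMsgs, long thresholdMsgs)
        {
            _maxMsgs = maxMsgs;
            _thresholdMsgs = thresholdMsgs;

            // Fix: start "full" here, assuming the initial pull request is
            // issued right after subscription, instead of resetting after it.
            ResetPending();
        }

        public void OnIncomingMessage()
        {
            lock (_gate)
            {
                if (_pendingMsgs > 0)
                    _pendingMsgs--;
            }

            CheckPending();
        }

        private void ResetPending()
        {
            lock (_gate)
                _pendingMsgs = _maxMsgs;
        }

        private void CheckPending()
        {
            lock (_gate)
            {
                // During initialization pending is already at max, so the
                // first incoming message no longer triggers a duplicate pull.
                if (_pendingMsgs <= _thresholdMsgs && _pendingMsgs < _maxMsgs)
                {
                    Pull(_maxMsgs - _pendingMsgs);
                    _pendingMsgs = _maxMsgs;
                }
            }
        }

        // Stub: the real code enqueues a ConsumerGetnextRequest on a channel.
        private static void Pull(long batch)
        {
        }
    }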
mtmk authored Oct 2, 2023
1 parent fabec64 commit b527abe
Showing 6 changed files with 133 additions and 82 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
@@ -48,7 +48,8 @@ jobs:
run: dotnet test -c Debug --no-build tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj

- name: Test JetStream
run: dotnet test -c Debug --no-build tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj
# This test is hanging sometimes. Find out where!
run: dotnet test -c Debug --no-build --logger:"console;verbosity=normal" tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj

memory_test:
name: memory test
163 changes: 102 additions & 61 deletions src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
@@ -8,12 +8,19 @@

namespace NATS.Client.JetStream.Internal;

internal struct PullRequest
{
public ConsumerGetnextRequest Request { get; init; }

public string Origin { get; init; }
}

internal class NatsJSConsume<TMsg> : NatsSubBase, INatsJSConsume<TMsg>
{
private readonly ILogger _logger;
private readonly bool _debug;
private readonly Channel<NatsJSMsg<TMsg?>> _userMsgs;
private readonly Channel<ConsumerGetnextRequest> _pullRequests;
private readonly Channel<PullRequest> _pullRequests;
private readonly NatsJSContext _context;
private readonly string _stream;
private readonly string _consumer;
@@ -29,6 +36,7 @@ internal class NatsJSConsume<TMsg> : NatsSubBase, INatsJSConsume<TMsg>
private readonly long _maxBytes;
private readonly long _thresholdBytes;

private readonly object _pendingGate = new();
private long _pendingMsgs;
private long _pendingBytes;

@@ -83,13 +91,13 @@ public NatsJSConsume(
static state =>
{
var self = (NatsJSConsume<TMsg>)state!;
self.Pull(self._maxMsgs, self._maxBytes);
self.Pull("heartbeat-timeout", self._maxMsgs, self._maxBytes);
self.ResetPending();
if (self._debug)
{
self._logger.LogDebug(
NatsJSLogEvents.IdleTimeout,
"Idle heartbeat timed-out after {Timeout}ns",
"Idle heartbeat timeout after {Timeout}ns",
self._idle);
}
},
@@ -100,27 +108,30 @@ public NatsJSConsume(
_userMsgs = Channel.CreateBounded<NatsJSMsg<TMsg?>>(NatsSub.GetChannelOpts(opts?.ChannelOpts));
Msgs = _userMsgs.Reader;

_pullRequests = Channel.CreateBounded<ConsumerGetnextRequest>(NatsSub.GetChannelOpts(opts?.ChannelOpts));
_pullRequests = Channel.CreateBounded<PullRequest>(NatsSub.GetChannelOpts(opts?.ChannelOpts));
_pullTask = Task.Run(PullLoop);

ResetPending();
}

public ChannelReader<NatsJSMsg<TMsg?>> Msgs { get; }

public void Stop() => EndSubscription(NatsSubEndReason.None);

public ValueTask CallMsgNextAsync(ConsumerGetnextRequest request, CancellationToken cancellationToken = default) =>
Connection.PubModelAsync(
public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, CancellationToken cancellationToken = default)
{
if (_debug)
{
_logger.LogDebug("Sending pull request for {Origin} {Msgs}, {Bytes}", origin, request.Batch, request.MaxBytes);
}

return Connection.PubModelAsync(
subject: $"{_context.Opts.Prefix}.CONSUMER.MSG.NEXT.{_stream}.{_consumer}",
data: request,
serializer: NatsJsonSerializer.Default,
replyTo: Subject,
headers: default,
cancellationToken);

public void ResetPending()
{
_pendingMsgs = _maxMsgs;
_pendingBytes = _maxBytes;
}

public void ResetHeartbeatTimer() => _timer.Change(_hbTimeout, Timeout.Infinite);
@@ -176,25 +187,28 @@ protected override async ValueTask ReceiveInternalAsync(
{
if (long.TryParse(natsPendingMsgs, out var pendingMsgs))
{
if (_debug)
{
_logger.LogDebug(
NatsJSLogEvents.PendingCount,
"Header pending messages current {Pending}",
_pendingMsgs);
}

_pendingMsgs -= pendingMsgs;
if (_pendingMsgs < 0)
_pendingMsgs = 0;

if (_debug)
lock (_pendingGate)
{
_logger.LogDebug(
NatsJSLogEvents.PendingCount,
"Header pending messages {Header} {Pending}",
natsPendingMsgs,
_pendingMsgs);
if (_debug)
{
_logger.LogDebug(
NatsJSLogEvents.PendingCount,
"Header pending messages current {Pending}",
_pendingMsgs);
}

_pendingMsgs -= pendingMsgs;
if (_pendingMsgs < 0)
_pendingMsgs = 0;

if (_debug)
{
_logger.LogDebug(
NatsJSLogEvents.PendingCount,
"Header pending messages {Header} {Pending}",
natsPendingMsgs,
_pendingMsgs);
}
}
}
else
@@ -207,18 +221,21 @@
{
if (long.TryParse(natsPendingBytes, out var pendingBytes))
{
if (_debug)
lock (_pendingGate)
{
_logger.LogDebug(NatsJSLogEvents.PendingCount, "Header pending bytes current {Pending}", _pendingBytes);
}

_pendingBytes -= pendingBytes;
if (_pendingBytes < 0)
_pendingBytes = 0;

if (_debug)
{
_logger.LogDebug(NatsJSLogEvents.PendingCount, "Header pending bytes {Header} {Pending}", natsPendingBytes, _pendingBytes);
if (_debug)
{
_logger.LogDebug(NatsJSLogEvents.PendingCount, "Header pending bytes current {Pending}", _pendingBytes);
}

_pendingBytes -= pendingBytes;
if (_pendingBytes < 0)
_pendingBytes = 0;

if (_debug)
{
_logger.LogDebug(NatsJSLogEvents.PendingCount, "Header pending bytes {Header} {Pending}", natsPendingBytes, _pendingBytes);
}
}
}
else
@@ -276,14 +293,21 @@ protected override async ValueTask ReceiveInternalAsync(
_serializer),
_context);

_pendingMsgs--;
lock (_pendingGate)
{
if (_pendingMsgs > 0)
_pendingMsgs--;
}

if (_maxBytes > 0)
{
if (_debug)
_logger.LogDebug(NatsJSLogEvents.MessageProperty, "Message size {Size}", msg.Size);

_pendingBytes -= msg.Size;
lock (_pendingGate)
{
_pendingBytes -= msg.Size;
}
}

await _userMsgs.Writer.WriteAsync(msg).ConfigureAwait(false);
@@ -298,42 +322,59 @@ protected override void TryComplete()
_userMsgs.Writer.TryComplete();
}

private void CheckPending()
private void ResetPending()
{
if (_maxBytes > 0 && _pendingBytes <= _thresholdBytes)
lock (_pendingGate)
{
if (_debug)
_logger.LogDebug(NatsJSLogEvents.PendingCount, "Check pending bytes {Pending}", _pendingBytes);

Pull(_maxMsgs, _maxBytes - _pendingBytes);
ResetPending();
_pendingMsgs = _maxMsgs;
_pendingBytes = _maxBytes;
}
else if (_maxBytes == 0 && _pendingMsgs <= _thresholdMsgs)
}

private void CheckPending()
{
lock (_pendingGate)
{
if (_debug)
_logger.LogDebug(NatsJSLogEvents.PendingCount, "Check pending messages {Pending}", _pendingMsgs);
if (_maxBytes > 0 && _pendingBytes <= _thresholdBytes)
{
if (_debug)
_logger.LogDebug(NatsJSLogEvents.PendingCount, "Check pending bytes {Pending}, {MaxBytes}", _pendingBytes, _maxBytes);

Pull("chk-bytes", _maxMsgs, _maxBytes - _pendingBytes);
ResetPending();
}
else if (_maxBytes == 0 && _pendingMsgs <= _thresholdMsgs && _pendingMsgs < _maxMsgs)
{
if (_debug)
_logger.LogDebug(NatsJSLogEvents.PendingCount, "Check pending messages {Pending}, {MaxMsgs}", _pendingMsgs, _maxMsgs);

Pull(_maxMsgs - _pendingMsgs, 0);
ResetPending();
Pull("chk-msgs", _maxMsgs - _pendingMsgs, 0);
ResetPending();
}
}
}

private void Pull(long batch, long maxBytes) => _pullRequests.Writer.TryWrite(new ConsumerGetnextRequest
private void Pull(string origin, long batch, long maxBytes) => _pullRequests.Writer.TryWrite(new PullRequest
{
Batch = batch,
MaxBytes = maxBytes,
IdleHeartbeat = _idle,
Expires = _expires,
Request = new ConsumerGetnextRequest
{
Batch = batch,
MaxBytes = maxBytes,
IdleHeartbeat = _idle,
Expires = _expires,
},
Origin = origin,
});

private async Task PullLoop()
{
await foreach (var pr in _pullRequests.Reader.ReadAllAsync())
{
await CallMsgNextAsync(pr).ConfigureAwait(false);
var origin = $"pull-loop({pr.Origin})";
await CallMsgNextAsync(origin, pr.Request).ConfigureAwait(false);
if (_debug)
{
_logger.LogDebug(NatsJSLogEvents.PullRequest, "Pull request issued for {Batch}, {MaxBytes}", pr.Batch, pr.MaxBytes);
_logger.LogDebug(NatsJSLogEvents.PullRequest, "Pull request issued for {Origin} {Batch}, {MaxBytes}", origin, pr.Request.Batch, pr.Request.MaxBytes);
}
}
}
5 changes: 4 additions & 1 deletion src/NATS.Client.JetStream/NatsJSConsumer.cs
@@ -78,6 +78,8 @@ public async ValueTask<INatsJSConsume<T>> ConsumeAsync<T>(NatsJSConsumeOpts? opt
{
ThrowIfDeleted();

opts ??= new NatsJSConsumeOpts();

var inbox = _context.NewInbox();

var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes);
@@ -117,7 +119,9 @@ await _context.Connection.SubAsync(
sub: sub,
cancellationToken);

// Start consuming with the first Pull Request
await sub.CallMsgNextAsync(
"init",
new ConsumerGetnextRequest
{
Batch = max.MaxMsgs,
Expand All @@ -127,7 +131,6 @@ await sub.CallMsgNextAsync(
},
cancellationToken);

sub.ResetPending();
sub.ResetHeartbeatTimer();

return sub;
2 changes: 1 addition & 1 deletion src/NATS.Client.JetStream/NatsJSMsg.cs
@@ -109,7 +109,7 @@ private ValueTask SendAckAsync(ReadOnlySequence<byte> payload, AckOpts opts = de

return _msg.ReplyAsync(
payload: payload,
opts: new NatsPubOpts {WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,},
opts: new NatsPubOpts { WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent },
cancellationToken: cancellationToken);
}
}
19 changes: 12 additions & 7 deletions tests/NATS.Client.Core.Tests/RequestReplyTest.cs
@@ -12,21 +12,26 @@ public class RequestReplyTest
[Fact]
public async Task Simple_request_reply_test()
{
// Trace to hunt flapper!
await using var server = NatsServer.StartWithTrace(_output);

await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

var sub = await nats.SubscribeAsync<int>("foo");
const string subject = "foo";
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var cancellationToken = cts.Token;

var sub = await nats.SubscribeAsync<int>(subject, cancellationToken: cancellationToken);
var reg = sub.Register(async msg =>
{
await msg.ReplyAsync(msg.Data * 2);
await msg.ReplyAsync(msg.Data * 2, cancellationToken: cancellationToken);
});

var natsSubOpts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(10) };

for (var i = 0; i < 10; i++)
{
var rep = await nats.RequestAsync<int, int>("foo", i);
Assert.Equal(i * 2, rep?.Data);
var rep = await nats.RequestAsync<int, int>(subject, i, replyOpts: natsSubOpts, cancellationToken: cancellationToken) ?? throw new TimeoutException("Request timeout");

Assert.Equal(i * 2, rep.Data);
}

await sub.DisposeAsync();