diff --git a/docs/documentation/jetstream/consume.md b/docs/documentation/jetstream/consume.md index 6124cc964..0f2970c8b 100644 --- a/docs/documentation/jetstream/consume.md +++ b/docs/documentation/jetstream/consume.md @@ -42,7 +42,7 @@ fast it can process messages without overwhelming the application process. while (!cancellationToken.IsCancellationRequested) { // Consume a batch of messages (1000 by default) - await foreach (var msg in consumer.FetchAllAsync()) + await foreach (var msg in consumer.FetchAsync()) { // Process message await msg.AckAsync(); @@ -59,7 +59,7 @@ overlapped so that there is a constant flow of messages from the JetStream serve or `MaxBytes` and respective thresholds to not overwhelm the application and to not waste server resources. ```csharp -await foreach (var msg in consumer.ConsumeAllAsync()) +await foreach (var msg in consumer.ConsumeAsync()) { // Process message await msg.AckAsync(); @@ -88,7 +88,7 @@ while (!cancellationToken.IsCancellationRequested) try { await consumer.RefreshAsync(); // or try to recreate consumer - await foreach (var msg in consumer.ConsumeAllAsync()) + await foreach (var msg in consumer.ConsumeAsync()) { // Process message await msg.AckAsync(); diff --git a/docs/documentation/jetstream/intro.md b/docs/documentation/jetstream/intro.md index 59f1cd198..0f2a5e505 100644 --- a/docs/documentation/jetstream/intro.md +++ b/docs/documentation/jetstream/intro.md @@ -110,7 +110,7 @@ Check out [JetStream documentation](https://docs.nats.io/nats-concepts/jetstream Finally, we're ready to consume the messages we persisted in `shop_orders` stream: ```csharp -await foreach (var msg in consumer.ConsumeAllAsync()) +await foreach (var msg in consumer.ConsumeAsync()) { var order = msg.Data; Console.WriteLine($"Processing {msg.Subject} {order}..."); diff --git a/sandbox/Example.JetStream.PullConsumer/Program.cs b/sandbox/Example.JetStream.PullConsumer/Program.cs index 27a398806..04707111b 100644 --- a/sandbox/Example.JetStream.PullConsumer/Program.cs +++ b/sandbox/Example.JetStream.PullConsumer/Program.cs @@ -65,39 +65,7 @@ void Report(int i, Stopwatch sw, string data) try { - if (cmd == "fetch") - { - while (!cts.Token.IsCancellationRequested) - { - try - { - Console.WriteLine($"___\nFETCH {maxMsgs}"); - await consumer.RefreshAsync(cts.Token); - await using var sub = await consumer.FetchAsync>(fetchOpts, cts.Token); - await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) - { - using (msg.Data) - { - var message = Encoding.ASCII.GetString(msg.Data.Span); - Console.WriteLine($"Received: {message}"); - } - - await msg.AckAsync(cancellationToken: cts.Token); - Report(++count, stopwatch, $"data: {msg.Data}"); - } - } - catch (NatsJSProtocolException e) - { - Console.WriteLine(e.Message); - } - catch (NatsJSException e) - { - Console.WriteLine(e.Message); - await Task.Delay(1000); - } - } - } - else if (cmd == "fetch-all-no-wait") + if (cmd == "fetch-no-wait") { while (!cts.Token.IsCancellationRequested) { @@ -110,7 +78,7 @@ void Report(int i, Stopwatch sw, string data) var fetchNoWaitOpts = new NatsJSFetchOpts { MaxMsgs = max }; var fetchMsgCount = 0; - await foreach (var msg in consumer.FetchAllNoWaitAsync>(fetchNoWaitOpts, cts.Token)) + await foreach (var msg in consumer.FetchNoWaitAsync>(fetchNoWaitOpts, cts.Token)) { fetchMsgCount++; using (msg.Data) @@ -140,7 +108,7 @@ void Report(int i, Stopwatch sw, string data) } } } - else if (cmd == "fetch-all") + else if (cmd == "fetch") { while (!cts.Token.IsCancellationRequested) { @@ -148,7 +116,7 @@ void Report(int i, Stopwatch sw, string data) { Console.WriteLine($"___\nFETCH {maxMsgs}"); await consumer.RefreshAsync(cts.Token); - await foreach (var msg in consumer.FetchAllAsync>(fetchOpts, cts.Token)) + await foreach (var msg in consumer.FetchAsync>(fetchOpts, cts.Token)) { using (msg.Data) { @@ -209,15 +177,9 @@ void Report(int i, Stopwatch sw, string data) try { Console.WriteLine("___\nCONSUME"); - await using var sub = await consumer.ConsumeAsync>(consumeOpts); - - cts.Token.Register(() => - { - sub.DisposeAsync().GetAwaiter().GetResult(); - }); - var stopped = false; - await foreach (var msg in sub.Msgs.ReadAllAsync()) + var consumeStop = CancellationTokenSource.CreateLinkedTokenSource(cts.Token); + await foreach (var msg in consumer.ConsumeAsync>(consumeOpts, consumeStop.Token)) { using (msg.Data) { @@ -226,7 +188,7 @@ void Report(int i, Stopwatch sw, string data) if (message == "stop") { Console.WriteLine("Stopping consumer..."); - sub.Stop(); + consumeStop.Cancel(); stopped = true; } } @@ -257,36 +219,6 @@ void Report(int i, Stopwatch sw, string data) } } } - else if (cmd == "consume-all") - { - while (!cts.Token.IsCancellationRequested) - { - try - { - Console.WriteLine("___\nCONSUME-ALL"); - await foreach (var msg in consumer.ConsumeAllAsync>(consumeOpts, cts.Token)) - { - using (msg.Data) - { - var message = Encoding.ASCII.GetString(msg.Data.Span); - Console.WriteLine($"Received: {message}"); - } - - await msg.AckAsync(cancellationToken: cts.Token); - Report(++count, stopwatch, $"data: {msg.Data}"); - } - } - catch (NatsJSProtocolException e) - { - Console.WriteLine(e.Message); - } - catch (NatsJSException e) - { - Console.WriteLine(e.Message); - await Task.Delay(1000); - } - } - } else { Console.WriteLine("Usage: dotnet run -- "); diff --git a/src/NATS.Client.JetStream/INatsJSConsume.cs b/src/NATS.Client.JetStream/INatsJSConsume.cs deleted file mode 100644 index 92302c7ef..000000000 --- a/src/NATS.Client.JetStream/INatsJSConsume.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System.Threading.Channels; - -namespace NATS.Client.JetStream; - -/// -/// Interface to extract messages from a consume() operation on a consumer. -/// -public interface INatsJSConsume : IAsyncDisposable -{ - /// - /// Messages received from the consumer. - /// - ChannelReader> Msgs { get; } - - /// - /// Stop the consumer gracefully. - /// - /// - /// - /// This will wait for any inflight messages to be processed before stopping. - /// - /// - /// Disposing would stop consuming immediately. This might leave messages behind - /// without acknowledgement. Which is fine, messages will be scheduled for redelivery, - /// however, it might not be the desired behavior. - /// - /// - void Stop(); -} diff --git a/src/NATS.Client.JetStream/INatsJSFetch.cs b/src/NATS.Client.JetStream/INatsJSFetch.cs deleted file mode 100644 index 8a551d541..000000000 --- a/src/NATS.Client.JetStream/INatsJSFetch.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System.Threading.Channels; - -namespace NATS.Client.JetStream; - -/// -/// Interface to extract messages from a fetch() operation on a consumer. -/// -public interface INatsJSFetch : IAsyncDisposable -{ - /// - /// User messages received from the consumer. - /// - ChannelReader> Msgs { get; } -} diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index d9d929f2f..005e3d8fc 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -16,11 +16,10 @@ internal struct PullRequest public string Origin { get; init; } } -internal class NatsJSConsume : NatsSubBase, INatsJSConsume +internal class NatsJSConsume : NatsSubBase { private readonly ILogger _logger; private readonly bool _debug; - private readonly CancellationTokenSource _cts; private readonly Channel> _userMsgs; private readonly Channel _pullRequests; private readonly NatsJSContext _context; @@ -60,8 +59,7 @@ public NatsJSConsume( CancellationToken cancellationToken) : base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts) { - _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - _cancellationToken = _cts.Token; + _cancellationToken = cancellationToken; _logger = Connection.Opts.LoggerFactory.CreateLogger>(); _debug = _logger.IsEnabled(LogLevel.Debug); _context = context; @@ -139,8 +137,6 @@ public NatsJSConsume( public ChannelReader> Msgs { get; } - public void Stop() => _cts.Cancel(); - public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, CancellationToken cancellationToken = default) { if (_cancellationToken.IsCancellationRequested) @@ -295,7 +291,7 @@ protected override async ValueTask ReceiveInternalAsync( } else if (headers.HasTerminalJSError()) { - _userMsgs.Writer.TryComplete(new NatsJSProtocolException($"JetStream server error: {headers.Code} {headers.MessageText}")); + _userMsgs.Writer.TryComplete(new NatsJSProtocolException(headers.Code, headers.Message, headers.MessageText)); EndSubscription(NatsSubEndReason.JetStreamError); } else @@ -434,7 +430,7 @@ private void Pull(string origin, long batch, long maxBytes) => _pullRequests.Wri private async Task PullLoop() { - await foreach (var pr in _pullRequests.Reader.ReadAllAsync()) + await foreach (var pr in _pullRequests.Reader.ReadAllAsync().ConfigureAwait(false)) { var origin = $"pull-loop({pr.Origin})"; await CallMsgNextAsync(origin, pr.Request).ConfigureAwait(false); diff --git a/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs b/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs index 1673d95ee..286021aed 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSExtensionsInternal.cs @@ -8,6 +8,7 @@ public static class NatsJSExtensionsInternal public static bool HasTerminalJSError(this NatsHeaders headers) => headers is { Code: 400 } + or { Code: 404 } or { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted } or { Code: 409, Message: NatsHeaders.Messages.ConsumerIsPushBased }; } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index bf30c89b4..524f296d0 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -8,7 +8,7 @@ namespace NATS.Client.JetStream.Internal; -internal class NatsJSFetch : NatsSubBase, INatsJSFetch +internal class NatsJSFetch : NatsSubBase { private readonly ILogger _logger; private readonly bool _debug; @@ -189,7 +189,7 @@ protected override async ValueTask ReceiveInternalAsync( } else if (headers.HasTerminalJSError()) { - _userMsgs.Writer.TryComplete(new NatsJSProtocolException($"JetStream server error: {headers.Code} {headers.MessageText}")); + _userMsgs.Writer.TryComplete(new NatsJSProtocolException(headers.Code, headers.Message, headers.MessageText)); EndSubscription(NatsSubEndReason.JetStreamError); } else diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index 35ebb36ca..241889bb0 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -62,8 +62,8 @@ internal class NatsJSOrderedPushConsumer private readonly Task _commandTask; private readonly long _ackWaitNanos; - private long _sequenceStream; - private long _sequenceConsumer; + private ulong _sequenceStream; + private ulong _sequenceConsumer; private string _consumer; private volatile NatsJSOrderedPushConsumerSub? _sub; private int _done; @@ -218,7 +218,7 @@ private async Task CommandLoop() var sequence = Interlocked.Increment(ref _sequenceConsumer); - if (sequence != (long)metadata.Sequence.Consumer) + if (sequence != metadata.Sequence.Consumer) { CreateSub("sequence-mismatch"); _logger.LogWarning("Missed messages, recreating consumer"); @@ -228,7 +228,7 @@ private async Task CommandLoop() // Increment the sequence before writing to the channel in case the channel is full // and the writer is waiting for the reader to read the message. This way the sequence // will be correctly incremented in case the timeout kicks in and recreated the consumer. - Interlocked.Exchange(ref _sequenceStream, (long)metadata.Sequence.Stream); + Interlocked.Exchange(ref _sequenceStream, metadata.Sequence.Stream); if (!IsDone) { diff --git a/src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs b/src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs index 609d71044..67d8f6cc2 100644 --- a/src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs +++ b/src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs @@ -11,7 +11,7 @@ public record ConsumerConfiguration [System.Text.Json.Serialization.JsonPropertyName("opt_start_seq")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] [System.ComponentModel.DataAnnotations.Range(0D, 18446744073709552000D)] - public long OptStartSeq { get; set; } = default!; + public ulong OptStartSeq { get; set; } = default!; [System.Text.Json.Serialization.JsonPropertyName("opt_start_time")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index b3031ea96..2bfbe4b0a 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -42,7 +42,7 @@ internal NatsJSConsumer(NatsJSContext context, ConsumerInfo info) public async ValueTask DeleteAsync(CancellationToken cancellationToken = default) { ThrowIfDeleted(); - return _deleted = await _context.DeleteConsumerAsync(_stream, _consumer, cancellationToken); + return _deleted = await _context.DeleteConsumerAsync(_stream, _consumer, cancellationToken).ConfigureAwait(false); } /// @@ -53,74 +53,18 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d /// Message type to deserialize. /// Async enumerable of messages which can be used in a await foreach loop. /// Consumer is deleted, it's push based or request sent to server is invalid. - public async IAsyncEnumerable> ConsumeAllAsync( + public async IAsyncEnumerable> ConsumeAsync( NatsJSConsumeOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { opts ??= _context.Opts.DefaultConsumeOpts; - await using var cc = await ConsumeAsync(opts, cancellationToken); - await foreach (var jsMsg in cc.Msgs.ReadAllAsync(cancellationToken)) + await using var cc = await ConsumeInternalAsync(opts, cancellationToken).ConfigureAwait(false); + await foreach (var jsMsg in cc.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { yield return jsMsg; } } - /// - /// Starts consuming messages from the stream using this consumer. - /// - /// Consume options. (default: MaxMsgs 1,000) - /// A used to cancel the call. - /// Message type to deserialize. - /// A consume object to manage the operation and retrieve messages. - /// Consumer is deleted, it's push based or request sent to server is invalid. - /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. - public async ValueTask> ConsumeAsync(NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default) - { - ThrowIfDeleted(); - - opts ??= new NatsJSConsumeOpts(); - - var inbox = _context.NewInbox(); - - var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); - var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); - - var requestOpts = BuildRequestOpts(opts.Serializer, opts.MaxMsgs); - - var sub = new NatsJSConsume( - stream: _stream, - consumer: _consumer, - context: _context, - subject: inbox, - queueGroup: default, - opts: requestOpts, - maxMsgs: max.MaxMsgs, - maxBytes: max.MaxBytes, - thresholdMsgs: max.ThresholdMsgs, - thresholdBytes: max.ThresholdBytes, - expires: timeouts.Expires, - idle: timeouts.IdleHeartbeat, - cancellationToken: cancellationToken); - - await _context.Connection.SubAsync(sub: sub, cancellationToken); - - // Start consuming with the first Pull Request - await sub.CallMsgNextAsync( - "init", - new ConsumerGetnextRequest - { - Batch = max.MaxMsgs, - MaxBytes = max.MaxBytes, - IdleHeartbeat = timeouts.IdleHeartbeat.ToNanos(), - Expires = timeouts.Expires.ToNanos(), - }, - cancellationToken); - - sub.ResetHeartbeatTimer(); - - return sub; - } - /// /// Consume a single message from the stream using this consumer. /// @@ -156,7 +100,7 @@ await sub.CallMsgNextAsync( ThrowIfDeleted(); opts ??= _context.Opts.DefaultNextOpts; - await using var f = await FetchAsync( + await using var f = await FetchInternalAsync( new NatsJSFetchOpts { MaxMsgs = 1, @@ -164,9 +108,9 @@ await sub.CallMsgNextAsync( Expires = opts.Expires, Serializer = opts.Serializer, }, - cancellationToken: cancellationToken); + cancellationToken: cancellationToken).ConfigureAwait(false); - await foreach (var natsJSMsg in f.Msgs.ReadAllAsync(cancellationToken)) + await foreach (var natsJSMsg in f.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { return natsJSMsg; } @@ -183,15 +127,15 @@ await sub.CallMsgNextAsync( /// Async enumerable of messages which can be used in a await foreach loop. /// Consumer is deleted, it's push based or request sent to server is invalid. /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. - public async IAsyncEnumerable> FetchAllAsync( + public async IAsyncEnumerable> FetchAsync( NatsJSFetchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { ThrowIfDeleted(); opts ??= _context.Opts.DefaultFetchOpts; - await using var fc = await FetchAsync(opts, cancellationToken); - await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken)) + await using var fc = await FetchInternalAsync(opts, cancellationToken).ConfigureAwait(false); + await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { yield return jsMsg; } @@ -241,30 +185,80 @@ await sub.CallMsgNextAsync( /// } /// /// - public async IAsyncEnumerable> FetchAllNoWaitAsync( + public async IAsyncEnumerable> FetchNoWaitAsync( NatsJSFetchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { ThrowIfDeleted(); opts ??= _context.Opts.DefaultFetchOpts; - await using var fc = await FetchAsync(opts with { NoWait = true }, cancellationToken); - await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken)) + await using var fc = await FetchInternalAsync(opts with { NoWait = true }, cancellationToken).ConfigureAwait(false); + await foreach (var jsMsg in fc.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { yield return jsMsg; } } /// - /// Consume a set number of messages from the stream using this consumer. + /// Retrieve the consumer info from the server and update this consumer. /// - /// Fetch options. (default: MaxMsgs 1,000 and timeout in 30 seconds) - /// A used to cancel the call. - /// Message type to deserialize. - /// A fetch object to manage the operation and retrieve messages. - /// Consumer is deleted, it's push based or request sent to server is invalid. - /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. - public async ValueTask> FetchAsync( + /// A used to cancel the API call. + /// There was an issue retrieving the response. + /// Server responded with an error. + public async ValueTask RefreshAsync(CancellationToken cancellationToken = default) => + Info = await _context.JSRequestResponseAsync( + subject: $"{_context.Opts.Prefix}.CONSUMER.INFO.{_stream}.{_consumer}", + request: null, + cancellationToken).ConfigureAwait(false); + + internal async ValueTask> ConsumeInternalAsync(NatsJSConsumeOpts? opts = default, CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + + opts ??= new NatsJSConsumeOpts(); + + var inbox = _context.NewInbox(); + + var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); + var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); + + var requestOpts = BuildRequestOpts(opts.Serializer, opts.MaxMsgs); + + var sub = new NatsJSConsume( + stream: _stream, + consumer: _consumer, + context: _context, + subject: inbox, + queueGroup: default, + opts: requestOpts, + maxMsgs: max.MaxMsgs, + maxBytes: max.MaxBytes, + thresholdMsgs: max.ThresholdMsgs, + thresholdBytes: max.ThresholdBytes, + expires: timeouts.Expires, + idle: timeouts.IdleHeartbeat, + cancellationToken: cancellationToken); + + await _context.Connection.SubAsync(sub: sub, cancellationToken).ConfigureAwait(false); + + // Start consuming with the first Pull Request + await sub.CallMsgNextAsync( + "init", + new ConsumerGetnextRequest + { + Batch = max.MaxMsgs, + MaxBytes = max.MaxBytes, + IdleHeartbeat = timeouts.IdleHeartbeat.ToNanos(), + Expires = timeouts.Expires.ToNanos(), + }, + cancellationToken).ConfigureAwait(false); + + sub.ResetHeartbeatTimer(); + + return sub; + } + + internal async ValueTask> FetchInternalAsync( NatsJSFetchOpts? opts = default, CancellationToken cancellationToken = default) { @@ -290,7 +284,7 @@ public async ValueTask> FetchAsync( expires: timeouts.Expires, idle: timeouts.IdleHeartbeat); - await _context.Connection.SubAsync(sub: sub, cancellationToken); + await _context.Connection.SubAsync(sub: sub, cancellationToken).ConfigureAwait(false); await sub.CallMsgNextAsync( opts.NoWait @@ -307,25 +301,13 @@ await sub.CallMsgNextAsync( Expires = timeouts.Expires.ToNanos(), NoWait = opts.NoWait, }, - cancellationToken); + cancellationToken).ConfigureAwait(false); sub.ResetHeartbeatTimer(); return sub; } - /// - /// Retrieve the consumer info from the server and update this consumer. - /// - /// A used to cancel the API call. - /// There was an issue retrieving the response. - /// Server responded with an error. - public async ValueTask RefreshAsync(CancellationToken cancellationToken = default) => - Info = await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.CONSUMER.INFO.{_stream}.{_consumer}", - request: null, - cancellationToken).ConfigureAwait(false); - private static NatsSubOpts BuildRequestOpts(INatsSerializer? serializer, int? maxMsgs) => new() { diff --git a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs index 04fc937dc..3fef3bb2f 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs @@ -1,10 +1,28 @@ using System.Runtime.CompilerServices; +using NATS.Client.Core.Internal; +using NATS.Client.JetStream.Internal; using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; public partial class NatsJSContext { + /// + /// Creates new ordered consumer. + /// + /// Stream name to create the consumer under. + /// Ordered consumer options. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving ordered data from the stream. + public ValueTask CreateOrderedConsumerAsync( + string stream, + NatsJSOrderedConsumerOpts? opts = default, + CancellationToken cancellationToken = default) + { + opts ??= NatsJSOrderedConsumerOpts.Default; + return new ValueTask(new NatsJSOrderedConsumer(stream, this, opts, cancellationToken)); + } + /// /// Creates new consumer if it doesn't exists or returns an existing one with the same name. /// @@ -90,13 +108,13 @@ public async ValueTask GetConsumerAsync(string stream, string co /// /// Stream name the consumers belong to. /// A used to cancel the API call. - /// Async enumerable of consumer objects. Can be used in a await foreach loop. + /// Async enumerable of consumer info objects. Can be used in a await foreach loop. /// There was an issue retrieving the response. /// Server responded with an error. /// /// Note that paging isn't implemented. You might receive only a partial list of consumers if there are a lot of them. /// - public async IAsyncEnumerable ListConsumersAsync( + public async IAsyncEnumerable ListConsumersAsync( string stream, [EnumeratorCancellation] CancellationToken cancellationToken = default) { @@ -105,7 +123,7 @@ public async IAsyncEnumerable ListConsumersAsync( new ConsumerListRequest { Offset = 0 }, cancellationToken); foreach (var consumer in response.Consumers) - yield return new NatsJSConsumer(this, consumer); + yield return consumer; } /// @@ -125,4 +143,52 @@ public async ValueTask DeleteConsumerAsync(string stream, string consumer, cancellationToken); return response.Success; } + + internal ValueTask CreateOrderedConsumerInternalAsync( + string stream, + NatsJSOrderedConsumerOpts opts, + CancellationToken cancellationToken) + { + var request = new ConsumerCreateRequest + { + StreamName = stream, + Config = new ConsumerConfiguration + { + DeliverPolicy = opts.DeliverPolicy, + AckPolicy = ConsumerConfigurationAckPolicy.none, + ReplayPolicy = opts.ReplayPolicy, + InactiveThreshold = opts.InactiveThreshold.ToNanos(), + NumReplicas = 1, + MemStorage = true, + }, + }; + + if (opts.OptStartSeq > 0) + { + request.Config.OptStartSeq = opts.OptStartSeq; + } + + if (opts.OptStartTime != default) + { + request.Config.OptStartTime = opts.OptStartTime; + } + + if (opts.HeadersOnly) + { + request.Config.HeadersOnly = true; + } + + if (opts.FilterSubjects.Length > 0) + { + request.Config.FilterSubjects = opts.FilterSubjects; + } + + var name = NuidWriter.NewNuid(); + var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}.{name}"; + + return JSRequestResponseAsync( + subject: subject, + request, + cancellationToken); + } } diff --git a/src/NATS.Client.JetStream/NatsJSException.cs b/src/NATS.Client.JetStream/NatsJSException.cs index 4c714a982..82da40f57 100644 --- a/src/NATS.Client.JetStream/NatsJSException.cs +++ b/src/NATS.Client.JetStream/NatsJSException.cs @@ -36,21 +36,22 @@ public class NatsJSProtocolException : NatsJSException /// /// Create JetStream protocol exception. /// - /// Error message. - public NatsJSProtocolException(string message) - : base(message) + /// Server error code + /// Server error message enum (if defined) + /// Server error message string + public NatsJSProtocolException(int headerCode, NatsHeaders.Messages headerMessage, string headerMessageText) + : base($"JetStream server error: {headerCode} {headerMessageText}") { + HeaderCode = headerCode; + HeaderMessage = headerMessage; + HeaderMessageText = headerMessageText; } - /// - /// Create JetStream protocol exception. - /// - /// Error message. - /// Inner exception. - public NatsJSProtocolException(string message, Exception exception) - : base(message, exception) - { - } + public int HeaderCode { get; } + + public NatsHeaders.Messages HeaderMessage { get; } + + public string HeaderMessageText { get; } } /// diff --git a/src/NATS.Client.JetStream/NatsJSOpts.cs b/src/NATS.Client.JetStream/NatsJSOpts.cs index 0839c2e05..2584fcaa9 100644 --- a/src/NATS.Client.JetStream/NatsJSOpts.cs +++ b/src/NATS.Client.JetStream/NatsJSOpts.cs @@ -1,4 +1,5 @@ using NATS.Client.Core; +using NATS.Client.JetStream.Models; namespace NATS.Client.JetStream; @@ -59,6 +60,30 @@ public NatsJSOpts(NatsOpts opts, string? apiPrefix = default, string? domain = d public NatsJSNextOpts DefaultNextOpts { get; init; } = new(); } +public record NatsJSOrderedConsumerOpts +{ + public static readonly NatsJSOrderedConsumerOpts Default = new(); + + public string[] FilterSubjects { get; init; } = Array.Empty(); + + public ConsumerConfigurationDeliverPolicy DeliverPolicy { get; init; } = ConsumerConfigurationDeliverPolicy.all; + + public ulong OptStartSeq { get; init; } = 0; + + public DateTimeOffset OptStartTime { get; init; } = default; + + public ConsumerConfigurationReplayPolicy ReplayPolicy { get; init; } = ConsumerConfigurationReplayPolicy.instant; + + public TimeSpan InactiveThreshold { get; init; } = TimeSpan.FromMinutes(5); + + public bool HeadersOnly { get; init; } = false; + + /// + /// Maximum number of attempts for the consumer to be recreated (Defaults to unlimited). + /// + public int MaxResetAttempts { get; init; } = -1; +} + /// /// Consumer consume method options. /// diff --git a/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs b/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs new file mode 100644 index 000000000..42e0cc073 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs @@ -0,0 +1,237 @@ +using System.Runtime.CompilerServices; +using NATS.Client.Core; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +/// +/// NATS JetStream ordered consumer. +/// +public class NatsJSOrderedConsumer +{ + private readonly string _stream; + private readonly NatsJSContext _context; + private readonly NatsJSOrderedConsumerOpts _opts; + private readonly CancellationToken _cancellationToken; + private ulong _fetchSeq; + private string _fetchConsumerName = string.Empty; + + /// + /// Creates a new NATS JetStream ordered consumer. + /// + /// Name of the stream. + /// NATS JetStream context. + /// Consumer options. + /// A used to cancel consume and fetch operations. + public NatsJSOrderedConsumer(string stream, NatsJSContext context, NatsJSOrderedConsumerOpts opts, CancellationToken cancellationToken) + { + _stream = stream; + _context = context; + _opts = opts; + _cancellationToken = cancellationToken; + } + + /// + /// Consume messages from the stream in order. + /// + /// Consume options. + /// A used to cancel consume operation. + /// Serialized message data type. + /// Asynchronous enumeration which can be used in a await foreach loop. + /// There was a JetStream server error. + public async IAsyncEnumerable> ConsumeAsync( + NatsJSConsumeOpts? opts = default, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken, cancellationToken).Token; + + var consumerName = string.Empty; + ulong seq = 0; + while (!cancellationToken.IsCancellationRequested) + { + var consumer = await RecreateConsumer(consumerName, seq, cancellationToken); + consumerName = consumer.Info.Name; + + await using var cc = await consumer.ConsumeInternalAsync(opts, cancellationToken); + + NatsJSProtocolException? protocolException = default; + while (true) + { + // We have to check every call to WaitToReadAsync and TryRead for + // protocol exceptions individually because we can't yield return + // within try-catch. + try + { + var read = await cc.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false); + if (!read) + break; + } + catch (NatsJSProtocolException pe) + { + protocolException = pe; + goto CONSUME_LOOP; + } + + while (true) + { + NatsJSMsg msg; + + try + { + var canRead = cc.Msgs.TryRead(out msg); + if (!canRead) + break; + } + catch (NatsJSProtocolException pe) + { + protocolException = pe; + goto CONSUME_LOOP; + } + + if (msg.Metadata is not { } metadata) + continue; + + seq = metadata.Sequence.Stream; + + yield return msg; + } + } + + CONSUME_LOOP: + if (protocolException != null) + { + if (protocolException + is { HeaderCode: 409, HeaderMessage: NatsHeaders.Messages.ConsumerDeleted } + or { HeaderCode: 404 }) + { + // Ignore missing consumer errors and let the + // consumer be recreated above. + } + else + { + await TryDeleteConsumer(consumerName, cancellationToken); + throw protocolException; + } + } + + if (await TryDeleteConsumer(consumerName, cancellationToken)) + { + consumerName = string.Empty; + } + } + } + + /// + /// Fetch messages from the stream in order. + /// + /// Fetch options. + /// A used to cancel fetch operation. + /// Serialized message data type. + /// Asynchronous enumeration which can be used in a await foreach loop. + public async IAsyncEnumerable> FetchAsync( + NatsJSFetchOpts? opts = default, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken, cancellationToken).Token; + + var consumer = await RecreateConsumer(_fetchConsumerName, _fetchSeq, cancellationToken); + _fetchConsumerName = consumer.Info.Name; + + await foreach (var msg in consumer.FetchAsync(opts, cancellationToken)) + { + if (msg.Metadata is not { } metadata) + continue; + + _fetchSeq = metadata.Sequence.Stream; + yield return msg; + } + + var deleted = await TryDeleteConsumer(_fetchConsumerName, cancellationToken); + + if (deleted) + _fetchConsumerName = string.Empty; + } + + /// + /// Get the next message from the stream in order. + /// + /// Next options. + /// A used to cancel the underlying fetch operation. + /// Serialized message data type. + /// The next NATS JetStream message in order. + public async ValueTask?> NextAsync(NatsJSNextOpts? opts = default, CancellationToken cancellationToken = default) + { + opts ??= _context.Opts.DefaultNextOpts; + + var fetchOpts = new NatsJSFetchOpts + { + MaxMsgs = 1, + IdleHeartbeat = opts.IdleHeartbeat, + Expires = opts.Expires, + Serializer = opts.Serializer, + }; + + await foreach (var msg in FetchAsync(fetchOpts, cancellationToken)) + { + return msg; + } + + return default; + } + + private async Task RecreateConsumer(string consumer, ulong seq, CancellationToken cancellationToken) + { + var consumerOpts = _opts; + + if (seq > 0) + { + consumerOpts = _opts with + { + OptStartSeq = _fetchSeq + 1, + DeliverPolicy = ConsumerConfigurationDeliverPolicy.by_start_sequence, + }; + + if (consumer != string.Empty) + { + for (var i = 1; ; i++) + { + cancellationToken.ThrowIfCancellationRequested(); + + try + { + await _context.DeleteConsumerAsync(_stream, consumer, cancellationToken); + break; + } + catch (NatsJSApiException apiException) + { + if (apiException.Error.Code == 404) + { + break; + } + } + + if (i == _opts.MaxResetAttempts) + { + throw new NatsJSException("Maximum number of delete attempts reached."); + } + } + } + } + + var info = await _context.CreateOrderedConsumerInternalAsync(_stream, consumerOpts, cancellationToken); + + return new NatsJSConsumer(_context, info); + } + + private async ValueTask TryDeleteConsumer(string consumerName, CancellationToken cancellationToken) + { + try + { + return await _context.DeleteConsumerAsync(_stream, consumerName, cancellationToken); + } + catch + { + return false; + } + } +} diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 3dce83e40..362300177 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -99,6 +99,12 @@ public ValueTask CreateConsumerAsync(string consumer, ConsumerCo return _context.CreateConsumerAsync(_name, consumer, ackPolicy, cancellationToken); } + public ValueTask CreateOrderedConsumerAsync(NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + return _context.CreateOrderedConsumerAsync(_name, opts, cancellationToken); + } + /// /// Creates new consumer for this stream if it doesn't exists or returns an existing one with the same name. /// @@ -137,7 +143,7 @@ public ValueTask GetConsumerAsync(string consumer, CancellationT /// /// Note that paging isn't implemented. You might receive only a partial list of consumers if there are a lot of them. /// - public IAsyncEnumerable ListConsumersAsync(CancellationToken cancellationToken = default) + public IAsyncEnumerable ListConsumersAsync(CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.ListConsumersAsync(_name, cancellationToken); diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs index 6f4ba61b7..96d326671 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs @@ -48,7 +48,7 @@ internal class NatsKVWatcher : IAsyncDisposable private readonly Task _commandTask; private readonly long _ackWaitNanos; - private long _sequenceStream; + private ulong _sequenceStream; private long _sequenceConsumer; private string _consumer; private volatile NatsKVWatchSub? _sub; @@ -243,7 +243,7 @@ private async Task CommandLoop() // Increment the sequence before writing to the channel in case the channel is full // and the writer is waiting for the reader to read the message. This way the sequence // will be correctly incremented in case the timeout kicks in and recreated the consumer. - Interlocked.Exchange(ref _sequenceStream, (long)metadata.Sequence.Stream); + Interlocked.Exchange(ref _sequenceStream, metadata.Sequence.Stream); await _entryChannel.Writer.WriteAsync(entry, _cancellationToken); } diff --git a/tests/NATS.Client.CheckNativeAot/Program.cs b/tests/NATS.Client.CheckNativeAot/Program.cs index ce0265a98..73dc12647 100644 --- a/tests/NATS.Client.CheckNativeAot/Program.cs +++ b/tests/NATS.Client.CheckNativeAot/Program.cs @@ -127,10 +127,9 @@ async Task JetStreamTests() // Consume var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var messages = new List>(); - var cc = await consumer.ConsumeAsync( - new NatsJSConsumeOpts { MaxMsgs = 100, Serializer = TestDataJsonSerializer.Default }, - cancellationToken: cts2.Token); - await foreach (var msg in cc.Msgs.ReadAllAsync(cts2.Token)) + await foreach (var msg in consumer.ConsumeAsync( + new NatsJSConsumeOpts { MaxMsgs = 100, Serializer = TestDataJsonSerializer.Default }, + cancellationToken: cts2.Token)) { messages.Add(msg); diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index d0d0a4edd..8e06cc000 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -37,7 +37,7 @@ public async Task Consume_msgs_test() var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10, Serializer = TestDataJsonSerializer.Default }; var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; - await using var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); + await using var cc = await consumer.ConsumeInternalAsync(consumerOpts, cancellationToken: cts.Token); await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) { await msg.AckAsync(new AckOpts(true), cts.Token); @@ -111,7 +111,7 @@ public async Task Consume_idle_heartbeat_test() }; var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; - var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); + var cc = await consumer.ConsumeInternalAsync(consumerOpts, cancellationToken: cts.Token); await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) { await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); @@ -178,7 +178,7 @@ public async Task Consume_reconnect_test() // Not interested in management messages sent upto this point await proxy.FlushFramesAsync(nats); - var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); + var cc = await consumer.ConsumeInternalAsync(consumerOpts, cancellationToken: cts.Token); var readerTask = Task.Run(async () => { @@ -255,7 +255,7 @@ public async Task Consume_dispose_test() ack.EnsureSuccess(); } - var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); + var cc = await consumer.ConsumeInternalAsync(consumerOpts, cancellationToken: cts.Token); var signal = new WaitSignal(); var reader = Task.Run(async () => @@ -278,7 +278,7 @@ public async Task Consume_dispose_test() var infos = new List(); await foreach (var natsJSConsumer in stream.ListConsumersAsync(cts.Token)) { - infos.Add(natsJSConsumer.Info); + infos.Add(natsJSConsumer); } Assert.Single(infos); @@ -312,7 +312,8 @@ public async Task Consume_stop_test() ack.EnsureSuccess(); } - var cc = await consumer.ConsumeAsync(consumerOpts, cancellationToken: cts.Token); + var consumeStop = CancellationTokenSource.CreateLinkedTokenSource(cts.Token); + var cc = await consumer.ConsumeInternalAsync(consumerOpts, cancellationToken: consumeStop.Token); var signal = new WaitSignal(); var reader = Task.Run(async () => @@ -325,14 +326,14 @@ public async Task Consume_stop_test() }); await signal; - cc.Stop(); + consumeStop.Cancel(); await reader; var infos = new List(); await foreach (var natsJSConsumer in stream.ListConsumersAsync(cts.Token)) { - infos.Add(natsJSConsumer.Info); + infos.Add(natsJSConsumer); } Assert.Single(infos); diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index eefe91dd2..9f2198aaa 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -28,7 +28,7 @@ public async Task Fetch_test() var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; await using var fc = - await consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 10, Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); + await consumer.FetchInternalAsync(new NatsJSFetchOpts { MaxMsgs = 10, Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token); await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token)) { await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); @@ -57,7 +57,7 @@ public async Task FetchNoWait_test() var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; - await foreach (var msg in consumer.FetchAllNoWaitAsync(new NatsJSFetchOpts { MaxMsgs = 10, Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token)) + await foreach (var msg in consumer.FetchNoWaitAsync(new NatsJSFetchOpts { MaxMsgs = 10, Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts.Token)) { await msg.AckAsync(new AckOpts(WaitUntilSent: true), cts.Token); Assert.Equal(count, msg.Data!.Test); @@ -93,7 +93,7 @@ public async Task Fetch_dispose_test() ack.EnsureSuccess(); } - var fc = await consumer.FetchAsync(fetchOpts, cancellationToken: cts.Token); + var fc = await consumer.FetchInternalAsync(fetchOpts, cancellationToken: cts.Token); var signal = new WaitSignal(); var reader = Task.Run(async () => @@ -116,7 +116,7 @@ public async Task Fetch_dispose_test() var infos = new List(); await foreach (var natsJSConsumer in stream.ListConsumersAsync(cts.Token)) { - infos.Add(natsJSConsumer.Info); + infos.Add(natsJSConsumer); } Assert.Single(infos); diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index 381cf6980..4cb59ac2e 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -83,7 +83,7 @@ public async Task Create_stream_test() // Consume var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var messages = new List>(); - var cc = await consumer.ConsumeAsync( + var cc = await consumer.ConsumeInternalAsync( new NatsJSConsumeOpts { MaxMsgs = 100, Serializer = TestDataJsonSerializer.Default }, cancellationToken: cts2.Token); await foreach (var msg in cc.Msgs.ReadAllAsync(cts2.Token)) diff --git a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs index 5cdfc3a85..8cabdb5b5 100644 --- a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs @@ -63,7 +63,7 @@ public async Task List_delete_consumer() var list = new List(); await foreach (var consumer in js.ListConsumersAsync("s1", cts.Token)) { - list.Add(consumer.Info); + list.Add(consumer); } Assert.Equal(3, list.Count); @@ -81,7 +81,7 @@ public async Task List_delete_consumer() var list = new List(); await foreach (var consumer in js.ListConsumersAsync("s1", cts.Token)) { - list.Add(consumer.Info); + list.Add(consumer); } Assert.Equal(2, list.Count); diff --git a/tests/NATS.Client.JetStream.Tests/OrderedConsumerTest.cs b/tests/NATS.Client.JetStream.Tests/OrderedConsumerTest.cs new file mode 100644 index 000000000..3d78d73c5 --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/OrderedConsumerTest.cs @@ -0,0 +1,112 @@ +using NATS.Client.Core.Tests; + +namespace NATS.Client.JetStream.Tests; + +public class OrderedConsumerTest +{ + private readonly ITestOutputHelper _output; + + public OrderedConsumerTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Consume_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + + for (var i = 0; i < 10; i++) + { + await js.PublishAsync("s1.foo", i, cancellationToken: cts.Token); + } + + var consumer = await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); + + var count = 0; + _output.WriteLine("Consuming..."); + var consumeOpts = new NatsJSConsumeOpts + { + MaxMsgs = 3, + Expires = TimeSpan.FromSeconds(3), + }; + await foreach (var msg in consumer.ConsumeAsync(consumeOpts, cancellationToken: cts.Token)) + { + _output.WriteLine($"[RCV] {msg.Data}"); + Assert.Equal(count, msg.Data); + if (++count == 10) + break; + } + } + + [Fact] + public async Task Fetch_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + + for (var i = 0; i < 10; i++) + { + await js.PublishAsync("s1.foo", i, cancellationToken: cts.Token); + } + + var consumer = await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); + + for (var i = 0; i < 10;) + { + _output.WriteLine("Fetching..."); + var fetchOpts = new NatsJSFetchOpts + { + MaxMsgs = 3, + Expires = TimeSpan.FromSeconds(3), + }; + await foreach (var msg in consumer.FetchAsync(fetchOpts, cancellationToken: cts.Token)) + { + _output.WriteLine($"[RCV] {msg.Data}"); + Assert.Equal(i, msg.Data); + i++; + } + } + } + + [Fact] + public async Task Next_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + + var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + + for (var i = 0; i < 10; i++) + { + await js.PublishAsync("s1.foo", i, cancellationToken: cts.Token); + } + + var consumer = await stream.CreateOrderedConsumerAsync(cancellationToken: cts.Token); + + for (var i = 0; i < 10;) + { + _output.WriteLine("Next..."); + var nextOpts = new NatsJSNextOpts + { + Expires = TimeSpan.FromSeconds(3), + }; + var next = await consumer.NextAsync(nextOpts, cts.Token); + + if (next is { } msg) + { + _output.WriteLine($"[RCV] {msg.Data}"); + Assert.Equal(i, msg.Data); + i++; + } + } + } +}