Skip to content

Commit

Permalink
JetStream Ordered consumer (#169)
Browse files Browse the repository at this point in the history
* Ordered consumer

* Ordered consume method

* Consumer API tidy up

Simplified consumer api to just consume, fetch and next methods
removing the *All* versions. Now consume and fetch methods return
IAsyncEnumerable only. This reduces the API foot-print as well as
making it consistent between 'Ordered Consumer' and 'Consumer'.

* Removed consume and fetch methods exposing channel reader.
* Renamed *All* methods e.g. FetchAllAsync() to FetchAsync().
* Also removed the interfaces exposing the channel readers.

* Code formatted

* Ordered consumer docs

* Merge fixes

* Removed copy paste code errors
  • Loading branch information
mtmk authored Nov 6, 2023
1 parent 719f517 commit 548b15c
Show file tree
Hide file tree
Showing 23 changed files with 580 additions and 265 deletions.
6 changes: 3 additions & 3 deletions docs/documentation/jetstream/consume.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fast it can process messages without overwhelming the application process.
while (!cancellationToken.IsCancellationRequested)
{
// Consume a batch of messages (1000 by default)
await foreach (var msg in consumer.FetchAllAsync<Order>())
await foreach (var msg in consumer.FetchAsync<Order>())
{
// Process message
await msg.AckAsync();
Expand All @@ -59,7 +59,7 @@ overlapped so that there is a constant flow of messages from the JetStream serve
or `MaxBytes` and respective thresholds to not overwhelm the application and to not waste server resources.

```csharp
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
await foreach (var msg in consumer.ConsumeAsync<Order>())
{
// Process message
await msg.AckAsync();
Expand Down Expand Up @@ -88,7 +88,7 @@ while (!cancellationToken.IsCancellationRequested)
try
{
await consumer.RefreshAsync(); // or try to recreate consumer
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
await foreach (var msg in consumer.ConsumeAsync<Order>())
{
// Process message
await msg.AckAsync();
Expand Down
2 changes: 1 addition & 1 deletion docs/documentation/jetstream/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Check out [JetStream documentation](https://docs.nats.io/nats-concepts/jetstream
Finally, we're ready to consume the messages we persisted in `shop_orders` stream:

```csharp
await foreach (var msg in consumer.ConsumeAllAsync<Order>())
await foreach (var msg in consumer.ConsumeAsync<Order>())
{
var order = msg.Data;
Console.WriteLine($"Processing {msg.Subject} {order}...");
Expand Down
82 changes: 7 additions & 75 deletions sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,39 +65,7 @@ void Report(int i, Stopwatch sw, string data)

try
{
if (cmd == "fetch")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await consumer.RefreshAsync(cts.Token);
await using var sub = await consumer.FetchAsync<NatsMemoryOwner<byte>>(fetchOpts, cts.Token);
await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
{
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}
}
else if (cmd == "fetch-all-no-wait")
if (cmd == "fetch-no-wait")
{
while (!cts.Token.IsCancellationRequested)
{
Expand All @@ -110,7 +78,7 @@ void Report(int i, Stopwatch sw, string data)
var fetchNoWaitOpts = new NatsJSFetchOpts { MaxMsgs = max };
var fetchMsgCount = 0;

await foreach (var msg in consumer.FetchAllNoWaitAsync<NatsMemoryOwner<byte>>(fetchNoWaitOpts, cts.Token))
await foreach (var msg in consumer.FetchNoWaitAsync<NatsMemoryOwner<byte>>(fetchNoWaitOpts, cts.Token))
{
fetchMsgCount++;
using (msg.Data)
Expand Down Expand Up @@ -140,15 +108,15 @@ void Report(int i, Stopwatch sw, string data)
}
}
}
else if (cmd == "fetch-all")
else if (cmd == "fetch")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine($"___\nFETCH {maxMsgs}");
await consumer.RefreshAsync(cts.Token);
await foreach (var msg in consumer.FetchAllAsync<NatsMemoryOwner<byte>>(fetchOpts, cts.Token))
await foreach (var msg in consumer.FetchAsync<NatsMemoryOwner<byte>>(fetchOpts, cts.Token))
{
using (msg.Data)
{
Expand Down Expand Up @@ -209,15 +177,9 @@ void Report(int i, Stopwatch sw, string data)
try
{
Console.WriteLine("___\nCONSUME");
await using var sub = await consumer.ConsumeAsync<NatsMemoryOwner<byte>>(consumeOpts);

cts.Token.Register(() =>
{
sub.DisposeAsync().GetAwaiter().GetResult();
});

var stopped = false;
await foreach (var msg in sub.Msgs.ReadAllAsync())
var consumeStop = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
await foreach (var msg in consumer.ConsumeAsync<NatsMemoryOwner<byte>>(consumeOpts, consumeStop.Token))
{
using (msg.Data)
{
Expand All @@ -226,7 +188,7 @@ void Report(int i, Stopwatch sw, string data)
if (message == "stop")
{
Console.WriteLine("Stopping consumer...");
sub.Stop();
consumeStop.Cancel();
stopped = true;
}
}
Expand Down Expand Up @@ -257,36 +219,6 @@ void Report(int i, Stopwatch sw, string data)
}
}
}
else if (cmd == "consume-all")
{
while (!cts.Token.IsCancellationRequested)
{
try
{
Console.WriteLine("___\nCONSUME-ALL");
await foreach (var msg in consumer.ConsumeAllAsync<NatsMemoryOwner<byte>>(consumeOpts, cts.Token))
{
using (msg.Data)
{
var message = Encoding.ASCII.GetString(msg.Data.Span);
Console.WriteLine($"Received: {message}");
}

await msg.AckAsync(cancellationToken: cts.Token);
Report(++count, stopwatch, $"data: {msg.Data}");
}
}
catch (NatsJSProtocolException e)
{
Console.WriteLine(e.Message);
}
catch (NatsJSException e)
{
Console.WriteLine(e.Message);
await Task.Delay(1000);
}
}
}
else
{
Console.WriteLine("Usage: dotnet run -- <consume|consume-all|fetch|fetch-all|next>");
Expand Down
29 changes: 0 additions & 29 deletions src/NATS.Client.JetStream/INatsJSConsume.cs

This file was deleted.

14 changes: 0 additions & 14 deletions src/NATS.Client.JetStream/INatsJSFetch.cs

This file was deleted.

12 changes: 4 additions & 8 deletions src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ internal struct PullRequest
public string Origin { get; init; }
}

internal class NatsJSConsume<TMsg> : NatsSubBase, INatsJSConsume<TMsg>
internal class NatsJSConsume<TMsg> : NatsSubBase
{
private readonly ILogger _logger;
private readonly bool _debug;
private readonly CancellationTokenSource _cts;
private readonly Channel<NatsJSMsg<TMsg?>> _userMsgs;
private readonly Channel<PullRequest> _pullRequests;
private readonly NatsJSContext _context;
Expand Down Expand Up @@ -60,8 +59,7 @@ public NatsJSConsume(
CancellationToken cancellationToken)
: base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_cancellationToken = _cts.Token;
_cancellationToken = cancellationToken;
_logger = Connection.Opts.LoggerFactory.CreateLogger<NatsJSConsume<TMsg>>();
_debug = _logger.IsEnabled(LogLevel.Debug);
_context = context;
Expand Down Expand Up @@ -139,8 +137,6 @@ public NatsJSConsume(

public ChannelReader<NatsJSMsg<TMsg?>> Msgs { get; }

public void Stop() => _cts.Cancel();

public ValueTask CallMsgNextAsync(string origin, ConsumerGetnextRequest request, CancellationToken cancellationToken = default)
{
if (_cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -295,7 +291,7 @@ protected override async ValueTask ReceiveInternalAsync(
}
else if (headers.HasTerminalJSError())
{
_userMsgs.Writer.TryComplete(new NatsJSProtocolException($"JetStream server error: {headers.Code} {headers.MessageText}"));
_userMsgs.Writer.TryComplete(new NatsJSProtocolException(headers.Code, headers.Message, headers.MessageText));
EndSubscription(NatsSubEndReason.JetStreamError);
}
else
Expand Down Expand Up @@ -434,7 +430,7 @@ private void Pull(string origin, long batch, long maxBytes) => _pullRequests.Wri

private async Task PullLoop()
{
await foreach (var pr in _pullRequests.Reader.ReadAllAsync())
await foreach (var pr in _pullRequests.Reader.ReadAllAsync().ConfigureAwait(false))
{
var origin = $"pull-loop({pr.Origin})";
await CallMsgNextAsync(origin, pr.Request).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public static class NatsJSExtensionsInternal

public static bool HasTerminalJSError(this NatsHeaders headers) => headers
is { Code: 400 }
or { Code: 404 }
or { Code: 409, Message: NatsHeaders.Messages.ConsumerDeleted }
or { Code: 409, Message: NatsHeaders.Messages.ConsumerIsPushBased };
}
4 changes: 2 additions & 2 deletions src/NATS.Client.JetStream/Internal/NatsJSFetch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace NATS.Client.JetStream.Internal;

internal class NatsJSFetch<TMsg> : NatsSubBase, INatsJSFetch<TMsg>
internal class NatsJSFetch<TMsg> : NatsSubBase
{
private readonly ILogger _logger;
private readonly bool _debug;
Expand Down Expand Up @@ -189,7 +189,7 @@ protected override async ValueTask ReceiveInternalAsync(
}
else if (headers.HasTerminalJSError())
{
_userMsgs.Writer.TryComplete(new NatsJSProtocolException($"JetStream server error: {headers.Code} {headers.MessageText}"));
_userMsgs.Writer.TryComplete(new NatsJSProtocolException(headers.Code, headers.Message, headers.MessageText));
EndSubscription(NatsSubEndReason.JetStreamError);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ internal class NatsJSOrderedPushConsumer<T>
private readonly Task _commandTask;
private readonly long _ackWaitNanos;

private long _sequenceStream;
private long _sequenceConsumer;
private ulong _sequenceStream;
private ulong _sequenceConsumer;
private string _consumer;
private volatile NatsJSOrderedPushConsumerSub<T>? _sub;
private int _done;
Expand Down Expand Up @@ -218,7 +218,7 @@ private async Task CommandLoop()

var sequence = Interlocked.Increment(ref _sequenceConsumer);

if (sequence != (long)metadata.Sequence.Consumer)
if (sequence != metadata.Sequence.Consumer)
{
CreateSub("sequence-mismatch");
_logger.LogWarning("Missed messages, recreating consumer");
Expand All @@ -228,7 +228,7 @@ private async Task CommandLoop()
// Increment the sequence before writing to the channel in case the channel is full
// and the writer is waiting for the reader to read the message. This way the sequence
// will be correctly incremented in case the timeout kicks in and recreated the consumer.
Interlocked.Exchange(ref _sequenceStream, (long)metadata.Sequence.Stream);
Interlocked.Exchange(ref _sequenceStream, metadata.Sequence.Stream);

if (!IsDone)
{
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.JetStream/Models/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public record ConsumerConfiguration
[System.Text.Json.Serialization.JsonPropertyName("opt_start_seq")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
[System.ComponentModel.DataAnnotations.Range(0D, 18446744073709552000D)]
public long OptStartSeq { get; set; } = default!;
public ulong OptStartSeq { get; set; } = default!;

[System.Text.Json.Serialization.JsonPropertyName("opt_start_time")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
Expand Down
Loading

0 comments on commit 548b15c

Please sign in to comment.