Skip to content

Commit

Permalink
Subscribe API async enumerable change (#182)
Browse files Browse the repository at this point in the history
* Changed subscribe API to async enumerable

* Subscribe docs updates

* Subscriber cancellation fix

The only way to stop the subscription in the async enumerable
model is to use a cancellation token. When we use the token
we must make sure the channel is completed cleanly as well.

* Fixed perf test subscription
  • Loading branch information
mtmk authored Nov 7, 2023
1 parent 342c3ea commit 86fda03
Show file tree
Hide file tree
Showing 38 changed files with 335 additions and 257 deletions.
25 changes: 22 additions & 3 deletions docs/documentation/core/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ Subscribe to all `bar` [related subjects](https://docs.nats.io/nats-concepts/sub
```csharp
await using var nats = new NatsConnection();

await using sub = await nats.SubscribeAsync<Bar>("bar.>");
await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in nats.Subscription("bar.>"))
{
if (msg.Subject == "bar.exit")
break;

Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");
}
```
Expand All @@ -49,6 +51,8 @@ for (int i = 0; i < 10; i++)
Console.WriteLine($" Publishing {i}...");
await nats.PublishAsync<Bar>($"bar.baz.{i}", new Bar { Id = i, Name = "Baz" });
}

await nats.PublishAsync("bar.exit");
```

## Logging
Expand All @@ -57,7 +61,22 @@ You should also hook your logger to `NatsConnection` to make sure all is working
to get help diagnosing any issues you might have:

```csharp
var opts = NatsOpts.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };
// First add Nuget package Microsoft.Extensions.Logging.Console
using Microsoft.Extensions.Logging;

using var loggerFactory = LoggerFactory.Create(configure: builder =>
{
builder
.SetMinimumLevel(LogLevel.Information)
.AddSimpleConsole(options =>
{
options.SingleLine = true;
options.TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff zzz ";
});
});

var opts = NatsOpts.Default with { LoggerFactory = loggerFactory };

await using var nats = new NatsConnection(otps);
```

Expand Down
19 changes: 14 additions & 5 deletions docs/documentation/core/pub-sub.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,25 @@ receives the message.
```csharp
await using var nats = new NatsConnection();

await using sub = await nats.SubscribeAsync<int>("foo");
var sub = Task.Run(async () =>
{
await foreach(var msg in nats.SubscribeAsync<int>("foo"))
{
Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");

if (msg.Data == -1)
break;
}
});

for (int i = 0; i < 10; i++)
{
Console.WriteLine($" Publishing {i}...");
await nats.PublishAsync<int>("foo", i);
await Task.Delay(1000);
}

await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Console.WriteLine($"Received {msg.Subject}: {msg.Data}\n");
}
await nats.PublishAsync<int>("foo", -1);

await sub;
```
12 changes: 4 additions & 8 deletions docs/documentation/core/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@ await using var nats = new NatsConnection();

var subs = new List<NatsSubBase>();
var replyTasks = new List<Task>();
var cts = new CancellationTokenSource();

for (int i = 0; i < 3; i++)
{
// Create three subscriptions all on the same queue group
var sub = await nats.SubscribeAsync<int>("math.double", queueGroup: "maths-service");

subs.Add(sub);

// Create a background message loop for every subscription
var replyTaskId = i;
replyTasks.Add(Task.Run(async () =>
{
// Retrieve messages until unsubscribed
await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in nats.SubscribeAsync<int>("math.double", queueGroup: "maths-service", cancellationToken: cts.Token))
{
Console.WriteLine($"[{replyTaskId}] Received request: {msg.Data}");
await msg.ReplyAsync($"Answer is: {2 * msg.Data}");
Expand All @@ -49,9 +46,8 @@ for (int i = 0; i < 10; i++)

Console.WriteLine("Stopping...");

// Unsubscribing or disposing will complete the message loops
foreach (var sub in subs)
await sub.UnsubscribeAsync();
// Cancellation token will unsubcribe and complete the message loops
cts.Cancel();

// Make sure all tasks finished cleanly
await Task.WhenAll(replyTasks);
Expand Down
4 changes: 1 addition & 3 deletions docs/documentation/core/req-rep.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ Create a service that will be responding to requests:
```csharp
await using var nats = new NatsConnection();

await using var sub = await conn.SubscribeAsync<int>("math.double");

await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in conn.SubscribeAsync<int>("math.double"))
{
Console.WriteLine($"Received request: {msg.Data}");

Expand Down
4 changes: 2 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ The NATS.NET V2 client is in preview and not recommended for production use yet.
- [x] Object Store initial support
- [x] Service API initial support
- [x] .NET 8.0 support (Native AOT)
- [ ] Implementation of missing major features (e.g. JetStream ordered consumers)
- [ ] Beta phase
- [x] Implementation of missing major features (e.g. JetStream ordered consumers)
- [x] Beta phase
- [ ] Testing and bug fixing
- [ ] General Availability

Expand Down
15 changes: 7 additions & 8 deletions sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ public class WeatherForecastService : IHostedService, IAsyncDisposable

private readonly ILogger<WeatherForecastService> _logger;
private readonly INatsConnection _natsConnection;
private INatsSub<object>? _replySubscription;
private Task? _replyTask;
private CancellationTokenSource? _cts;

public WeatherForecastService(ILogger<WeatherForecastService> logger, INatsConnection natsConnection)
{
_logger = logger;
_natsConnection = natsConnection;
}

public async Task StartAsync(CancellationToken cancellationToken)
public Task StartAsync(CancellationToken cancellationToken)
{
_replySubscription = await _natsConnection.SubscribeAsync<object>("weather", cancellationToken: cancellationToken);
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_replyTask = Task.Run(
async () =>
{
await foreach (var msg in _replySubscription.Msgs.ReadAllAsync(cancellationToken))
await foreach (var msg in _natsConnection.SubscribeAsync<object>("weather", cancellationToken: cancellationToken))
{
var forecasts = Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Expand All @@ -40,21 +40,20 @@ public async Task StartAsync(CancellationToken cancellationToken)
},
cancellationToken);
_logger.LogInformation("Weather Forecast Services is running");
return Task.CompletedTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Weather Forecast Services is stopping");
if (_replySubscription != null)
await _replySubscription.UnsubscribeAsync();
_cts?.Cancel();
if (_replyTask != null)
await _replyTask;
}

public async ValueTask DisposeAsync()
{
if (_replySubscription != null)
await _replySubscription.DisposeAsync();
_cts?.Cancel();
if (_replyTask != null)
await _replyTask;
}
Expand Down
14 changes: 5 additions & 9 deletions sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
};

// Server
var sub = await conn.SubscribeAsync<int>("foobar");
var cts = new CancellationTokenSource();
var replyTask = Task.Run(async () =>
{
await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in conn.SubscribeAsync<int>("foobar", cancellationToken: cts.Token))
{
await msg.ReplyAsync($"Hello {msg.Data}");
}
Expand All @@ -35,15 +35,13 @@
// Client(response: "Hello 100")
var response = await conn.RequestAsync<int, string>("foobar", 100);

await sub.UnsubscribeAsync();
cts.Cancel();
await replyTask;

// subscribe
var subscription = await conn.SubscribeAsync<Person>("foo");

_ = Task.Run(async () =>
{
await foreach (var msg in subscription.Msgs.ReadAllAsync())
await foreach (var msg in conn.SubscribeAsync<Person>("foo"))
{
Console.WriteLine($"Received {msg.Data}");
}
Expand Down Expand Up @@ -84,11 +82,9 @@ public Runner(INatsConnection connection)
[RootCommand]
public async Task Run()
{
var subscription = await _connection.SubscribeAsync<string>("foo");

_ = Task.Run(async () =>
{
await foreach (var msg in subscription.Msgs.ReadAllAsync())
await foreach (var msg in _connection.SubscribeAsync<string>("foo"))
{
Console.WriteLine("Yeah");
}
Expand Down
4 changes: 1 addition & 3 deletions sandbox/Example.Core.SubscribeHeaders/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync<byte[]>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in connection.SubscribeAsync<byte[]>(subject))
{
Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data!)}\n");
if (msg.Headers != null)
Expand Down
4 changes: 1 addition & 3 deletions sandbox/Example.Core.SubscribeModel/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync<Bar>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in connection.SubscribeAsync<Bar>(subject))
{
Print($"[RCV] {msg.Subject}: {msg.Data}\n");
}
Expand Down
12 changes: 6 additions & 6 deletions sandbox/Example.Core.SubscribeQueueGroup/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
await using var connection1 = new NatsConnection(options);

Print($"[1][SUB] Subscribing to subject '{subject}'...\n");
var sub1 = await connection1.SubscribeAsync<string>(subject, queueGroup: "My-Workers");
var cts1 = new CancellationTokenSource();
var task1 = Task.Run(async () =>
{
await foreach (var msg in sub1.Msgs.ReadAllAsync())
await foreach (var msg in connection1.SubscribeAsync<string>(subject, queueGroup: "My-Workers", cancellationToken: cts1.Token))
{
Print($"[1][RCV] {msg.Subject}: {msg.Data}\n");
}
Expand All @@ -26,10 +26,10 @@
await using var connection2 = new NatsConnection(options);

Print($"[2][SUB] Subscribing to subject '{subject}'...\n");
var sub2 = await connection2.SubscribeAsync<string>(subject, queueGroup: "My-Workers");
var cts2 = new CancellationTokenSource();
var task2 = Task.Run(async () =>
{
await foreach (var msg in sub2.Msgs.ReadAllAsync())
await foreach (var msg in connection2.SubscribeAsync<string>(subject, queueGroup: "My-Workers", cancellationToken: cts2.Token))
{
Print($"[2][RCV] {msg.Subject}: {msg.Data}\n");
}
Expand All @@ -40,10 +40,10 @@
// ---
// Clean-up
Print($"[1][SUB] Unsubscribing '{subject}'...\n");
await sub1.DisposeAsync();
cts1.Cancel();

Print($"[2][SUB] Unsubscribing '{subject}'...\n");
await sub2.DisposeAsync();
cts2.Cancel();

await Task.WhenAll(task1, task2);

Expand Down
4 changes: 1 addition & 3 deletions sandbox/Example.Core.SubscribeRaw/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync<byte[]>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in connection.SubscribeAsync<byte[]>(subject))
{
var data = Encoding.UTF8.GetString(msg.Data!);
Print($"[RCV] {msg.Subject}: {data}\n");
Expand Down
Loading

0 comments on commit 86fda03

Please sign in to comment.