diff --git a/sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs b/sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs index afcf049ce..4f7ca0919 100644 --- a/sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs +++ b/sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs @@ -12,7 +12,7 @@ public class WeatherForecastService : IHostedService, IAsyncDisposable private readonly ILogger _logger; private readonly INatsConnection _natsConnection; - private NatsSub? _replySubscription; + private INatsSub? _replySubscription; private Task? _replyTask; public WeatherForecastService(ILogger logger, INatsConnection natsConnection) diff --git a/sandbox/NatsBenchmark/Program.cs b/sandbox/NatsBenchmark/Program.cs index c20ae5f65..e78c0bbb3 100644 --- a/sandbox/NatsBenchmark/Program.cs +++ b/sandbox/NatsBenchmark/Program.cs @@ -811,7 +811,7 @@ public struct Vector3 internal static class NatsMsgTestUtils { - internal static NatsSub? Register(this NatsSub? sub, Action> action) + internal static INatsSub? Register(this INatsSub? sub, Action> action) { if (sub == null) return null; @@ -825,7 +825,7 @@ internal static class NatsMsgTestUtils return sub; } - internal static NatsSub? Register(this NatsSub? sub, Action action) + internal static INatsSub? Register(this INatsSub? sub, Action action) { if (sub == null) return null; diff --git a/src/NATS.Client.Core/INatsConnection.cs b/src/NATS.Client.Core/INatsConnection.cs index e095810cc..22c8ddfb3 100644 --- a/src/NATS.Client.Core/INatsConnection.cs +++ b/src/NATS.Client.Core/INatsConnection.cs @@ -19,7 +19,7 @@ public interface INatsConnection /// A for publishing options. /// A used to cancel the command. /// A that represents the asynchronous send operation. - ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Publishes the message payload to the given subject name, optionally supplying a reply subject. @@ -28,7 +28,7 @@ public interface INatsConnection /// A for publishing options. /// A used to cancel the command. /// A that represents the asynchronous send operation. - ValueTask PublishAsync(in NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. @@ -39,7 +39,7 @@ public interface INatsConnection /// A used to cancel the command. /// Specifies the type of data that may be send to the NATS Server. /// A that represents the asynchronous send operation. - ValueTask PublishAsync(string subject, T data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(string subject, T data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject. @@ -49,7 +49,7 @@ public interface INatsConnection /// A used to cancel the command. /// Specifies the type of data that may be send to the NATS Server. /// A that represents the asynchronous send operation. - ValueTask PublishAsync(in NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Initiates a subscription to a subject, optionally joining a distributed queue group. @@ -58,7 +58,7 @@ public interface INatsConnection /// A for subscription options. /// A used to cancel the command. /// A that represents the asynchronous send operation. - ValueTask SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); /// /// Initiates a subscription to a subject, optionally joining a distributed queue group. @@ -68,5 +68,5 @@ public interface INatsConnection /// A used to cancel the command. /// Specifies the type of data that may be received from the NATS Server. /// A that represents the asynchronous send operation. - ValueTask> SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default); + ValueTask> SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default); } diff --git a/src/NATS.Client.Core/INatsSub.cs b/src/NATS.Client.Core/INatsSub.cs new file mode 100644 index 000000000..91ef3c48f --- /dev/null +++ b/src/NATS.Client.Core/INatsSub.cs @@ -0,0 +1,57 @@ +using System.Threading.Channels; + +namespace NATS.Client.Core; + +public interface INatsSub : IAsyncDisposable +{ + /// + /// Access incoming messages for your subscription. + /// + ChannelReader Msgs { get; } + + /// + /// The subject name to subscribe to. + /// + string Subject { get; } + + /// + /// If specified, the subscriber will join this queue group. Subscribers with the same queue group name, + /// become a queue group, and only one randomly chosen subscriber of the queue group will + /// consume a message each time a message is received by the queue group. + /// + string? QueueGroup { get; } + + /// + /// Complete the message channel, stop timers if they were used and send an unsubscribe + /// message to the server. + /// + /// A that represents the asynchronous server UNSUB operation. + public ValueTask UnsubscribeAsync(); +} + +public interface INatsSub : IAsyncDisposable +{ + /// + /// Access incoming messages for your subscription. + /// + ChannelReader> Msgs { get; } + + /// + /// The subject name to subscribe to. + /// + string Subject { get; } + + /// + /// If specified, the subscriber will join this queue group. Subscribers with the same queue group name, + /// become a queue group, and only one randomly chosen subscriber of the queue group will + /// consume a message each time a message is received by the queue group. + /// + string? QueueGroup { get; } + + /// + /// Complete the message channel, stop timers if they were used and send an unsubscribe + /// message to the server. + /// + /// A that represents the asynchronous server UNSUB operation. + public ValueTask UnsubscribeAsync(); +} diff --git a/src/NATS.Client.Core/Internal/INatsSub.cs b/src/NATS.Client.Core/Internal/INatsSub.cs deleted file mode 100644 index 3c7164f25..000000000 --- a/src/NATS.Client.Core/Internal/INatsSub.cs +++ /dev/null @@ -1,53 +0,0 @@ -using System.Buffers; -using System.Collections.Concurrent; - -namespace NATS.Client.Core.Internal; - -internal interface INatsSub : IAsyncDisposable -{ - string Subject { get; } - - string? QueueGroup { get; } - - int? PendingMsgs { get; } - - /// - /// Called after subscription is sent to the server. - /// Helps maintain more accurate timeouts, especially idle timeout. - /// - void Ready(); - - ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer); -} - -internal interface INatsSubBuilder - where T : INatsSub -{ - T Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager); -} - -internal class NatsSubBuilder : INatsSubBuilder -{ - public static readonly NatsSubBuilder Default = new(); - - public NatsSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager) - { - return new NatsSub(connection, manager, subject, opts); - } -} - -internal class NatsSubModelBuilder : INatsSubBuilder> -{ - private static readonly ConcurrentDictionary> Cache = new(); - private readonly INatsSerializer _serializer; - - public NatsSubModelBuilder(INatsSerializer serializer) => _serializer = serializer; - - public static NatsSubModelBuilder For(INatsSerializer serializer) => - Cache.GetOrAdd(serializer, static s => new NatsSubModelBuilder(s)); - - public NatsSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager) - { - return new NatsSub(connection, manager, subject, opts, _serializer); - } -} diff --git a/src/NATS.Client.Core/Internal/InboxSub.cs b/src/NATS.Client.Core/Internal/InboxSub.cs index e90f1c074..c81d87e14 100644 --- a/src/NATS.Client.Core/Internal/InboxSub.cs +++ b/src/NATS.Client.Core/Internal/InboxSub.cs @@ -5,11 +5,10 @@ namespace NATS.Client.Core.Internal; -internal class InboxSub : INatsSub +internal class InboxSub : NatsSubBase { private readonly InboxSubBuilder _inbox; private readonly NatsConnection _connection; - private readonly ISubscriptionManager _manager; public InboxSub( InboxSubBuilder inbox, @@ -17,37 +16,28 @@ public InboxSub( NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager) + : base(connection, manager, subject, opts) { _inbox = inbox; _connection = connection; - _manager = manager; - Subject = subject; - QueueGroup = opts?.QueueGroup; - PendingMsgs = opts?.MaxMsgs; } - public string Subject { get; } - - public string? QueueGroup { get; } - - public int? PendingMsgs { get; } - - public void Ready() - { - } + protected override ValueTask ReceiveInternalAsync( + string subject, + string? replyTo, + ReadOnlySequence? headersBuffer, + ReadOnlySequence payloadBuffer) => + _inbox.ReceivedAsync(subject, replyTo, headersBuffer, payloadBuffer, _connection); - public ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) + protected override void TryComplete() { - return _inbox.ReceivedAsync(subject, replyTo, headersBuffer, payloadBuffer, _connection); } - - public ValueTask DisposeAsync() => _manager.RemoveAsync(this); } -internal class InboxSubBuilder : INatsSubBuilder, ISubscriptionManager +internal class InboxSubBuilder : ISubscriptionManager { private readonly ILogger _logger; - private readonly ConcurrentDictionary> _bySubject = new(); + private readonly ConcurrentDictionary> _bySubject = new(); public InboxSubBuilder(ILogger logger) => _logger = logger; @@ -56,11 +46,11 @@ public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connecti return new InboxSub(this, subject, opts, connection, manager); } - public void Register(INatsSub sub) + public void Register(NatsSubBase sub) { _bySubject.AddOrUpdate( sub.Subject, - static (_, s) => new ConditionalWeakTable { { s, new object() } }, + static (_, s) => new ConditionalWeakTable { { s, new object() } }, static (_, subTable, s) => { lock (subTable) @@ -69,7 +59,7 @@ public void Register(INatsSub sub) { // if current subTable is empty, it may be in process of being removed // return a new object - return new ConditionalWeakTable { { s, new object() } }; + return new ConditionalWeakTable { { s, new object() } }; } // the updateValueFactory delegate can be called multiple times @@ -97,7 +87,7 @@ public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySe } } - public ValueTask RemoveAsync(INatsSub sub) + public ValueTask RemoveAsync(NatsSubBase sub) { if (!_bySubject.TryGetValue(sub.Subject, out var subTable)) { diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index e09918d00..de3bead34 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -7,10 +7,10 @@ namespace NATS.Client.Core.Internal; internal interface ISubscriptionManager { - public ValueTask RemoveAsync(INatsSub sub); + public ValueTask RemoveAsync(NatsSubBase sub); } -internal record struct SidMetadata(string Subject, WeakReference WeakReference); +internal record struct SidMetadata(string Subject, WeakReference WeakReference); internal sealed record SubscriptionMetadata(int Sid); @@ -21,11 +21,10 @@ internal sealed class SubscriptionManager : ISubscriptionManager, IAsyncDisposab private readonly NatsConnection _connection; private readonly string _inboxPrefix; private readonly ConcurrentDictionary _bySid = new(); - private readonly ConditionalWeakTable _bySub = new(); + private readonly ConditionalWeakTable _bySub = new(); private readonly CancellationTokenSource _cts; private readonly Task _timer; private readonly TimeSpan _cleanupInterval; - private readonly InboxSubBuilder _inboxSubBuilder; private readonly InboxSub _inboxSubSentinel; private readonly SemaphoreSlim _inboxSubLock = new(initialCount: 1, maxCount: 1); @@ -40,58 +39,22 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix) _cts = new CancellationTokenSource(); _cleanupInterval = _connection.Options.SubscriptionCleanUpInterval; _timer = Task.Run(CleanupAsync); - _inboxSubBuilder = new InboxSubBuilder(connection.Options.LoggerFactory.CreateLogger()); - _inboxSubSentinel = new InboxSub(_inboxSubBuilder, nameof(_inboxSubSentinel), default, connection, this); + InboxSubBuilder = new InboxSubBuilder(connection.Options.LoggerFactory.CreateLogger()); + _inboxSubSentinel = new InboxSub(InboxSubBuilder, nameof(_inboxSubSentinel), default, connection, this); _inboxSub = _inboxSubSentinel; } - public IEnumerable<(int Sid, string Subject, string? QueueGroup, int? maxMsgs)> GetExistingSubscriptions() - { - lock (_gate) - { - foreach (var (sid, sidMetadata) in _bySid) - { - if (sidMetadata.WeakReference.TryGetTarget(out var sub)) - { - yield return (sid, sub.Subject, sub.QueueGroup, sub.PendingMsgs); - } - } - } - } + internal InboxSubBuilder InboxSubBuilder { get; } - public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts, INatsSubBuilder builder, CancellationToken cancellationToken) - where T : INatsSub + public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) { if (subject.StartsWith(_inboxPrefix, StringComparison.Ordinal)) { - if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel) - { - await _inboxSubLock.WaitAsync(CancellationToken.None).ConfigureAwait(false); - try - { - if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel) - { - var inboxSubject = $"{_inboxPrefix}*"; - _inboxSub = await SubscribeInternalAsync( - inboxSubject, - opts: default, - _inboxSubBuilder, - cancellationToken).ConfigureAwait(false); - } - } - finally - { - _inboxSubLock.Release(); - } - } - - var sub = builder.Build(subject, opts, connection: _connection, _inboxSubBuilder); - _inboxSubBuilder.Register(sub); - return sub; + await SubscribeInboxAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); } else { - return await SubscribeInternalAsync(subject, opts, builder, cancellationToken).ConfigureAwait(false); + await SubscribeInternalAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); } } @@ -137,7 +100,7 @@ public async ValueTask DisposeAsync() { _cts.Cancel(); - WeakReference[] subRefs; + WeakReference[] subRefs; lock (_gate) { subRefs = _bySid.Values.Select(m => m.WeakReference).ToArray(); @@ -151,7 +114,7 @@ public async ValueTask DisposeAsync() } } - public ValueTask RemoveAsync(INatsSub sub) + public ValueTask RemoveAsync(NatsSubBase sub) { if (!_bySub.TryGetValue(sub, out var subMetadata)) { @@ -167,14 +130,54 @@ public ValueTask RemoveAsync(INatsSub sub) return _connection.UnsubscribeAsync(subMetadata.Sid); } - private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts, INatsSubBuilder builder, CancellationToken cancellationToken) - where T : INatsSub + public async ValueTask ReconnectAsync(CancellationToken cancellationToken) + { + foreach (var (sid, sidMetadata) in _bySid) + { + if (sidMetadata.WeakReference.TryGetTarget(out var sub)) + { + // yield return (sid, sub.Subject, sub.QueueGroup, sub.PendingMsgs); + await _connection + .SubscribeCoreAsync(sid, sub.Subject, sub.QueueGroup, sub.PendingMsgs, cancellationToken) + .ConfigureAwait(false); + sub.Ready(); + } + } + } + + private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) + { + if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel) + { + await _inboxSubLock.WaitAsync(CancellationToken.None).ConfigureAwait(false); + try + { + if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel) + { + var inboxSubject = $"{_inboxPrefix}*"; + _inboxSub = InboxSubBuilder.Build(subject, opts, _connection, manager: this); + await SubscribeInternalAsync( + inboxSubject, + opts: default, + _inboxSub, + cancellationToken).ConfigureAwait(false); + } + } + finally + { + _inboxSubLock.Release(); + } + } + + InboxSubBuilder.Register(sub); + } + + private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken) { - var sub = builder.Build(subject, opts, connection: _connection, this); var sid = GetNextSid(); lock (_gate) { - _bySid[sid] = new SidMetadata(Subject: subject, WeakReference: new WeakReference(sub)); + _bySid[sid] = new SidMetadata(Subject: subject, WeakReference: new WeakReference(sub)); _bySub.AddOrUpdate(sub, new SubscriptionMetadata(Sid: sid)); } @@ -183,7 +186,6 @@ private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts await _connection.SubscribeCoreAsync(sid, subject, opts?.QueueGroup, opts?.MaxMsgs, cancellationToken) .ConfigureAwait(false); sub.Ready(); - return sub; } catch { diff --git a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs index ad3ba4fcb..e3b0f3e30 100644 --- a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs +++ b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs @@ -100,18 +100,17 @@ internal ValueTask PubModelAsync(string subject, T? data, INatsSerializer ser } } - internal ValueTask SubAsync(string subject, NatsSubOpts? opts, INatsSubBuilder builder, CancellationToken cancellationToken = default) - where T : INatsSub + internal ValueTask SubAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken = default) { if (ConnectionState == NatsConnectionState.Open) { - return _subscriptionManager.SubscribeAsync(subject, opts, builder, cancellationToken); + return SubscriptionManager.SubscribeAsync(subject, opts, sub, cancellationToken); } else { - return WithConnectAsync(subject, opts, builder, cancellationToken, static (self, s, o, b, token) => + return WithConnectAsync(subject, opts, sub, cancellationToken, static (self, s, o, b, token) => { - return self._subscriptionManager.SubscribeAsync(s, o, b, token); + return self.SubscriptionManager.SubscribeAsync(s, o, b, token); }); } } diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index 86ff987bd..a2453c652 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -5,7 +5,7 @@ namespace NATS.Client.Core; public partial class NatsConnection { /// - public ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(string subject, ReadOnlySequence payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { if (opts?.WaitUntilSent ?? false) { @@ -18,13 +18,13 @@ public ValueTask PublishAsync(string subject, ReadOnlySequence payload = d } /// - public ValueTask PublishAsync(in NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { return PublishAsync(msg.Subject, msg.Data, opts, cancellationToken); } /// - public ValueTask PublishAsync(string subject, T? data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(string subject, T? data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { var serializer = opts?.Serializer ?? Options.Serializer; if (opts?.WaitUntilSent ?? false) @@ -38,7 +38,7 @@ public ValueTask PublishAsync(string subject, T? data, in NatsPubOpts? opts = } /// - public ValueTask PublishAsync(in NatsMsg msg, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { return PublishAsync(msg.Subject, msg.Data, opts, cancellationToken); } diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs index 259bdd8f9..ec7347ba6 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs @@ -5,7 +5,7 @@ namespace NATS.Client.Core; public partial class NatsConnection { - internal async ValueTask RequestSubAsync( + internal async ValueTask RequestSubAsync( string subject, ReadOnlySequence payload = default, NatsPubOpts? requestOpts = default, @@ -13,12 +13,13 @@ internal async ValueTask RequestSubAsync( CancellationToken cancellationToken = default) { var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}"; - var sub = await SubAsync(replyTo, replyOpts, NatsSubBuilder.Default, cancellationToken).ConfigureAwait(false); + var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, replyOpts); + await SubAsync(replyTo, replyOpts, sub, cancellationToken).ConfigureAwait(false); await PubAsync(subject, replyTo, payload, requestOpts?.Headers, cancellationToken).ConfigureAwait(false); return sub; } - internal async ValueTask> RequestSubAsync( + internal async ValueTask> RequestSubAsync( string subject, TRequest? data, NatsPubOpts? requestOpts = default, @@ -27,8 +28,9 @@ internal async ValueTask> RequestSubAsync( { var replyTo = $"{InboxPrefix}{Guid.NewGuid():n}"; - var builder = NatsSubModelBuilder.For(replyOpts?.Serializer ?? Options.Serializer); - var sub = await SubAsync(replyTo, replyOpts, builder, cancellationToken).ConfigureAwait(false); + var replySerializer = replyOpts?.Serializer ?? Options.Serializer; + var sub = new NatsSub(this, SubscriptionManager.InboxSubBuilder, replyTo, replyOpts, replySerializer); + await SubAsync(replyTo, replyOpts, sub, cancellationToken).ConfigureAwait(false); await PubModelAsync( subject, diff --git a/src/NATS.Client.Core/NatsConnection.Subscribe.cs b/src/NATS.Client.Core/NatsConnection.Subscribe.cs index 91346078a..a03f68e00 100644 --- a/src/NATS.Client.Core/NatsConnection.Subscribe.cs +++ b/src/NATS.Client.Core/NatsConnection.Subscribe.cs @@ -6,15 +6,19 @@ namespace NATS.Client.Core; public partial class NatsConnection { /// - public ValueTask SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default) + public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { - return SubAsync(subject, opts, NatsSubBuilder.Default, cancellationToken); + var sub = new NatsSub(this, SubscriptionManager, subject, opts); + await SubAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); + return sub; } /// - public ValueTask> SubscribeAsync(string subject, in NatsSubOpts? opts = default, CancellationToken cancellationToken = default) + public async ValueTask> SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default) { var serializer = opts?.Serializer ?? Options.Serializer; - return SubAsync>(subject, opts, NatsSubModelBuilder.For(serializer), cancellationToken); + var sub = new NatsSub(this, SubscriptionManager, subject, opts, serializer); + await SubAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false); + return sub; } } diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index 13234947c..5cfad2354 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -28,7 +28,6 @@ public partial class NatsConnection : IAsyncDisposable, INatsConnection private readonly object _gate = new object(); private readonly WriterState _writerState; private readonly ChannelWriter _commandWriter; - private readonly SubscriptionManager _subscriptionManager; private readonly ILogger _logger; private readonly ObjectPool _pool; private readonly CancellationTimerPool _cancellationTimerPool; @@ -69,7 +68,7 @@ public NatsConnection(NatsOptions options) _writerState = new WriterState(options); _commandWriter = _writerState.CommandBuffer.Writer; InboxPrefix = $"{options.InboxPrefix}.{Guid.NewGuid():n}."; - _subscriptionManager = new SubscriptionManager(this, InboxPrefix); + SubscriptionManager = new SubscriptionManager(this, InboxPrefix); _logger = options.LoggerFactory.CreateLogger(); _clientOptions = new ClientOptions(Options); HeaderParser = new HeaderParser(options.HeaderEncoding); @@ -90,6 +89,8 @@ public NatsConnection(NatsOptions options) public HeaderParser HeaderParser { get; } + internal SubscriptionManager SubscriptionManager { get; } + internal string InboxPrefix { get; } /// @@ -143,7 +144,7 @@ public async ValueTask DisposeAsync() item.SetCanceled(); } - await _subscriptionManager.DisposeAsync().ConfigureAwait(false); + await SubscriptionManager.DisposeAsync().ConfigureAwait(false); _waitForOpenConnection.TrySetCanceled(); _disposedCancellationTokenSource.Cancel(); } @@ -167,7 +168,7 @@ internal void EnqueuePing(AsyncPingCommand pingCommand) internal ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) { - return _subscriptionManager.PublishToClientHandlersAsync(subject, replyTo, sid, headersBuffer, payloadBuffer); + return SubscriptionManager.PublishToClientHandlersAsync(subject, replyTo, sid, headersBuffer, payloadBuffer); } internal void ResetPongCount() @@ -383,14 +384,6 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) _writerState.PriorityCommands.Add(connectCommand); _writerState.PriorityCommands.Add(PingCommand.Create(_pool, GetCancellationTimer(CancellationToken.None))); - if (reconnect) - { - // Add SUBSCRIBE command to priority lane - var subscribeCommand = - AsyncSubscribeBatchCommand.Create(_pool, GetCancellationTimer(CancellationToken.None), _subscriptionManager.GetExistingSubscriptions().ToArray()); - _writerState.PriorityCommands.Add(subscribeCommand); - } - // create the socket writer _socketWriter = new NatsPipeliningWriteProtocolProcessor(_socket!, _writerState, _pool, Counter); @@ -399,6 +392,9 @@ private async ValueTask SetupReaderWriterAsync(bool reconnect) // receive COMMAND response (PONG or ERROR) await waitForPongOrErrorSignal.Task.ConfigureAwait(false); + + // Reestablish subscriptions and consumers + await SubscriptionManager.ReconnectAsync(_disposedCancellationTokenSource.Token).ConfigureAwait(false); } catch (Exception) { diff --git a/src/NATS.Client.Core/NatsRequestExtensions.cs b/src/NATS.Client.Core/NatsRequestExtensions.cs index dfe7c7cdc..d7e99b98f 100644 --- a/src/NATS.Client.Core/NatsRequestExtensions.cs +++ b/src/NATS.Client.Core/NatsRequestExtensions.cs @@ -49,11 +49,6 @@ public static class NatsRequestExtensions } } - if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) - { - throw sub.Exception; - } - return null; } @@ -125,11 +120,6 @@ public static class NatsRequestExtensions } } - if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) - { - throw sub.Exception; - } - return null; } diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs index e3fb4bd19..d8fba0272 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSub.cs @@ -14,7 +14,7 @@ internal enum NatsSubEndReason Exception, } -public abstract class NatsSubBase : INatsSub +public abstract class NatsSubBase { private readonly ISubscriptionManager _manager; private readonly Timer? _timeoutTimer; @@ -90,13 +90,13 @@ internal NatsSubBase( // Hide from public API using explicit interface implementations // since INatsSub is marked as internal. - int? INatsSub.PendingMsgs => _pendingMsgs == -1 ? null : Volatile.Read(ref _pendingMsgs); + public int? PendingMsgs => _pendingMsgs == -1 ? null : Volatile.Read(ref _pendingMsgs); internal NatsSubEndReason EndReason => (NatsSubEndReason)Volatile.Read(ref _endReasonRaw); protected NatsConnection Connection { get; } - void INatsSub.Ready() + public void Ready() { // Let idle timer start with the first message, in case // we're allowed to wait longer for the first message. @@ -149,7 +149,7 @@ public ValueTask DisposeAsync() return unsubscribeAsync; } - ValueTask INatsSub.ReceiveAsync( + public ValueTask ReceiveAsync( string subject, string? replyTo, ReadOnlySequence? headersBuffer, @@ -206,7 +206,7 @@ private void EndSubscription(NatsSubEndReason reason) } } -public sealed class NatsSub : NatsSubBase +public sealed class NatsSub : NatsSubBase, INatsSub { private static readonly BoundedChannelOptions DefaultChannelOptions = new BoundedChannelOptions(1_000) @@ -269,7 +269,7 @@ protected override async ValueTask ReceiveInternalAsync(string subject, string? protected override void TryComplete() => _msgs.Writer.TryComplete(); } -public sealed class NatsSub : NatsSubBase +public sealed class NatsSub : NatsSubBase, INatsSub { private readonly Channel> _msgs; diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index ac79faaf1..31be14793 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -52,10 +52,17 @@ public async ValueTask DeleteAsync(CancellationToken cancellationToken = d fetch = new ConsumerGetnextRequest { Batch = prefetch - lowWatermark }; } - await using var sub = await _context.Nats.SubAsync( + await using var sub = new NatsJSSub( + connection: _context.Nats, + manager: _context.Nats.SubscriptionManager, subject: inbox, opts: requestOpts, - builder: NatsJSSubModelBuilder.For(requestOpts.Serializer ?? _context.Nats.Options.Serializer), + serializer: requestOpts.Serializer ?? _context.Nats.Options.Serializer); + + await _context.Nats.SubAsync( + subject: inbox, + opts: requestOpts, + sub: sub, cancellationToken); static async ValueTask MsgNextAsync(NatsJSContext context, string stream, string consumer, ConsumerGetnextRequest request, string inbox, CancellationToken cancellationtoken) @@ -140,10 +147,11 @@ internal async IAsyncEnumerable ConsumeRawAsync( { var inbox = $"_INBOX.{Guid.NewGuid():n}"; - await using var sub = await _context.Nats.SubAsync( + await using var sub = new NatsJSSub(_context.Nats, _context.Nats.SubscriptionManager, inbox, requestOpts); + await _context.Nats.SubAsync( subject: inbox, opts: requestOpts, - builder: NatsJSSubBuilder.Default, + sub: sub, cancellationToken); await _context.Nats.PubModelAsync( @@ -158,11 +166,6 @@ await _context.Nats.PubModelAsync( { yield return msg; } - - if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) - { - throw sub.Exception; - } } internal async IAsyncEnumerable> ConsumeRawAsync( @@ -172,10 +175,11 @@ await _context.Nats.PubModelAsync( { var inbox = $"_INBOX.{Guid.NewGuid():n}"; - await using var sub = await _context.Nats.SubAsync( + await using var sub = new NatsJSSub(_context.Nats, _context.Nats.SubscriptionManager, inbox, requestOpts, requestOpts.Serializer ?? _context.Nats.Options.Serializer); + await _context.Nats.SubAsync( subject: inbox, opts: requestOpts, - builder: NatsJSSubModelBuilder.For(requestOpts.Serializer ?? _context.Nats.Options.Serializer), + sub, cancellationToken); await _context.Nats.PubModelAsync( @@ -190,11 +194,6 @@ await _context.Nats.PubModelAsync( { yield return msg; } - - if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) - { - throw sub.Exception; - } } private void ThrowIfDeleted() diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 11fd81c95..f427f44fa 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -55,11 +55,6 @@ public async ValueTask PublishAsync( } } - if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) - { - throw sub.Exception; - } - throw new NatsJSException("No response received"); } @@ -108,14 +103,14 @@ internal async ValueTask> JSRequestAsync(default, jsError.Error); } - throw sub.Exception; + throw sb.Exception; } throw new NatsJSException("No response received"); diff --git a/src/NATS.Client.JetStream/NatsJSSub.cs b/src/NATS.Client.JetStream/NatsJSSub.cs index 129f82140..211c97443 100644 --- a/src/NATS.Client.JetStream/NatsJSSub.cs +++ b/src/NATS.Client.JetStream/NatsJSSub.cs @@ -83,22 +83,6 @@ await _msgs.Writer.WriteAsync(new NatsJSControlMsg protected override void TryComplete() => _msgs.Writer.TryComplete(); } -internal class NatsJSSubModelBuilder : INatsSubBuilder> -{ - private static readonly ConcurrentDictionary> Cache = new(); - private readonly INatsSerializer _serializer; - - public NatsJSSubModelBuilder(INatsSerializer serializer) => _serializer = serializer; - - public static NatsJSSubModelBuilder For(INatsSerializer serializer) => - Cache.GetOrAdd(serializer, static s => new NatsJSSubModelBuilder(s)); - - public NatsJSSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager) - { - return new NatsJSSub(connection, manager, subject, opts, _serializer); - } -} - /// /// NATS JetStream Subscription with JetStream control message support. /// @@ -166,13 +150,3 @@ await _msgs.Writer.WriteAsync(new NatsJSControlMsg protected override void TryComplete() => _msgs.Writer.TryComplete(); } - -internal class NatsJSSubBuilder : INatsSubBuilder -{ - public static readonly NatsJSSubBuilder Default = new(); - - public NatsJSSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager) - { - return new NatsJSSub(connection, manager, subject, opts); - } -} diff --git a/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs b/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs index 7308e109d..4f85763d5 100644 --- a/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs +++ b/tests/NATS.Client.Core.Tests/LowLevelApiTest.cs @@ -15,8 +15,10 @@ public async Task Sub_custom_builder_test() await using var server = NatsServer.Start(); var nats = server.CreateClientConnection(); + var subject = "foo.*"; var builder = new NatsSubCustomTestBuilder(_output); - var sub = await nats.SubAsync("foo.*", opts: default, builder); + var sub = builder.Build(subject, default, nats, nats.SubscriptionManager); + await nats.SubAsync(subject, opts: default, sub); await Retry.Until( "subscription is ready", @@ -37,34 +39,19 @@ await Retry.Until( await sub.DisposeAsync(); } - private class NatsSubTest : INatsSub + private class NatsSubTest : NatsSubBase { private readonly NatsSubCustomTestBuilder _builder; private readonly ITestOutputHelper _output; - private readonly ISubscriptionManager _manager; - public NatsSubTest(NatsSubCustomTestBuilder builder, ITestOutputHelper output, ISubscriptionManager manager) + public NatsSubTest(string subject, NatsConnection connection, NatsSubCustomTestBuilder builder, ITestOutputHelper output, ISubscriptionManager manager) + : base(connection, manager, subject, default) { _builder = builder; _output = output; - _manager = manager; } - public string Subject => string.Empty; - - public string QueueGroup => string.Empty; - - public int? PendingMsgs => 0; - - public int Sid => 0; - - public void Ready() - { - } - - public ValueTask DisposeAsync() => ValueTask.CompletedTask; - - public ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) + protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) { if (subject.EndsWith(".sync")) { @@ -95,9 +82,13 @@ public ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence< return ValueTask.CompletedTask; } + + protected override void TryComplete() + { + } } - private class NatsSubCustomTestBuilder : INatsSubBuilder + private class NatsSubCustomTestBuilder { private readonly ITestOutputHelper _output; private readonly WaitSignal _done = new(); @@ -121,7 +112,7 @@ public IEnumerable Messages public NatsSubTest Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager) { - return new NatsSubTest(builder: this, _output, manager); + return new NatsSubTest(subject, connection, builder: this, _output, manager); } public void Sync() => Interlocked.Exchange(ref _sync, 1); diff --git a/tests/NATS.Client.Core.Tests/ProtocolTest.cs b/tests/NATS.Client.Core.Tests/ProtocolTest.cs index 87ff213cb..43edfaf78 100644 --- a/tests/NATS.Client.Core.Tests/ProtocolTest.cs +++ b/tests/NATS.Client.Core.Tests/ProtocolTest.cs @@ -202,7 +202,7 @@ void Log(string text) } Assert.Equal(maxMsgs, count); - Assert.Equal(NatsSubEndReason.MaxMsgs, sub.EndReason); + Assert.Equal(NatsSubEndReason.MaxMsgs, ((NatsSubBase)sub).EndReason); } Log("### Manual unsubscribe"); @@ -235,7 +235,7 @@ void Log(string text) } Assert.Equal(0, count); - Assert.Equal(NatsSubEndReason.None, sub.EndReason); + Assert.Equal(NatsSubEndReason.None, ((NatsSubBase)sub).EndReason); } Log("### Reconnect"); @@ -258,7 +258,7 @@ void Log(string text) await Retry.Until("received", () => Volatile.Read(ref count) == pubMsgs); var pending = maxMsgs - pubMsgs; - Assert.Equal(pending, ((INatsSub)sub).PendingMsgs); + Assert.Equal(pending, ((NatsSubBase)sub).PendingMsgs); proxy.Reset(); @@ -281,7 +281,7 @@ await Retry.Until( await Retry.Until( "unsubscribed with max-msgs", - () => sub.EndReason == NatsSubEndReason.MaxMsgs); + () => ((NatsSubBase)sub).EndReason == NatsSubEndReason.MaxMsgs); Assert.Equal(maxMsgs, Volatile.Read(ref count)); diff --git a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs index 8cba50fcb..29370f7d4 100644 --- a/tests/NATS.Client.Core.Tests/RequestReplyTest.cs +++ b/tests/NATS.Client.Core.Tests/RequestReplyTest.cs @@ -118,7 +118,7 @@ public async Task Request_reply_many_test_overall_timeout() } Assert.Equal(2, count); - Assert.Equal(NatsSubEndReason.Timeout, rep.EndReason); + Assert.Equal(NatsSubEndReason.Timeout, ((NatsSubBase)rep).EndReason); await sub.DisposeAsync(); await reg; @@ -152,7 +152,7 @@ public async Task Request_reply_many_test_idle_timeout() } Assert.Equal(2, count); - Assert.Equal(NatsSubEndReason.IdleTimeout, rep.EndReason); + Assert.Equal(NatsSubEndReason.IdleTimeout, ((NatsSubBase)rep).EndReason); await sub.DisposeAsync(); await reg; @@ -182,7 +182,7 @@ public async Task Request_reply_many_test_start_up_timeout() } Assert.Equal(0, count); - Assert.Equal(NatsSubEndReason.StartUpTimeout, rep.EndReason); + Assert.Equal(NatsSubEndReason.StartUpTimeout, ((NatsSubBase)rep).EndReason); await sub.DisposeAsync(); await reg; @@ -215,7 +215,7 @@ public async Task Request_reply_many_test_max_count() } Assert.Equal(2, count); - Assert.Equal(NatsSubEndReason.MaxMsgs, rep.EndReason); + Assert.Equal(NatsSubEndReason.MaxMsgs, ((NatsSubBase)rep).EndReason); await sub.DisposeAsync(); await reg; @@ -263,22 +263,24 @@ static string ToStr(ReadOnlyMemory input) return Encoding.ASCII.GetString(input.Span); } + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await using var server = NatsServer.Start(); await using var nats = server.CreateClientConnection(); - await using var sub = await nats.SubscribeAsync("foo"); + await using var sub = await nats.SubscribeAsync("foo", cancellationToken: cts.Token); var reg = sub.Register(async m => { if (ToStr(m.Data) == "1") { - await m.ReplyAsync(payload: ToSeq("qw")); - await m.ReplyAsync(payload: ToSeq("er")); - await m.ReplyAsync(payload: ToSeq("ty")); - await m.ReplyAsync(payload: default); // sentinel + await m.ReplyAsync(payload: ToSeq("qw"), cancellationToken: cts.Token); + await m.ReplyAsync(payload: ToSeq("er"), cancellationToken: cts.Token); + await m.ReplyAsync(payload: ToSeq("ty"), cancellationToken: cts.Token); + await m.ReplyAsync(payload: default, cancellationToken: cts.Token); // sentinel } }); var writer = new ArrayBufferWriter(); - await foreach (var msg in nats.RequestManyAsync("foo", ToSeq("1"))) + await foreach (var msg in nats.RequestManyAsync("foo", ToSeq("1"), cancellationToken: cts.Token)) { writer.Write(msg.Data.Span); } diff --git a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs index bbababd89..6922fbeff 100644 --- a/tests/NATS.Client.Core.Tests/SubscriptionTest.cs +++ b/tests/NATS.Client.Core.Tests/SubscriptionTest.cs @@ -114,7 +114,7 @@ public async Task Auto_unsubscribe_test() } Assert.Equal(maxMsgs, count); - Assert.Equal(NatsSubEndReason.MaxMsgs, sub.EndReason); + Assert.Equal(NatsSubEndReason.MaxMsgs, ((NatsSubBase)sub).EndReason); } // Auto unsubscribe on timeout @@ -132,7 +132,7 @@ public async Task Auto_unsubscribe_test() count++; } - Assert.Equal(NatsSubEndReason.Timeout, sub.EndReason); + Assert.Equal(NatsSubEndReason.Timeout, ((NatsSubBase)sub).EndReason); Assert.Equal(0, count); } @@ -160,7 +160,7 @@ public async Task Auto_unsubscribe_test() count++; } - Assert.Equal(NatsSubEndReason.IdleTimeout, sub.EndReason); + Assert.Equal(NatsSubEndReason.IdleTimeout, ((NatsSubBase)sub).EndReason); Assert.Equal(4, count); } @@ -186,7 +186,7 @@ public async Task Auto_unsubscribe_test() } Assert.Equal(0, count); - Assert.Equal(NatsSubEndReason.None, sub.EndReason); + Assert.Equal(NatsSubEndReason.None, ((NatsSubBase)sub).EndReason); } // Auto unsubscribe on max messages with Inbox Subscription @@ -212,7 +212,7 @@ public async Task Auto_unsubscribe_test() } Assert.Equal(1, count1); - Assert.Equal(NatsSubEndReason.MaxMsgs, sub1.EndReason); + Assert.Equal(NatsSubEndReason.MaxMsgs, ((NatsSubBase)sub1).EndReason); var count2 = 0; await foreach (var natsMsg in sub2.Msgs.ReadAllAsync(cancellationToken)) @@ -222,7 +222,7 @@ public async Task Auto_unsubscribe_test() } Assert.Equal(2, count2); - Assert.Equal(NatsSubEndReason.MaxMsgs, sub2.EndReason); + Assert.Equal(NatsSubEndReason.MaxMsgs, ((NatsSubBase)sub2).EndReason); } } } diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index 7c222e2e3..16f175728 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -11,7 +11,7 @@ public class ConsumerConsumeTest [Fact] public async Task Consume_test() { - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); await using var server = NatsServer.Start( outputHelper: _output, options: new NatsServerOptionsBuilder() diff --git a/tests/NATS.Client.TestUtilities/Utils.cs b/tests/NATS.Client.TestUtilities/Utils.cs index 7387bb9a0..517349f22 100644 --- a/tests/NATS.Client.TestUtilities/Utils.cs +++ b/tests/NATS.Client.TestUtilities/Utils.cs @@ -47,7 +47,7 @@ public static void WaitForTcpPortToClose(int port) public static class NatsMsgTestUtils { - public static Task Register(this NatsSub? sub, Action> action) + public static Task Register(this INatsSub? sub, Action> action) { if (sub == null) return Task.CompletedTask; @@ -60,7 +60,7 @@ public static Task Register(this NatsSub? sub, Action> action) }); } - public static Task Register(this NatsSub? sub, Func, Task> action) + public static Task Register(this INatsSub? sub, Func, Task> action) { if (sub == null) return Task.CompletedTask; @@ -73,7 +73,7 @@ public static Task Register(this NatsSub? sub, Func, Task> act }); } - public static Task Register(this NatsSub? sub, Action action) + public static Task Register(this INatsSub? sub, Action action) { if (sub == null) return Task.CompletedTask; @@ -86,7 +86,7 @@ public static Task Register(this NatsSub? sub, Action action) }); } - public static Task Register(this NatsSub? sub, Func action) + public static Task Register(this INatsSub? sub, Func action) { if (sub == null) return Task.CompletedTask;