From 33538ddf09d542236633973296dc4b1f2eb450c1 Mon Sep 17 00:00:00 2001 From: Andrew El Date: Mon, 17 Jul 2023 20:46:04 -0400 Subject: [PATCH] Allow Configuration of Subscription Channels --- src/NATS.Client.Core/NatsSub.cs | 75 +++++++++++++++------- src/NATS.Client.Core/NatsSubChannelOpts.cs | 20 ++++++ src/NATS.Client.Core/NatsSubOpts.cs | 7 ++ 3 files changed, 80 insertions(+), 22 deletions(-) create mode 100644 src/NATS.Client.Core/NatsSubChannelOpts.cs diff --git a/src/NATS.Client.Core/NatsSub.cs b/src/NATS.Client.Core/NatsSub.cs index fe7c0ad16..bec82fe33 100644 --- a/src/NATS.Client.Core/NatsSub.cs +++ b/src/NATS.Client.Core/NatsSub.cs @@ -104,6 +104,8 @@ internal NatsSubBase( // since INatsSub is marked as internal. int? INatsSub.PendingMsgs => _pendingMsgs == -1 ? null : Volatile.Read(ref _pendingMsgs); + public Exception? Exception => Volatile.Read(ref _exception); + internal NatsSubEndReason EndReason => (NatsSubEndReason)Volatile.Read(ref _endReasonRaw); protected NatsConnection Connection { get; } @@ -172,8 +174,6 @@ ValueTask INatsSub.ReceiveAsync( protected abstract ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer); - public Exception? Exception => Volatile.Read(ref _exception); - protected void SetException(Exception exception) { Interlocked.Exchange(ref _exception, exception); @@ -224,21 +224,47 @@ private void EndSubscription(NatsSubEndReason reason) public sealed class NatsSub : NatsSubBase { - private readonly Channel _msgs = Channel.CreateBounded(new BoundedChannelOptions(1_000) - { - FullMode = BoundedChannelFullMode.Wait, - SingleWriter = true, - SingleReader = false, - AllowSynchronousContinuations = false, - }); + private static readonly BoundedChannelOptions DefaultChannelOptions = + new BoundedChannelOptions(1_000) + { + FullMode = BoundedChannelFullMode.Wait, + SingleWriter = true, + SingleReader = false, + AllowSynchronousContinuations = false, + }; + + private readonly Channel _msgs; internal NatsSub(NatsConnection connection, ISubscriptionManager manager, string subject, NatsSubOpts? opts, CancellationToken cancellationToken = default) - : base(connection, manager, subject, opts, cancellationToken) - { - } + : base(connection, manager, subject, opts, cancellationToken) => + _msgs = Channel.CreateBounded( + GetChannelOptions(opts?.ChannelOptions)); public ChannelReader Msgs => _msgs.Reader; + internal static BoundedChannelOptions GetChannelOptions( + NatsSubChannelOpts? subChannelOpts) + { + if (subChannelOpts != null) + { + var overrideOpts = subChannelOpts.Value; + return new BoundedChannelOptions(overrideOpts.Capacity ?? + DefaultChannelOptions.Capacity) + { + AllowSynchronousContinuations = + DefaultChannelOptions.AllowSynchronousContinuations, + FullMode = + overrideOpts.FullMode ?? DefaultChannelOptions.FullMode, + SingleWriter = DefaultChannelOptions.SingleWriter, + SingleReader = DefaultChannelOptions.SingleReader, + }; + } + else + { + return DefaultChannelOptions; + } + } + protected override async ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) { ResetIdleTimeout(); @@ -261,17 +287,22 @@ protected override async ValueTask ReceiveInternalAsync(string subject, string? public sealed class NatsSub : NatsSubBase { - private readonly Channel> _msgs = Channel.CreateBounded>( - new BoundedChannelOptions(capacity: 1_000) - { - FullMode = BoundedChannelFullMode.Wait, - SingleWriter = true, - SingleReader = false, - AllowSynchronousContinuations = false, - }); + private readonly Channel> _msgs; - internal NatsSub(NatsConnection connection, ISubscriptionManager manager, string subject, NatsSubOpts? opts, INatsSerializer serializer, CancellationToken cancellationToken = default) - : base(connection, manager, subject, opts, cancellationToken) => Serializer = serializer; + internal NatsSub( + NatsConnection connection, + ISubscriptionManager manager, + string subject, + NatsSubOpts? opts, + INatsSerializer serializer, + CancellationToken cancellationToken = default) + : base(connection, manager, subject, opts, cancellationToken) + { + _msgs = Channel.CreateBounded>( + NatsSub.GetChannelOptions(opts?.ChannelOptions)); + + Serializer = serializer; + } public ChannelReader> Msgs => _msgs.Reader; diff --git a/src/NATS.Client.Core/NatsSubChannelOpts.cs b/src/NATS.Client.Core/NatsSubChannelOpts.cs new file mode 100644 index 000000000..9c8f50229 --- /dev/null +++ b/src/NATS.Client.Core/NatsSubChannelOpts.cs @@ -0,0 +1,20 @@ +using System.Threading.Channels; + +namespace NATS.Client.Core; + +/// +/// Options For setting the FullMode and Capacity for a the created for Subscriptions +/// +public readonly record struct NatsSubChannelOpts +{ + /// + /// The Behavior of the Subscription's Channel when the Capacity has been reached. + /// By default, the behavior is + /// + public BoundedChannelFullMode? FullMode { get; init; } + + /// + /// The Maximum Capacity for the channel. If not specified, a default of 1000 is used. + /// + public int? Capacity { get; init; } +} diff --git a/src/NATS.Client.Core/NatsSubOpts.cs b/src/NATS.Client.Core/NatsSubOpts.cs index 0b7e92454..649718901 100644 --- a/src/NATS.Client.Core/NatsSubOpts.cs +++ b/src/NATS.Client.Core/NatsSubOpts.cs @@ -1,3 +1,5 @@ +using System.Threading.Channels; + namespace NATS.Client.Core; public readonly record struct NatsSubOpts @@ -65,4 +67,9 @@ public readonly record struct NatsSubOpts /// server subscription request. /// public bool? CanBeCancelled { get; init; } + + /// + /// Allows Configuration of options for a subscription + /// + public NatsSubChannelOpts? ChannelOptions { get; init; } }