Skip to content

Commit

Permalink
Options tidy-up (#128)
Browse files Browse the repository at this point in the history
* Options tidy-up

* Removed variable PubOpts Headers and ReplyTo
* Added headers and replyTo params to Publish methods
* Make all Opts classes (not structs)
* Added default options for JS context

* Removed useless queue-group from request-reply

* Swapped publish reply-to and header params

* Headers is more likely to be used than reply-to
* Added reference documentation for NatsMsg

* Removed unnecessary exception throw
  • Loading branch information
mtmk authored Sep 13, 2023
1 parent 295a636 commit a91da64
Show file tree
Hide file tree
Showing 33 changed files with 346 additions and 150 deletions.
3 changes: 1 addition & 2 deletions docs/documentation/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ var replyTasks = new List<Task>();
for (int i = 0; i < 3; i++)
{
// Create three subscriptions all on the same queue group
var opts = new NatsSubOpts { QueueGroup = "maths-service" };
var sub = await nats.SubscribeAsync<int>("math.double", opts);
var sub = await nats.SubscribeAsync<int>("math.double", queueGroup: "maths-service");

subs.Add(sub);

Expand Down
2 changes: 1 addition & 1 deletion sandbox/Example.Core.PublishHeaders/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
await connection.PublishAsync<Bar>(
subject,
new Bar { Id = i, Name = "Baz" },
new NatsPubOpts { Headers = new NatsHeaders { ["XFoo"] = $"bar{i}" } });
headers: new NatsHeaders { ["XFoo"] = $"bar{i}" });
}

void Print(string message)
Expand Down
4 changes: 2 additions & 2 deletions sandbox/Example.Core.SubscribeQueueGroup/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
await using var connection1 = new NatsConnection(options);

Print($"[1][SUB] Subscribing to subject '{subject}'...\n");
var sub1 = await connection1.SubscribeAsync(subject, new NatsSubOpts { QueueGroup = $"My-Workers" });
var sub1 = await connection1.SubscribeAsync(subject, queueGroup: "My-Workers");
var task1 = Task.Run(async () =>
{
await foreach (var msg in sub1.Msgs.ReadAllAsync())
Expand All @@ -28,7 +28,7 @@
await using var connection2 = new NatsConnection(options);

Print($"[2][SUB] Subscribing to subject '{subject}'...\n");
var sub2 = await connection2.SubscribeAsync(subject, new NatsSubOpts { QueueGroup = $"My-Workers" });
var sub2 = await connection2.SubscribeAsync(subject, queueGroup: "My-Workers");
var task2 = Task.Run(async () =>
{
await foreach (var msg in sub2.Msgs.ReadAllAsync())
Expand Down
39 changes: 31 additions & 8 deletions src/NATS.Client.Core/INatsConnection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Buffers;
using System.Runtime.CompilerServices;

namespace NATS.Client.Core;

Expand All @@ -17,10 +16,12 @@ public interface INatsConnection
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="payload">The message payload data.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes the message payload to the given subject name, optionally supplying a reply subject.
Expand All @@ -35,41 +36,55 @@ public interface INatsConnection
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="data">Serializable data object</param>
/// <param name="data">Serializable data object.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be send to the NATS Server.</typeparam>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(string subject, T data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);
ValueTask PublishAsync<T>(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="msg">A <see cref="NatsMsg{T}"/> representing message details.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be send to the NATS Server.</typeparam>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(in NatsMsg<T> msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
/// </summary>
/// <param name="subject">The subject name to subscribe to.</param>
/// <param name="queueGroup">If specified, the subscriber will join this queue group.</param>
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous send operation.</returns>
ValueTask<INatsSub> SubscribeAsync(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);
/// <remarks>
/// Subscribers with the same queue group name, become a queue group,
/// and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </remarks>
ValueTask<INatsSub> SubscribeAsync(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
/// </summary>
/// <param name="subject">The subject name to subscribe to.</param>
/// <param name="queueGroup">If specified, the subscriber will join this queue group.</param>
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be received from the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous send operation.</returns>
ValueTask<INatsSub<T>> SubscribeAsync<T>(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);
/// <remarks>
/// Subscribers with the same queue group name, become a queue group,
/// and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </remarks>
ValueTask<INatsSub<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Create a new inbox subject with the form {Inbox Prefix}.{Unique Connection ID}.{Unique Inbox ID}
Expand All @@ -82,6 +97,7 @@ public interface INatsConnection
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="data">Data to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
Expand All @@ -97,6 +113,7 @@ public interface INatsConnection
ValueTask<NatsMsg<TReply?>?> RequestAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
Expand All @@ -106,6 +123,7 @@ public interface INatsConnection
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="payload">Payload to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
Expand All @@ -119,6 +137,7 @@ public interface INatsConnection
ValueTask<NatsMsg?> RequestAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
Expand All @@ -128,6 +147,7 @@ public interface INatsConnection
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="data">Data to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
Expand All @@ -141,6 +161,7 @@ public interface INatsConnection
IAsyncEnumerable<NatsMsg<TReply?>> RequestManyAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
Expand All @@ -150,6 +171,7 @@ public interface INatsConnection
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="payload">Payload to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
Expand All @@ -161,6 +183,7 @@ public interface INatsConnection
IAsyncEnumerable<NatsMsg> RequestManyAsync(
string subject,
ReadOnlySequence<byte> payload = default,
NatsHeaders? headers = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Internal/InboxSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public InboxSub(
NatsSubOpts? opts,
NatsConnection connection,
ISubscriptionManager manager)
: base(connection, manager, subject, opts)
: base(connection, manager, subject, queueGroup: default, opts)
{
_inbox = inbox;
_connection = connection;
Expand Down
9 changes: 5 additions & 4 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)

internal InboxSubBuilder InboxSubBuilder { get; }

public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
public async ValueTask SubscribeAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
{
if (IsInboxSubject(subject))
{
await SubscribeInboxAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false);
}
else
{
await SubscribeInternalAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false);
await SubscribeInternalAsync(subject, queueGroup, opts, sub, cancellationToken).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -179,6 +179,7 @@ private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, N
_inboxSub = InboxSubBuilder.Build(subject, opts, _connection, manager: this);
await SubscribeInternalAsync(
inboxSubject,
queueGroup: default,
opts: default,
_inboxSub,
cancellationToken).ConfigureAwait(false);
Expand All @@ -193,7 +194,7 @@ await SubscribeInternalAsync(
await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false);
}

private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
private async ValueTask SubscribeInternalAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
{
var sid = GetNextSid();
lock (_gate)
Expand All @@ -204,7 +205,7 @@ private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts

try
{
await _connection.SubscribeCoreAsync(sid, subject, opts?.QueueGroup, opts?.MaxMsgs, cancellationToken)
await _connection.SubscribeCoreAsync(sid, subject, queueGroup, opts?.MaxMsgs, cancellationToken)
.ConfigureAwait(false);
await sub.ReadyAsync().ConfigureAwait(false);
}
Expand Down
9 changes: 4 additions & 5 deletions src/NATS.Client.Core/NatsConnection.LowLevelApi.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Buffers;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;

Expand Down Expand Up @@ -100,17 +99,17 @@ internal ValueTask PubModelAsync<T>(string subject, T? data, INatsSerializer ser
}
}

internal ValueTask SubAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken = default)
internal ValueTask SubAsync(string subject, string? queueGroup, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken = default)
{
if (ConnectionState == NatsConnectionState.Open)
{
return SubscriptionManager.SubscribeAsync(subject, opts, sub, cancellationToken);
return SubscriptionManager.SubscribeAsync(subject, queueGroup, opts, sub, cancellationToken);
}
else
{
return WithConnectAsync(subject, opts, sub, cancellationToken, static (self, s, o, b, token) =>
return WithConnectAsync(subject, queueGroup, opts, sub, cancellationToken, static (self, s, q, o, b, token) =>
{
return self.SubscriptionManager.SubscribeAsync(s, o, b, token);
return self.SubscriptionManager.SubscribeAsync(s, q, o, b, token);
});
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/NATS.Client.Core/NatsConnection.Publish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,41 @@ namespace NATS.Client.Core;
public partial class NatsConnection
{
/// <inheritdoc />
public ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
public ValueTask PublishAsync(string subject, ReadOnlySequence<byte> payload = default, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
if (opts?.WaitUntilSent ?? false)
{
return PubAsync(subject, opts?.ReplyTo, payload, opts?.Headers, cancellationToken);
return PubAsync(subject, replyTo, payload, headers, cancellationToken);
}
else
{
return PubPostAsync(subject, opts?.ReplyTo, payload, opts?.Headers, cancellationToken);
return PubPostAsync(subject, replyTo, payload, headers, cancellationToken);
}
}

/// <inheritdoc />
public ValueTask PublishAsync(in NatsMsg msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
return PublishAsync(msg.Subject, msg.Data, opts, cancellationToken);
return PublishAsync(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, opts, cancellationToken);
}

/// <inheritdoc />
public ValueTask PublishAsync<T>(string subject, T? data, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
public ValueTask PublishAsync<T>(string subject, T? data, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
var serializer = opts?.Serializer ?? Opts.Serializer;
if (opts?.WaitUntilSent ?? false)
{
return PubModelAsync<T>(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken);
return PubModelAsync<T>(subject, data, serializer, replyTo, headers, cancellationToken);
}
else
{
return PubModelPostAsync<T>(subject, data, serializer, opts?.ReplyTo, opts?.Headers, cancellationToken);
return PubModelPostAsync<T>(subject, data, serializer, replyTo, headers, cancellationToken);
}
}

/// <inheritdoc />
public ValueTask PublishAsync<T>(in NatsMsg<T> msg, NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
{
return PublishAsync<T>(msg.Subject, msg.Data, opts, cancellationToken);
return PublishAsync<T>(msg.Subject, msg.Data, msg.Headers, msg.ReplyTo, opts, cancellationToken);
}
}
Loading

0 comments on commit a91da64

Please sign in to comment.