Skip to content

Commit

Permalink
Add other client extensions (#637)
Browse files Browse the repository at this point in the history
* Add client extension methods and refactor interfaces

Refactor to use interfaces for enhanced flexibility and testability across various components like NatsJSContext and NatsConnection. Added new extension methods to easily create contexts for Object Store, Key-Value Store, and Services on NATS client and connection instances.

* Add public `Context` properties to store interfaces

Updated INatsObjStore, INatsKVContext, and INatsSvcContext interfaces to include public `Context` properties. This change ensures consistent access to the underlying context objects across various components.

* dotnet format

* Refactor context creation to use JetStream directly

* Make extensions namespace NATS.Net

* Removed debug print

* Rename to JetStreamContext

* Fix inheritdoc

* Fix build warnings

* Fix build warnings and add test

* Fix test
  • Loading branch information
mtmk authored Oct 7, 2024
1 parent d5f28f7 commit 3083db5
Show file tree
Hide file tree
Showing 28 changed files with 321 additions and 286 deletions.
19 changes: 19 additions & 0 deletions sandbox/Example.Client/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

using System.Text;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;
using NATS.Client.ObjectStore;
using NATS.Client.Services;
using NATS.Net;

CancellationTokenSource cts = new();
Expand Down Expand Up @@ -95,6 +98,22 @@
Console.WriteLine($"JetStream Stream: {stream.Info.Config.Name}");
}

// Use KeyValueStore by referencing NATS.Client.KeyValueStore package
var kv1 = client.CreateKeyValueStoreContext();
var kv2 = js.CreateKeyValueStoreContext();
await kv1.CreateStoreAsync("store1");
await kv2.CreateStoreAsync("store1");

// Use ObjectStore by referencing NATS.Client.ObjectStore package
var obj1 = client.CreateObjectStoreContext();
var obj2 = js.CreateObjectStoreContext();
await obj1.CreateObjectStoreAsync("store1");
await obj2.CreateObjectStoreAsync("store1");

// Use Services by referencing NATS.Client.Services package
var svc = client.CreateServicesContext();
await svc.AddServiceAsync("service1", "1.0.0");

await cts.CancelAsync();

await Task.WhenAll(tasks);
Expand Down
27 changes: 27 additions & 0 deletions src/NATS.Client.JetStream/INatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ public interface INatsJSContext
/// </summary>
INatsConnection Connection { get; }

/// <summary>
/// Provides configuration options for the JetStream context.
/// </summary>
NatsJSOpts Opts { get; }

/// <summary>
/// Creates new ordered consumer.
/// </summary>
Expand Down Expand Up @@ -296,4 +301,26 @@ ValueTask<NatsJSPublishConcurrentFuture> PublishConcurrentAsync<T>(
NatsJSPubOpts? opts = default,
NatsHeaders? headers = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Generates a new base inbox string using the connection's inbox prefix.
/// </summary>
/// <returns>A new inbox string.</returns>
string NewBaseInbox();

/// <summary>
/// Sends a request message to a JetStream subject and waits for a response.
/// </summary>
/// <param name="subject">The JetStream API subject to send the request to.</param>
/// <param name="request">The request message object.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="TRequest">The type of the request message.</typeparam>
/// <typeparam name="TResponse">The type of the response message.</typeparam>
/// <returns>A task representing the asynchronous operation, with a result of type <typeparamref name="TResponse"/>.</returns>
ValueTask<TResponse> JSRequestResponseAsync<TRequest, TResponse>(
string subject,
TRequest? request,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class;
}
10 changes: 5 additions & 5 deletions src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal class NatsJSOrderedPushConsumer<T>
{
private readonly ILogger _logger;
private readonly bool _debug;
private readonly NatsJSContext _context;
private readonly INatsJSContext _context;
private readonly string _stream;
private readonly string _filter;
private readonly INatsDeserialize<T> _serializer;
Expand All @@ -68,7 +68,7 @@ internal class NatsJSOrderedPushConsumer<T>
private int _done;

public NatsJSOrderedPushConsumer(
NatsJSContext context,
INatsJSContext context,
string stream,
string filter,
INatsDeserialize<T> serializer,
Expand Down Expand Up @@ -417,23 +417,23 @@ private void CreateSub(string origin)

internal class NatsJSOrderedPushConsumerSub<T> : NatsSubBase
{
private readonly NatsJSContext _context;
private readonly INatsJSContext _context;
private readonly CancellationToken _cancellationToken;
private readonly INatsConnection _nats;
private readonly NatsHeaderParser _headerParser;
private readonly INatsDeserialize<T> _serializer;
private readonly ChannelWriter<NatsJSOrderedPushConsumerMsg<T>> _commands;

public NatsJSOrderedPushConsumerSub(
NatsJSContext context,
INatsJSContext context,
Channel<NatsJSOrderedPushConsumerMsg<T>> commandChannel,
INatsDeserialize<T> serializer,
NatsSubOpts? opts,
CancellationToken cancellationToken)
: base(
connection: context.Connection,
manager: context.Connection.SubscriptionManager,
subject: context.NewInbox(),
subject: context.NewBaseInbox(),
queueGroup: default,
opts)
{
Expand Down
14 changes: 13 additions & 1 deletion src/NATS.Client.JetStream/NatsClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
using NATS.Client.Core;
using NATS.Client.JetStream;

namespace NATS.Client.JetStream;
// ReSharper disable once CheckNamespace
namespace NATS.Net;

public static class NatsClientExtensions
{
/// <summary>
/// Creates a JetStream context using the provided NATS client.
/// </summary>
/// <param name="client">The NATS client used to create the JetStream context.</param>
/// <returns>Returns an instance of <see cref="INatsJSContext"/> for interacting with JetStream.</returns>
public static INatsJSContext CreateJetStreamContext(this INatsClient client)
=> CreateJetStreamContext(client.Connection);

/// <summary>
/// Creates a JetStream context using the provided NATS connection.
/// </summary>
/// <param name="connection">The NATS connection used to create the JetStream context.</param>
/// <returns>Returns an instance of <see cref="INatsJSContext"/> for interacting with JetStream.</returns>
public static INatsJSContext CreateJetStreamContext(this INatsConnection connection)
=> new NatsJSContext(connection);
}
6 changes: 3 additions & 3 deletions src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ internal async ValueTask<NatsJSConsume<T>> ConsumeInternalAsync<T>(INatsDeserial

opts ??= new NatsJSConsumeOpts();
serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer<T>();
var inbox = _context.NewInbox();
var inbox = _context.NewBaseInbox();

var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes);
var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat);
Expand Down Expand Up @@ -332,7 +332,7 @@ internal async ValueTask<NatsJSOrderedConsume<T>> OrderedConsumeInternalAsync<T>
ThrowIfDeleted();

serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer<T>();
var inbox = _context.NewInbox();
var inbox = _context.NewBaseInbox();

var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes);
var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat);
Expand Down Expand Up @@ -382,7 +382,7 @@ internal async ValueTask<NatsJSFetch<T>> FetchInternalAsync<T>(
ThrowIfDeleted();
serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer<T>();

var inbox = _context.NewInbox();
var inbox = _context.NewBaseInbox();

var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes);
var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat);
Expand Down
33 changes: 18 additions & 15 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public NatsJSContext(INatsConnection connection, NatsJSOpts opts)

public INatsConnection Connection { get; }

internal NatsJSOpts Opts { get; }
/// <inheritdoc />
public NatsJSOpts Opts { get; }

/// <summary>
/// Calls JetStream Account Info API.
Expand Down Expand Up @@ -238,6 +239,22 @@ public async ValueTask<NatsJSPublishConcurrentFuture> PublishConcurrentAsync<T>(
return new NatsJSPublishConcurrentFuture(sub);
}

/// <inheritdoc />
public string NewBaseInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix);

/// <inheritdoc />
public async ValueTask<TResponse> JSRequestResponseAsync<TRequest, TResponse>(
string subject,
TRequest? request,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
var response = await JSRequestAsync<TRequest, TResponse>(subject, request, cancellationToken);
response.EnsureSuccess();
return response.Response!;
}

internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null)
{
#if NETSTANDARD
Expand All @@ -262,20 +279,6 @@ internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArg
}
}

internal string NewInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix);

internal async ValueTask<TResponse> JSRequestResponseAsync<TRequest, TResponse>(
string subject,
TRequest? request,
CancellationToken cancellationToken = default)
where TRequest : class
where TResponse : class
{
var response = await JSRequestAsync<TRequest, TResponse>(subject, request, cancellationToken);
response.EnsureSuccess();
return response.Response!;
}

internal async ValueTask<NatsJSResponse<TResponse>> JSRequestAsync<TRequest, TResponse>(
string subject,
TRequest? request,
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ public interface INatsJSMsg<out T>
/// <typeparam name="T">User message type</typeparam>
public readonly struct NatsJSMsg<T> : INatsJSMsg<T>
{
private readonly NatsJSContext _context;
private readonly INatsJSContext _context;
private readonly NatsMsg<T> _msg;
private readonly Lazy<NatsJSMsgMetadata?> _replyToDateTimeAndSeq;

public NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
public NatsJSMsg(NatsMsg<T> msg, INatsJSContext context)
{
_msg = msg;
_context = context;
Expand Down
5 changes: 5 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ namespace NATS.Client.KeyValueStore;

public interface INatsKVContext
{
/// <summary>
/// Provides access to the JetStream context associated with the Key-Value Store operations.
/// </summary>
INatsJSContext JetStreamContext { get; }

/// <summary>
/// Create a new Key Value Store or get an existing one
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
using NATS.Client.Core;
using NATS.Client.JetStream;

namespace NATS.Client.KeyValueStore;

public interface INatsKVStore
{
/// <summary>
/// Provides access to the JetStream context associated with the Object Store operations.
/// </summary>
INatsJSContext JetStreamContext { get; }

/// <summary>
/// Name of the Key Value Store bucket
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@ namespace NATS.Client.KeyValueStore.Internal;

internal class NatsKVWatchSub<T> : NatsSubBase
{
private readonly NatsJSContext _context;
private readonly INatsJSContext _context;
private readonly CancellationToken _cancellationToken;
private readonly INatsConnection _nats;
private readonly NatsHeaderParser _headerParser;
private readonly INatsDeserialize<T> _serializer;
private readonly ChannelWriter<NatsKVWatchCommandMsg<T>> _commands;

public NatsKVWatchSub(
NatsJSContext context,
INatsJSContext context,
Channel<NatsKVWatchCommandMsg<T>> commandChannel,
INatsDeserialize<T> serializer,
NatsSubOpts? opts,
CancellationToken cancellationToken)
: base(
connection: context.Connection,
manager: context.Connection.SubscriptionManager,
subject: context.NewInbox(),
subject: context.NewBaseInbox(),
queueGroup: default,
opts)
{
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal sealed class NatsKVWatcher<T> : IAsyncDisposable
{
private readonly ILogger _logger;
private readonly bool _debug;
private readonly NatsJSContext _context;
private readonly INatsJSContext _context;
private readonly string _bucket;
private readonly INatsDeserialize<T> _serializer;
private readonly NatsKVWatchOpts _opts;
Expand All @@ -53,7 +53,7 @@ internal sealed class NatsKVWatcher<T> : IAsyncDisposable
private INatsJSConsumer? _initialConsumer;

public NatsKVWatcher(
NatsJSContext context,
INatsJSContext context,
string bucket,
IEnumerable<string> keys,
INatsDeserialize<T> serializer,
Expand Down
33 changes: 33 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;

// ReSharper disable once CheckNamespace
namespace NATS.Net;

public static class NatsClientExtensions
{
/// <summary>
/// Creates a NATS Key-Value Store context using the specified NATS client.
/// </summary>
/// <param name="client">The NATS client instance.</param>
/// <returns>An instance of <see cref="INatsKVContext"/> which can be used to interact with the Key-Value Store.</returns>
public static INatsKVContext CreateKeyValueStoreContext(this INatsClient client)
=> CreateKeyValueStoreContext(client.CreateJetStreamContext());

/// <summary>
/// Creates a NATS Key-Value Store context using the specified NATS connection.
/// </summary>
/// <param name="connection">The NATS connection instance.</param>
/// <returns>An instance of <see cref="INatsKVContext"/> which can be used to interact with the Key-Value Store.</returns>
public static INatsKVContext CreateKeyValueStoreContext(this INatsConnection connection)
=> CreateKeyValueStoreContext(connection.CreateJetStreamContext());

/// <summary>
/// Creates a NATS Key-Value Store context using the specified NATS JetStream context.
/// </summary>
/// <param name="context">The NATS JetStream context instance.</param>
/// <returns>An instance of <see cref="INatsKVContext"/> which can be used to interact with the Key-Value Store.</returns>
public static INatsKVContext CreateKeyValueStoreContext(this INatsJSContext context)
=> new NatsKVContext(context);
}
Loading

0 comments on commit 3083db5

Please sign in to comment.