Skip to content

Commit

Permalink
Add NATS client implementation (#589)
Browse files Browse the repository at this point in the history
* Add NATS client implementation

Added the initial implementation of the NATS client, including publish,
subscribe, and request/reply functionalities. NATS.Client package is
aimed at basic applications and samples or POCs, where especially serialization
setup and other params are set to a sensible defaults.

* dotnet format
  • Loading branch information
mtmk authored Aug 2, 2024
1 parent a163445 commit 1a80606
Show file tree
Hide file tree
Showing 20 changed files with 731 additions and 117 deletions.
21 changes: 21 additions & 0 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Net.OpenTelemetry.Test
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Platform.Windows.Tests", "tests\NATS.Client.Platform.Windows.Tests\NATS.Client.Platform.Windows.Tests.csproj", "{A37994CC-A23A-415E-8B61-9468C7178A55}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client", "src\NATS.Client\NATS.Client.csproj", "{48F1F736-3D87-4453-B497-BD9C203B2385}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Client", "sandbox\Example.Client\Example.Client.csproj", "{A15CCDD5-B707-4142-B99A-64F0AB62318A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Tests", "tests\NATS.Client.Tests\NATS.Client.Tests.csproj", "{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -291,6 +297,18 @@ Global
{A37994CC-A23A-415E-8B61-9468C7178A55}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A37994CC-A23A-415E-8B61-9468C7178A55}.Release|Any CPU.Build.0 = Release|Any CPU
{48F1F736-3D87-4453-B497-BD9C203B2385}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{48F1F736-3D87-4453-B497-BD9C203B2385}.Debug|Any CPU.Build.0 = Debug|Any CPU
{48F1F736-3D87-4453-B497-BD9C203B2385}.Release|Any CPU.ActiveCfg = Release|Any CPU
{48F1F736-3D87-4453-B497-BD9C203B2385}.Release|Any CPU.Build.0 = Release|Any CPU
{A15CCDD5-B707-4142-B99A-64F0AB62318A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A15CCDD5-B707-4142-B99A-64F0AB62318A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A15CCDD5-B707-4142-B99A-64F0AB62318A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A15CCDD5-B707-4142-B99A-64F0AB62318A}.Release|Any CPU.Build.0 = Release|Any CPU
{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6DAAAA87-8DDF-4E60-81CE-D8900327DE33}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -341,6 +359,9 @@ Global
{474BA453-9CFF-41C2-B2E7-ADD92CC93E86} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{B8554582-DE19-41A2-9784-9B27C9F22429} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{A37994CC-A23A-415E-8B61-9468C7178A55} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{48F1F736-3D87-4453-B497-BD9C203B2385} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{A15CCDD5-B707-4142-B99A-64F0AB62318A} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{6DAAAA87-8DDF-4E60-81CE-D8900327DE33} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA}
Expand Down
14 changes: 14 additions & 0 deletions sandbox/Example.Client/Example.Client.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client\NATS.Client.csproj" />
</ItemGroup>

</Project>
98 changes: 98 additions & 0 deletions sandbox/Example.Client/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// See https://aka.ms/new-console-template for more information

using System.Text;
using NATS.Client;

CancellationTokenSource cts = new();

await using var client = new NatsClient();

// Subscribe for int, string, bytes, json
List<Task> tasks =
[
Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<int>("x.int", cancellationToken: cts.Token))
{
Console.WriteLine($"Received int: {msg.Data}");
}
}),

Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<string>("x.string", cancellationToken: cts.Token))
{
Console.WriteLine($"Received string: {msg.Data}");
}
}),

Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<byte[]>("x.bytes", cancellationToken: cts.Token))
{
if (msg.Data != null)
{
Console.WriteLine($"Received bytes: {Encoding.UTF8.GetString(msg.Data)}");
}
}
}),

Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<MyData>("x.json", cancellationToken: cts.Token))
{
Console.WriteLine($"Received data: {msg.Data}");
}
}),

Task.Run(async () =>
{
await foreach (var msg in client.SubscribeAsync<MyData>("x.service", cancellationToken: cts.Token))
{
if (msg.Data != null)
{
Console.WriteLine($"Replying to data: {msg.Data}");
await msg.ReplyAsync($"Thank you {msg.Data.Name} your Id is {msg.Data.Id}!");
}
}
}),

Task.Run(async () =>
{
var id = 0;
await foreach (var msg in client.SubscribeAsync<object>("x.service2", cancellationToken: cts.Token))
{
await msg.ReplyAsync(new MyData(id++, $"foo{id}"));
}
})
];

await Task.Delay(1000);

await client.PublishAsync("x.int", 100);
await client.PublishAsync("x.string", "Hello, World!");
await client.PublishAsync("x.bytes", new byte[] { 65, 66, 67 });
await client.PublishAsync("x.json", new MyData(30, "bar"));

// Request/Reply
{
var response = await client.RequestAsync<MyData, string>("x.service", new MyData(100, "foo"));
Console.WriteLine($"Response: {response.Data}");
}

// Request/Reply without request data
for (var i = 0; i < 3; i++)
{
var response = await client.RequestAsync<MyData>("x.service2");
Console.WriteLine($"Response[{i}]: {response.Data}");
}

// Use JetStream by referencing NATS.Client.JetStream package
// var js = client.GetJetStream();
await cts.CancelAsync();

await Task.WhenAll(tasks);

Console.WriteLine("Bye!");

public record MyData(int Id, string Name);
2 changes: 2 additions & 0 deletions sandbox/Example.NativeAot/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ public void Serialize(IBufferWriter<byte> bufferWriter, T value)

throw new NatsException($"Can't deserialize {typeof(T)}");
}

public INatsSerializer<T> CombineWith(INatsSerializer<T> next) => throw new NotImplementedException();
}

public record MyData
Expand Down
118 changes: 118 additions & 0 deletions src/NATS.Client.Core/INatsClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
namespace NATS.Client.Core;

public interface INatsClient : IAsyncDisposable
{
/// <summary>
/// Represents a connection to the NATS server.
/// </summary>
INatsConnection Connection { get; }

/// <summary>
/// Connect socket and write CONNECT command to nats server.
/// </summary>
ValueTask ConnectAsync();

/// <summary>
/// Send PING command and await PONG. Return value is similar as Round Trip Time (RTT).
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous round trip operation.</returns>
ValueTask<TimeSpan> PingAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Publishes a serializable message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="data">Serializable data object.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
ValueTask PublishAsync<T>(string subject, T data, NatsHeaders? headers = default, string? replyTo = default, INatsSerialize<T>? serializer = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Publishes an empty message payload to the given subject name, optionally supplying a reply subject.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="replyTo">Optional reply-to subject.</param>
/// <param name="opts">A <see cref="NatsPubOpts"/> for publishing options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous send operation.</returns>
/// <remarks>
/// Publishing a sentinel usually means a signal to the given subject which could be used to trigger an action
/// or indicate an event for example and of messages.
/// </remarks>
ValueTask PublishAsync(string subject, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Initiates a subscription to a subject, optionally joining a distributed queue group.
/// </summary>
/// <param name="subject">The subject name to subscribe to.</param>
/// <param name="queueGroup">If specified, the subscriber will join this queue group.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">A <see cref="NatsSubOpts"/> for subscription options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the command.</param>
/// <typeparam name="T">Specifies the type of data that may be received from the NATS Server.</typeparam>
/// <returns>An asynchronous enumerable of <see cref="NatsMsg{T}"/> objects</returns>
/// <remarks>
/// Subscribers with the same queue group name, become a queue group,
/// and only one randomly chosen subscriber of the queue group will
/// consume a message each time a message is received by the queue group.
/// </remarks>
IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Request and receive a single reply from a responder.
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="data">Data to send to responder</param>
/// <param name="headers">Optional message headers</param>
/// <param name="requestSerializer">Serializer to use for the request message type.</param>
/// <param name="replySerializer">Serializer to use for the reply message type.</param>
/// <param name="requestOpts">Request publish options</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
/// <typeparam name="TRequest">Request type</typeparam>
/// <typeparam name="TReply">Reply type</typeparam>
/// <returns>Returns the <see cref="NatsMsg{T}"/> received from the responder as reply.</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// Response can be (null) or one <see cref="NatsMsg{T}"/>.
/// Reply option's max messages will be set to 1.
/// If reply option's timeout is not defined, then it will be set to NatsOpts.RequestTimeout.
/// </remarks>
ValueTask<NatsMsg<TReply>> RequestAsync<TRequest, TReply>(
string subject,
TRequest? data,
NatsHeaders? headers = default,
INatsSerialize<TRequest>? requestSerializer = default,
INatsDeserialize<TReply>? replySerializer = default,
NatsPubOpts? requestOpts = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Send an empty request message and await the reply message asynchronously.
/// </summary>
/// <param name="subject">Subject of the responder</param>
/// <param name="replySerializer">Serializer to use for the reply message type.</param>
/// <param name="replyOpts">Reply handler subscription options</param>
/// <param name="cancellationToken">Cancel this request</param>
/// <typeparam name="TReply">Reply type</typeparam>
/// <returns>Returns the <see cref="NatsMsg{T}"/> received from the responder as reply.</returns>
/// <exception cref="OperationCanceledException">Raised when cancellation token is used</exception>
/// <remarks>
/// Response can be (null) or one <see cref="NatsMsg{T}"/>.
/// Reply option's max messages will be set to 1.
/// If reply option's timeout is not defined, then it will be set to NatsOpts.RequestTimeout.
/// </remarks>
ValueTask<NatsMsg<TReply>> RequestAsync<TReply>(
string subject,
INatsDeserialize<TReply>? replySerializer = default,
NatsSubOpts? replyOpts = default,
CancellationToken cancellationToken = default);
}
Loading

0 comments on commit 1a80606

Please sign in to comment.