Skip to content

Commit

Permalink
Add flags to NatsMsg and JetStream Request (#652)
Browse files Browse the repository at this point in the history
* Add flags to NatsMsg

* Code comments for msg size

* Format

* Fix warnings

* Keep 503 header

* Format

* JetStream TryJSRequestAsync

* Format

* Fix tests

* Refactor JetStream method results to use NatsResult

* Refactor NatsMsg size and flags handling in tests and class

* Refactor JSON document deserialization

* Refactor NatsJSContext error handling logic.

Reordered error checks in `NatsJSContext.cs` to prioritize handling `HasNoResponders` and `null` data conditions early. Modified flag checks in `NatsMsg.cs` for `IsEmpty` and `HasNoResponders` by directly using bitwise operations instead of enum comparisons.

* Fix memory leak by disposing JsonDocument properly

* Skip slow tests for unique NUID generation
  • Loading branch information
mtmk authored Nov 22, 2024
1 parent dcf77a4 commit 3771b04
Show file tree
Hide file tree
Showing 10 changed files with 374 additions and 89 deletions.
152 changes: 138 additions & 14 deletions src/NATS.Client.Core/NatsMsg.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using System.Xml.Linq;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core;

[Flags]
public enum NatsMsgFlags : byte
{
None = 0,
Empty = 1,
NoResponders = 2,
}

/// <summary>
/// This interface provides an optional contract when passing
/// messages to processing methods which is usually helpful in
Expand Down Expand Up @@ -103,12 +112,6 @@ public interface INatsMsg<T>
/// <summary>
/// NATS message structure as defined by the protocol.
/// </summary>
/// <param name="Subject">The destination subject to publish to.</param>
/// <param name="ReplyTo">The reply subject that subscribers can use to send a response back to the publisher/requester.</param>
/// <param name="Size">Message size in bytes.</param>
/// <param name="Headers">Pass additional information using name-value pairs.</param>
/// <param name="Data">Serializable data object.</param>
/// <param name="Connection">NATS connection this message is associated to.</param>
/// <typeparam name="T">Specifies the type of data that may be sent to the NATS Server.</typeparam>
/// <remarks>
/// <para>Connection property is used to provide reply functionality.</para>
Expand All @@ -119,20 +122,120 @@ public interface INatsMsg<T>
/// </code>
/// </para>
/// </remarks>
public readonly record struct NatsMsg<T>(
string Subject,
string? ReplyTo,
int Size,
NatsHeaders? Headers,
T? Data,
INatsConnection? Connection) : INatsMsg<T>
public readonly record struct NatsMsg<T> : INatsMsg<T>
{
/*
2 30
+--+------------------------------+
|EN| Message Size |
+--+------------------------------+
E: Empty flag
N: No responders flag
# Size is 30 bits:
Max Size: 1,073,741,823 (0x3FFFFFFF / 00111111111111111111111111111111)
Uint.Max: 4,294,967,295
Int.Max: 2,147,483,647
8mb: 8,388,608
*/
private readonly uint _flagsAndSize;

/// <summary>
/// NATS message structure as defined by the protocol.
/// </summary>
/// <param name="subject">The destination subject to publish to.</param>
/// <param name="replyTo">The reply subject that subscribers can use to send a response back to the publisher/requester.</param>
/// <param name="size">Message size in bytes.</param>
/// <param name="headers">Pass additional information using name-value pairs.</param>
/// <param name="data">Serializable data object.</param>
/// <param name="connection">NATS connection this message is associated to.</param>
/// <param name="flags">Message flags to indicate no responders and empty payloads.</param>
/// <remarks>
/// <para>Connection property is used to provide reply functionality.</para>
/// <para>
/// Message size is calculated using the same method NATS server uses:
/// <code lang="C#">
/// int size = subject.Length + replyTo.Length + headers.Length + payload.Length;
/// </code>
/// </para>
/// </remarks>
public NatsMsg(
string subject,
string? replyTo,
int size,
NatsHeaders? headers,
T? data,
INatsConnection? connection,
NatsMsgFlags flags = default)
{
Subject = subject;
ReplyTo = replyTo;
_flagsAndSize = ((uint)flags << 30) | (uint)(size & 0x3FFFFFFF);
Headers = headers;
Data = data;
Connection = connection;
}

/// <inheritdoc />
public NatsException? Error => Headers?.Error;

/// <summary>The destination subject to publish to.</summary>
public string Subject { get; init; }

/// <summary>The reply subject that subscribers can use to send a response back to the publisher/requester.</summary>
public string? ReplyTo { get; init; }

/// <summary>Message size in bytes.</summary>
public int Size
{
// Extract the lower 30 bits
get => (int)(_flagsAndSize & 0x3FFFFFFF);

// Clear the lower 30 bits and set the new number
init
{
// Mask the input value to fit within 30 bits (clear upper bits)
var numberPart = (uint)(value & 0x3FFFFFFF);

// Clear the lower 30 bits and set the new number value
// Preserve the flags, update the number
_flagsAndSize = (_flagsAndSize & 0xC0000000) | numberPart;
}
}

public NatsMsgFlags Flags
{
// Extract the two leftmost bits (31st and 30th bit)
// Mask with 0b11 to get two bits
get => (NatsMsgFlags)((_flagsAndSize >> 30) & 0b11);

init
{
// Clear the current flag bits (set to 0) and then set the new flag value
var flagsPart = (uint)value << 30;
_flagsAndSize = (_flagsAndSize & 0x3FFFFFFF) | flagsPart;
}
}

/// <summary>Pass additional information using name-value pairs.</summary>
public NatsHeaders? Headers { get; init; }

/// <summary>Serializable data object.</summary>
public T? Data { get; init; }

/// <summary>NATS connection this message is associated to.</summary>
public INatsConnection? Connection { get; init; }

public bool IsEmpty => (_flagsAndSize & 0x40000000) != 0;

public bool HasNoResponders => (_flagsAndSize & 0x80000000) != 0;

/// <inheritdoc />
public void EnsureSuccess()
{
if (HasNoResponders)
throw new NatsNoRespondersException();

if (Error != null)
throw Error;
}
Expand Down Expand Up @@ -197,6 +300,17 @@ public ValueTask ReplyAsync<TReply>(NatsMsg<TReply> msg, INatsSerialize<TReply>?
return Connection.PublishAsync(msg with { Subject = ReplyTo }, serializer, opts, cancellationToken);
}

public void Deconstruct(out string subject, out string? replyTo, out int size, out NatsHeaders? headers, out T? data, out INatsConnection? connection, out NatsMsgFlags flags)
{
subject = Subject;
replyTo = ReplyTo;
size = Size;
headers = Headers;
data = Data;
connection = Connection;
flags = Flags;
}

internal static NatsMsg<T> Build(
string subject,
string? replyTo,
Expand All @@ -207,6 +321,16 @@ internal static NatsMsg<T> Build(
INatsDeserialize<T> serializer)
{
NatsHeaders? headers = null;
var flags = NatsMsgFlags.None;

if (payloadBuffer.Length == 0)
{
flags |= NatsMsgFlags.Empty;
if (NatsSubBase.IsHeader503(headersBuffer))
{
flags |= NatsMsgFlags.NoResponders;
}
}

if (headersBuffer != null)
{
Expand Down Expand Up @@ -277,7 +401,7 @@ internal static NatsMsg<T> Build(
}
}

return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection);
return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection, flags);
}

[MemberNotNull(nameof(Connection))]
Expand Down
38 changes: 38 additions & 0 deletions src/NATS.Client.Core/NatsResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.Runtime.CompilerServices;

namespace NATS.Client.Core;

public readonly struct NatsResult<T>
{
private readonly T? _value;
private readonly Exception? _error;

public NatsResult(T value)
{
_value = value;
_error = null;
}

public NatsResult(Exception error)
{
_value = default;
_error = error;
}

public T Value => _value ?? ThrowValueIsNotSetException();

public Exception Error => _error ?? ThrowErrorIsNotSetException();

public bool Success => _error == null;

public static implicit operator NatsResult<T>(T value) => new(value);

public static implicit operator NatsResult<T>(Exception error) => new(error);

private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set");

private static Exception ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set");

[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateInvalidOperationException(string message) => new InvalidOperationException(message);
}
6 changes: 5 additions & 1 deletion src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea
{
switch (Opts)
{
case { ThrowIfNoResponders: true } when headersBuffer is { Length: >= 12 } && headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence):
case { ThrowIfNoResponders: true } when IsHeader503(headersBuffer):
SetException(new NatsNoRespondersException());
return;
case { StopOnEmptyMsg: true }:
Expand Down Expand Up @@ -311,6 +311,10 @@ public virtual async ValueTask ReceiveAsync(string subject, string? replyTo, Rea
}
}

internal static bool IsHeader503(ReadOnlySequence<byte>? headersBuffer) =>
headersBuffer is { Length: >= 12 }
&& headersBuffer.Value.Slice(8, 4).ToSpan().SequenceEqual(NoRespondersHeaderSequence);

internal void ClearException() => Interlocked.Exchange(ref _exception, null);

/// <summary>
Expand Down

This file was deleted.

12 changes: 12 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSJsonDocumentSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Buffers;
using System.Text.Json;
using NATS.Client.Core;

namespace NATS.Client.JetStream.Internal;

internal sealed class NatsJSJsonDocumentSerializer : INatsDeserialize<JsonDocument>
{
public static readonly NatsJSJsonDocumentSerializer Default = new();

public JsonDocument? Deserialize(in ReadOnlySequence<byte> buffer) => buffer.Length == 0 ? default : JsonDocument.Parse(buffer);
}
Loading

0 comments on commit 3771b04

Please sign in to comment.