Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JetStream consumer options #107

Merged
merged 14 commits into from
Aug 9, 2023
3 changes: 3 additions & 0 deletions src/NATS.Client.Core/Commands/CommandConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ internal static class CommandConstants
// string.Join(",", Encoding.ASCII.GetBytes("PONG\r\n"))
public static ReadOnlySpan<byte> PongNewLine => new byte[] { 80, 79, 78, 71, 13, 10 };

// string.Join(",", Encoding.ASCII.GetBytes("NATS/1.0"))
public static ReadOnlySpan<byte> NatsHeaders10 => new byte[] { 78, 65, 84, 83, 47, 49, 46, 48 };

mtmk marked this conversation as resolved.
Show resolved Hide resolved
// string.Join(",", Encoding.ASCII.GetBytes("NATS/1.0\r\n"))
public static ReadOnlySpan<byte> NatsHeaders10NewLine => new byte[] { 78, 65, 84, 83, 47, 49, 46, 48, 13, 10 };
}
94 changes: 83 additions & 11 deletions src/NATS.Client.Core/HeaderParser.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// Adapted from https://github.com/dotnet/aspnetcore/blob/v6.0.18/src/Servers/Kestrel/Core/src/Internal/Http/HttpParser.cs

using System.Buffers;
using System.Buffers.Text;
using System.Diagnostics;
using System.Text;
using Microsoft.Extensions.Primitives;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;

// ReSharper disable ConditionIsAlwaysTrueOrFalse
// ReSharper disable PossiblyImpureMethodCallOnReadonlyVariable
namespace NATS.Client.Core;

public class HeaderParser
Expand All @@ -18,13 +22,12 @@ public class HeaderParser

private readonly Encoding _encoding;

public HeaderParser(Encoding encoding)
{
_encoding = encoding;
}
public HeaderParser(Encoding encoding) => _encoding = encoding;

public bool ParseHeaders(in SequenceReader<byte> reader, NatsHeaders headers)
{
var isVersionLineRead = false;

while (!reader.End)
{
var span = reader.UnreadSpan;
Expand Down Expand Up @@ -73,7 +76,7 @@ public bool ParseHeaders(in SequenceReader<byte> reader, NatsHeaders headers)
// Headers don't end in CRLF line.
Debug.Assert(readAhead == 0 || readAhead == 2, "readAhead == 0 || readAhead == 2");

throw new NatsException($"Protocol error: invalid headers, no ending CRLFCRLF");
throw new NatsException($"Protocol error: invalid headers, no ending CRLF+CRLF");
}

var length = 0;
Expand Down Expand Up @@ -113,7 +116,7 @@ public bool ParseHeaders(in SequenceReader<byte> reader, NatsHeaders headers)
length < 5 ||

// Exclude the CRLF from the headerLine and parse the header name:value pair
!TryTakeSingleHeader(span[..(length - 2)], headers))
!TryTakeSingleHeader(span[..(length - 2)], headers, ref isVersionLineRead))
{
// Sequence needs to be CRLF and not contain an inner CR not part of terminator.
// Less than min possible headerSpan of 5 bytes a:b\r\n
Expand Down Expand Up @@ -145,7 +148,7 @@ public bool ParseHeaders(in SequenceReader<byte> reader, NatsHeaders headers)
reader.Rewind(readAhead);
}

length = ParseMultiSpanHeader(reader, headers);
length = ParseMultiSpanHeader(reader, headers, ref isVersionLineRead);
if (length < 0)
{
// Not there
Expand All @@ -163,7 +166,7 @@ public bool ParseHeaders(in SequenceReader<byte> reader, NatsHeaders headers)
return false;
}

private int ParseMultiSpanHeader(in SequenceReader<byte> reader, NatsHeaders headers)
private int ParseMultiSpanHeader(in SequenceReader<byte> reader, NatsHeaders headers, ref bool isVersionLineRead)
{
var currentSlice = reader.UnreadSequence;
var lineEndPosition = currentSlice.PositionOfAny(ByteCR, ByteLF);
Expand Down Expand Up @@ -211,7 +214,7 @@ private int ParseMultiSpanHeader(in SequenceReader<byte> reader, NatsHeaders hea
if (headerSpan[^1] != ByteLF ||

// Exclude the CRLF from the headerLine and parse the header name:value pair
!TryTakeSingleHeader(headerSpan[..^2], headers))
!TryTakeSingleHeader(headerSpan[..^2], headers, ref isVersionLineRead))
{
// Sequence needs to be CRLF and not contain an inner CR not part of terminator.
// Not parsable as a valid name:value header pair.
Expand All @@ -221,8 +224,77 @@ private int ParseMultiSpanHeader(in SequenceReader<byte> reader, NatsHeaders hea
return headerSpan.Length;
}

private bool TryTakeSingleHeader(ReadOnlySpan<byte> headerLine, NatsHeaders headers)
private bool TryTakeSingleHeader(ReadOnlySpan<byte> headerLine, NatsHeaders headers, ref bool isVersionLineRead)
{
// We are first looking for a version line
// e.g. NATS/1.0 100 Idle Heartbeat
if (!isVersionLineRead)
{
headerLine.Split(out var versionBytes, out headerLine);

if (!versionBytes.SequenceEqual(CommandConstants.NatsHeaders10))
{
throw new NatsException("Protocol error: header version mismatch");
}

if (headerLine.Length != 0)
{
headerLine.Split(out var codeBytes, out headerLine);
if (!Utf8Parser.TryParse(codeBytes, out int code, out _))
throw new NatsException("Protocol error: header code is not a number");
headers.Code = code;
}

if (headerLine.Length != 0)
{
// We can reduce string allocations by detecting commonly used
// header messages.
if (headerLine.SequenceEqual(NatsHeaders.MessageIdleHeartbeat))
{
headers.Message = NatsHeaders.Messages.IdleHeartbeat;
headers.MessageText = NatsHeaders.MessageIdleHeartbeatStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageBadRequest))
{
headers.Message = NatsHeaders.Messages.BadRequest;
headers.MessageText = NatsHeaders.MessageBadRequestStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageConsumerDeleted))
{
headers.Message = NatsHeaders.Messages.ConsumerDeleted;
headers.MessageText = NatsHeaders.MessageConsumerDeletedStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageConsumerIsPushBased))
{
headers.Message = NatsHeaders.Messages.ConsumerIsPushBased;
headers.MessageText = NatsHeaders.MessageConsumerIsPushBasedStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageNoMessages))
{
headers.Message = NatsHeaders.Messages.NoMessages;
headers.MessageText = NatsHeaders.MessageNoMessagesStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageRequestTimeout))
{
headers.Message = NatsHeaders.Messages.RequestTimeout;
headers.MessageText = NatsHeaders.MessageRequestTimeoutStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageMessageSizeExceedsMaxBytes))
{
headers.Message = NatsHeaders.Messages.MessageSizeExceedsMaxBytes;
headers.MessageText = NatsHeaders.MessageMessageSizeExceedsMaxBytesStr;
}
else
{
headers.Message = NatsHeaders.Messages.Text;
headers.MessageText = _encoding.GetString(headerLine);
}
}

isVersionLineRead = true;
return true;
}

// We are looking for a colon to terminate the header name.
// However, the header name cannot contain a space or tab so look for all three
// and see which is found first.
Expand Down Expand Up @@ -332,5 +404,5 @@ private bool TryTakeSingleHeader(ReadOnlySpan<byte> headerLine, NatsHeaders head
[StackTraceHidden]
private void RejectRequestHeader(ReadOnlySpan<byte> headerLine)
=> throw new NatsException(
$"Protocol error: invalid request header line '{_encoding.GetString(headerLine)}'");
$"Protocol error: invalid request header line '{headerLine.Dump()}'");
}
15 changes: 15 additions & 0 deletions src/NATS.Client.Core/Internal/BufferExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ namespace NATS.Client.Core.Internal;

internal static class BufferExtensions
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Split(this ReadOnlySpan<byte> span, out ReadOnlySpan<byte> left, out ReadOnlySpan<byte> right)
{
var i = span.IndexOf((byte)' ');
if (i == -1)
{
left = span;
right = default;
return;
}

left = span.Slice(0, i);
right = span.Slice(i + 1);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ReadOnlySpan<byte> ToSpan(this ReadOnlySequence<byte> buffer)
{
Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client.Core/Internal/DebuggingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public static string Dump(this NatsHeaders? headers)
return "<NULL>";

var sb = new StringBuilder();
sb.AppendLine($"{headers.Version} {headers.Code} {headers.Message} {headers.MessageText}");
foreach (var (key, stringValues) in headers)
{
foreach (var value in stringValues)
Expand Down
15 changes: 8 additions & 7 deletions src/NATS.Client.Core/Internal/InboxSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ public InboxSub(
_connection = connection;
}

protected override ValueTask ReceiveInternalAsync(
string subject,
string? replyTo,
ReadOnlySequence<byte>? headersBuffer,
ReadOnlySequence<byte> payloadBuffer) =>
// Avoid base class error handling since inboxed subscribers will be responsible for that.
public override ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer) =>
_inbox.ReceivedAsync(subject, replyTo, headersBuffer, payloadBuffer, _connection);

// Not used. Dummy implementation to keep base happy.
protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
=> ValueTask.CompletedTask;

protected override void TryComplete()
{
}
Expand All @@ -46,7 +47,7 @@ public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connecti
return new InboxSub(this, subject, opts, connection, manager);
}

public void Register(NatsSubBase sub)
public ValueTask RegisterAsync(NatsSubBase sub)
{
_bySubject.AddOrUpdate(
sub.Subject,
Expand All @@ -70,7 +71,7 @@ public void Register(NatsSubBase sub)
},
sub);

sub.Ready();
return sub.ReadyAsync();
}

public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
Expand Down
40 changes: 9 additions & 31 deletions src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,6 @@ private static string ParseError(in ReadOnlySequence<byte> errorSlice)
return Encoding.UTF8.GetString(errorSlice.Slice(5));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void Split(ReadOnlySpan<byte> span, out ReadOnlySpan<byte> left, out ReadOnlySpan<byte> right)
{
var i = span.IndexOf((byte)' ');
if (i == -1)
{
left = span;
right = default;
return;
}

left = span.Slice(0, i);
right = span.Slice(i + 1);
}

private async Task ReadLoopAsync()
{
while (true)
Expand Down Expand Up @@ -275,14 +260,7 @@ private async Task ReadLoopAsync()
// Prepare buffer for the next message by removing 'headers + payload + \r\n' from it
buffer = buffer.Slice(buffer.GetPosition(2, totalSlice.End));

var versionLength = CommandConstants.NatsHeaders10NewLine.Length;
var versionSlice = totalSlice.Slice(0, versionLength);
if (!versionSlice.ToSpan().SequenceEqual(CommandConstants.NatsHeaders10NewLine))
{
throw new NatsException("Protocol error: header version mismatch");
}

var headerSlice = totalSlice.Slice(versionLength, headersLength - versionLength);
var headerSlice = totalSlice.Slice(0, headersLength);
var payloadSlice = totalSlice.Slice(headersLength, payloadLength);

await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, headerSlice, payloadSlice)
Expand Down Expand Up @@ -446,9 +424,9 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
private (string subject, int sid, int payloadLength, string? replyTo) ParseMessageHeader(ReadOnlySpan<byte> msgHeader)
{
msgHeader = msgHeader.Slice(4);
Split(msgHeader, out var subjectBytes, out msgHeader);
Split(msgHeader, out var sidBytes, out msgHeader);
Split(msgHeader, out var replyToOrSizeBytes, out msgHeader);
msgHeader.Split(out var subjectBytes, out msgHeader);
msgHeader.Split(out var sidBytes, out msgHeader);
msgHeader.Split(out var replyToOrSizeBytes, out msgHeader);

var subject = Encoding.ASCII.GetString(subjectBytes);

Expand Down Expand Up @@ -498,12 +476,12 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
private (string subject, int sid, string? replyTo, int headersLength, int totalLength) ParseHMessageHeader(ReadOnlySpan<byte> msgHeader)
{
// 'HMSG' literal
Split(msgHeader, out _, out msgHeader);
msgHeader.Split(out _, out msgHeader);

Split(msgHeader, out var subjectBytes, out msgHeader);
Split(msgHeader, out var sidBytes, out msgHeader);
Split(msgHeader, out var replyToOrHeaderLenBytes, out msgHeader);
Split(msgHeader, out var headerLenOrTotalLenBytes, out msgHeader);
msgHeader.Split(out var subjectBytes, out msgHeader);
msgHeader.Split(out var sidBytes, out msgHeader);
msgHeader.Split(out var replyToOrHeaderLenBytes, out msgHeader);
msgHeader.Split(out var headerLenOrTotalLenBytes, out msgHeader);

var subject = Encoding.ASCII.GetString(subjectBytes);
var sid = GetInt32(sidBytes);
Expand Down
17 changes: 13 additions & 4 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)

public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
{
if (subject.StartsWith(_inboxPrefix, StringComparison.Ordinal))
if (IsInboxSubject(subject))
{
await SubscribeInboxAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -140,11 +140,18 @@ public async ValueTask ReconnectAsync(CancellationToken cancellationToken)
await _connection
.SubscribeCoreAsync(sid, sub.Subject, sub.QueueGroup, sub.PendingMsgs, cancellationToken)
.ConfigureAwait(false);
sub.Ready();
await sub.ReadyAsync().ConfigureAwait(false);
}
}
}

public ISubscriptionManager GetManagerFor(string subject)
{
if (IsInboxSubject(subject))
return InboxSubBuilder;
return this;
}

private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
{
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
Expand All @@ -169,7 +176,7 @@ await SubscribeInternalAsync(
}
}

InboxSubBuilder.Register(sub);
await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false);
}

private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
Expand All @@ -185,7 +192,7 @@ private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts
{
await _connection.SubscribeCoreAsync(sid, subject, opts?.QueueGroup, opts?.MaxMsgs, cancellationToken)
.ConfigureAwait(false);
sub.Ready();
await sub.ReadyAsync().ConfigureAwait(false);
}
catch
{
Expand Down Expand Up @@ -241,4 +248,6 @@ private async ValueTask UnsubscribeSidsAsync(List<int> sids)
}
}
}

private bool IsInboxSubject(string subject) => subject.StartsWith(_inboxPrefix, StringComparison.Ordinal);
}
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public async ValueTask<INatsSub> SubscribeAsync(string subject, NatsSubOpts? opt
public async ValueTask<INatsSub<T>> SubscribeAsync<T>(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
var serializer = opts?.Serializer ?? Options.Serializer;
var sub = new NatsSub<T>(this, SubscriptionManager, subject, opts, serializer);
var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, opts, serializer);
await SubAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false);
return sub;
}
Expand Down
Loading