Skip to content

Commit

Permalink
Handle publish serialization exceptions (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk authored Oct 5, 2023
1 parent b527abe commit 6a9515d
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 13 deletions.
32 changes: 30 additions & 2 deletions src/NATS.Client.Core/Commands/PublishCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ internal sealed class PublishCommand<T> : CommandBase<PublishCommand<T>>
private NatsHeaders? _headers;
private T? _value;
private INatsSerializer? _serializer;
private Action<Exception>? _errorHandler;
private CancellationToken _cancellationToken;

private PublishCommand()
Expand All @@ -18,7 +19,7 @@ private PublishCommand()

public override bool IsCanceled => _cancellationToken.IsCancellationRequested;

public static PublishCommand<T> Create(ObjectPool pool, string subject, string? replyTo, NatsHeaders? headers, T? value, INatsSerializer serializer, CancellationToken cancellationToken)
public static PublishCommand<T> Create(ObjectPool pool, string subject, string? replyTo, NatsHeaders? headers, T? value, INatsSerializer serializer, Action<Exception>? errorHandler, CancellationToken cancellationToken)
{
if (!TryRent(pool, out var result))
{
Expand All @@ -30,14 +31,40 @@ public static PublishCommand<T> Create(ObjectPool pool, string subject, string?
result._headers = headers;
result._value = value;
result._serializer = serializer;
result._errorHandler = errorHandler;
result._cancellationToken = cancellationToken;

return result;
}

public override void Write(ProtocolWriter writer)
{
writer.WritePublish(_subject!, _replyTo, _headers, _value, _serializer!);
try
{
writer.WritePublish(_subject!, _replyTo, _headers, _value, _serializer!);
}
catch (Exception e)
{
if (_errorHandler is { } errorHandler)
{
ThreadPool.UnsafeQueueUserWorkItem(
state =>
{
try
{
state.handler(state.exception);
}
catch
{
// ignore
}
},
(handler: errorHandler, exception: e),
preferLocal: false);
}

throw;
}
}

protected override void Reset()
Expand All @@ -46,6 +73,7 @@ protected override void Reset()
_headers = default;
_value = default;
_serializer = null;
_errorHandler = default;
_cancellationToken = default;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,27 @@ private async Task WriteLoopAsync()
continue;
}

if (command is IBatchCommand batch)
try
{
count += batch.Write(protocolWriter);
if (command is IBatchCommand batch)
{
count += batch.Write(protocolWriter);
}
else
{
command.Write(protocolWriter);
count++;
}
}
else
catch (Exception e)
{
command.Write(protocolWriter);
count++;
// flag potential serialization exceptions
if (command is IPromise promise)
{
promise.SetException(e);
}

throw;
}

if (command is IPromise p)
Expand Down
8 changes: 4 additions & 4 deletions src/NATS.Client.Core/NatsConnection.LowLevelApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@ internal ValueTask PubPostAsync(string subject, string? replyTo = default, ReadO
}
}

internal ValueTask PubModelPostAsync<T>(string subject, T? data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default)
internal ValueTask PubModelPostAsync<T>(string subject, T? data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, Action<Exception>? errorHandler = default, CancellationToken cancellationToken = default)
{
headers?.SetReadOnly();

if (ConnectionState == NatsConnectionState.Open)
{
var command = PublishCommand<T>.Create(_pool, subject, replyTo, headers, data, serializer, cancellationToken);
var command = PublishCommand<T>.Create(_pool, subject, replyTo, headers, data, serializer, errorHandler, cancellationToken);
return EnqueueCommandAsync(command);
}
else
{
return WithConnectAsync(subject, replyTo, headers, data, serializer, cancellationToken, static (self, s, r, h, d, ser, c) =>
return WithConnectAsync(subject, replyTo, headers, data, serializer, errorHandler, cancellationToken, static (self, s, r, h, d, ser, eh, c) =>
{
var command = PublishCommand<T>.Create(self._pool, s, r, h, d, ser, c);
var command = PublishCommand<T>.Create(self._pool, s, r, h, d, ser, eh, c);
return self.EnqueueCommandAsync(command);
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.Publish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public ValueTask PublishAsync<T>(string subject, T? data, NatsHeaders? headers =
}
else
{
return PubModelPostAsync<T>(subject, data, serializer, replyTo, headers, cancellationToken);
return PubModelPostAsync<T>(subject, data, serializer, replyTo, headers, opts?.ErrorHandler, cancellationToken);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.RequestSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ internal async ValueTask<INatsSub<TReply>> RequestSubAsync<TRequest, TReply>(
}
else
{
await PubModelPostAsync(subject, data, serializer, replyTo, headers, cancellationToken).ConfigureAwait(false);
await PubModelPostAsync(subject, data, serializer, replyTo, headers, requestOpts?.ErrorHandler, cancellationToken).ConfigureAwait(false);
}

return sub;
Expand Down
6 changes: 6 additions & 0 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,12 @@ private async ValueTask WithConnectAsync<T1, T2, T3, T4, T5, T6>(T1 item1, T2 it
await coreAsync(this, item1, item2, item3, item4, item5, item6).ConfigureAwait(false);
}

private async ValueTask WithConnectAsync<T1, T2, T3, T4, T5, T6, T7>(T1 item1, T2 item2, T3 item3, T4 item4, T5 item5, T6 item6, T7 item7, Func<NatsConnection, T1, T2, T3, T4, T5, T6, T7, ValueTask> coreAsync)
{
await ConnectAsync().ConfigureAwait(false);
await coreAsync(this, item1, item2, item3, item4, item5, item6, item7).ConfigureAwait(false);
}

private async ValueTask<T> WithConnectAsync<T>(Func<NatsConnection, ValueTask<T>> coreAsync)
{
await ConnectAsync().ConfigureAwait(false);
Expand Down
9 changes: 9 additions & 0 deletions src/NATS.Client.Core/NatsPubOpts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,13 @@ public record NatsPubOpts
/// Default value is false, and calls to PublishAsync will complete after the publish command has been written to the Command Channel
/// </summary>
public bool? WaitUntilSent { get; init; }

/// <summary>
/// Optional callback to handle serialization exceptions.
/// </summary>
/// <remarks>
/// When <c>WaitUntilSent</c> is set to <c>false</c> serialization exceptions won't propagate
/// to the caller but this callback will be called with the exception thrown by the serializer.
/// </remarks>
public Action<Exception>? ErrorHandler { get; init; }
}
1 change: 1 addition & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ internal override IEnumerable<ICommand> GetReconnectCommands(int sid)
headers: default,
value: request,
serializer: NatsJsonSerializer.Default,
errorHandler: default,
cancellationToken: default);
}

Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSFetch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ internal override IEnumerable<ICommand> GetReconnectCommands(int sid)
headers: default,
value: request,
serializer: NatsJsonSerializer.Default,
errorHandler: default,
cancellationToken: default);
}

Expand Down
72 changes: 72 additions & 0 deletions tests/NATS.Client.Core.Tests/SerializerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System.Buffers;

namespace NATS.Client.Core.Tests;

public class SerializerTest
{
private readonly ITestOutputHelper _output;

public SerializerTest(ITestOutputHelper output) => _output = output;

[Fact]
public async Task Serializer_exceptions()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

await Assert.ThrowsAsync<TestSerializerException>(async () =>
{
var signal = new WaitSignal<Exception>();
var opts = new NatsPubOpts
{
Serializer = new TestSerializer(),
WaitUntilSent = false,
ErrorHandler = e =>
{
signal.Pulse(e);
},
};
await nats.PublishAsync(
"foo",
0,
opts: opts);
throw await signal;
});

await Assert.ThrowsAsync<TestSerializerException>(async () =>
{
await nats.PublishAsync(
"foo",
0,
opts: new NatsPubOpts { Serializer = new TestSerializer(), WaitUntilSent = true });
});

// Check that our connection isn't affected by the exceptions
await using var sub = await nats.SubscribeAsync<int>("foo");

var rtt = await nats.PingAsync();
Assert.True(rtt > TimeSpan.Zero);

await nats.PublishAsync("foo", 1);

var result = (await sub.Msgs.ReadAsync()).Data;

Assert.Equal(1, result);
}
}

public class TestSerializer : INatsSerializer
{
public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value) => throw new TestSerializerException();

public T? Deserialize<T>(in ReadOnlySequence<byte> buffer) => throw new TestSerializerException();

public object? Deserialize(in ReadOnlySequence<byte> buffer, Type type) => throw new TestSerializerException();
}

public class TestSerializerException : Exception
{
}

0 comments on commit 6a9515d

Please sign in to comment.