diff --git a/src/NATS.Client.Core/Commands/PublishCommand.cs b/src/NATS.Client.Core/Commands/PublishCommand.cs index ecea042ed..04b6d27f4 100644 --- a/src/NATS.Client.Core/Commands/PublishCommand.cs +++ b/src/NATS.Client.Core/Commands/PublishCommand.cs @@ -10,6 +10,7 @@ internal sealed class PublishCommand : CommandBase> private NatsHeaders? _headers; private T? _value; private INatsSerializer? _serializer; + private Action? _errorHandler; private CancellationToken _cancellationToken; private PublishCommand() @@ -18,7 +19,7 @@ private PublishCommand() public override bool IsCanceled => _cancellationToken.IsCancellationRequested; - public static PublishCommand Create(ObjectPool pool, string subject, string? replyTo, NatsHeaders? headers, T? value, INatsSerializer serializer, CancellationToken cancellationToken) + public static PublishCommand Create(ObjectPool pool, string subject, string? replyTo, NatsHeaders? headers, T? value, INatsSerializer serializer, Action? errorHandler, CancellationToken cancellationToken) { if (!TryRent(pool, out var result)) { @@ -30,6 +31,7 @@ public static PublishCommand Create(ObjectPool pool, string subject, string? result._headers = headers; result._value = value; result._serializer = serializer; + result._errorHandler = errorHandler; result._cancellationToken = cancellationToken; return result; @@ -37,7 +39,32 @@ public static PublishCommand Create(ObjectPool pool, string subject, string? public override void Write(ProtocolWriter writer) { - writer.WritePublish(_subject!, _replyTo, _headers, _value, _serializer!); + try + { + writer.WritePublish(_subject!, _replyTo, _headers, _value, _serializer!); + } + catch (Exception e) + { + if (_errorHandler is { } errorHandler) + { + ThreadPool.UnsafeQueueUserWorkItem( + state => + { + try + { + state.handler(state.exception); + } + catch + { + // ignore + } + }, + (handler: errorHandler, exception: e), + preferLocal: false); + } + + throw; + } } protected override void Reset() @@ -46,6 +73,7 @@ protected override void Reset() _headers = default; _value = default; _serializer = null; + _errorHandler = default; _cancellationToken = default; } } diff --git a/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs index 8ea96dcdd..2f7515127 100644 --- a/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsPipeliningWriteProtocolProcessor.cs @@ -129,14 +129,27 @@ private async Task WriteLoopAsync() continue; } - if (command is IBatchCommand batch) + try { - count += batch.Write(protocolWriter); + if (command is IBatchCommand batch) + { + count += batch.Write(protocolWriter); + } + else + { + command.Write(protocolWriter); + count++; + } } - else + catch (Exception e) { - command.Write(protocolWriter); - count++; + // flag potential serialization exceptions + if (command is IPromise promise) + { + promise.SetException(e); + } + + throw; } if (command is IPromise p) diff --git a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs index c010d43d1..c6e1c09cb 100644 --- a/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs +++ b/src/NATS.Client.Core/NatsConnection.LowLevelApi.cs @@ -28,20 +28,20 @@ internal ValueTask PubPostAsync(string subject, string? replyTo = default, ReadO } } - internal ValueTask PubModelPostAsync(string subject, T? data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) + internal ValueTask PubModelPostAsync(string subject, T? data, INatsSerializer serializer, string? replyTo = default, NatsHeaders? headers = default, Action? errorHandler = default, CancellationToken cancellationToken = default) { headers?.SetReadOnly(); if (ConnectionState == NatsConnectionState.Open) { - var command = PublishCommand.Create(_pool, subject, replyTo, headers, data, serializer, cancellationToken); + var command = PublishCommand.Create(_pool, subject, replyTo, headers, data, serializer, errorHandler, cancellationToken); return EnqueueCommandAsync(command); } else { - return WithConnectAsync(subject, replyTo, headers, data, serializer, cancellationToken, static (self, s, r, h, d, ser, c) => + return WithConnectAsync(subject, replyTo, headers, data, serializer, errorHandler, cancellationToken, static (self, s, r, h, d, ser, eh, c) => { - var command = PublishCommand.Create(self._pool, s, r, h, d, ser, c); + var command = PublishCommand.Create(self._pool, s, r, h, d, ser, eh, c); return self.EnqueueCommandAsync(command); }); } diff --git a/src/NATS.Client.Core/NatsConnection.Publish.cs b/src/NATS.Client.Core/NatsConnection.Publish.cs index 388908106..79094fa97 100644 --- a/src/NATS.Client.Core/NatsConnection.Publish.cs +++ b/src/NATS.Client.Core/NatsConnection.Publish.cs @@ -33,7 +33,7 @@ public ValueTask PublishAsync(string subject, T? data, NatsHeaders? headers = } else { - return PubModelPostAsync(subject, data, serializer, replyTo, headers, cancellationToken); + return PubModelPostAsync(subject, data, serializer, replyTo, headers, opts?.ErrorHandler, cancellationToken); } } diff --git a/src/NATS.Client.Core/NatsConnection.RequestSub.cs b/src/NATS.Client.Core/NatsConnection.RequestSub.cs index cbdfa5320..654348e35 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestSub.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestSub.cs @@ -49,7 +49,7 @@ internal async ValueTask> RequestSubAsync( } else { - await PubModelPostAsync(subject, data, serializer, replyTo, headers, cancellationToken).ConfigureAwait(false); + await PubModelPostAsync(subject, data, serializer, replyTo, headers, requestOpts?.ErrorHandler, cancellationToken).ConfigureAwait(false); } return sub; diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index cf578cbee..0d5d6c9d0 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -747,6 +747,12 @@ private async ValueTask WithConnectAsync(T1 item1, T2 it await coreAsync(this, item1, item2, item3, item4, item5, item6).ConfigureAwait(false); } + private async ValueTask WithConnectAsync(T1 item1, T2 item2, T3 item3, T4 item4, T5 item5, T6 item6, T7 item7, Func coreAsync) + { + await ConnectAsync().ConfigureAwait(false); + await coreAsync(this, item1, item2, item3, item4, item5, item6, item7).ConfigureAwait(false); + } + private async ValueTask WithConnectAsync(Func> coreAsync) { await ConnectAsync().ConfigureAwait(false); diff --git a/src/NATS.Client.Core/NatsPubOpts.cs b/src/NATS.Client.Core/NatsPubOpts.cs index dc1d14539..f1996254d 100644 --- a/src/NATS.Client.Core/NatsPubOpts.cs +++ b/src/NATS.Client.Core/NatsPubOpts.cs @@ -9,4 +9,13 @@ public record NatsPubOpts /// Default value is false, and calls to PublishAsync will complete after the publish command has been written to the Command Channel /// public bool? WaitUntilSent { get; init; } + + /// + /// Optional callback to handle serialization exceptions. + /// + /// + /// When WaitUntilSent is set to false serialization exceptions won't propagate + /// to the caller but this callback will be called with the exception thrown by the serializer. + /// + public Action? ErrorHandler { get; init; } } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs index 64b7e2df6..e8c268b9b 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSConsume.cs @@ -165,6 +165,7 @@ internal override IEnumerable GetReconnectCommands(int sid) headers: default, value: request, serializer: NatsJsonSerializer.Default, + errorHandler: default, cancellationToken: default); } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs index c04049b15..a189be65e 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSFetch.cs @@ -151,6 +151,7 @@ internal override IEnumerable GetReconnectCommands(int sid) headers: default, value: request, serializer: NatsJsonSerializer.Default, + errorHandler: default, cancellationToken: default); } diff --git a/tests/NATS.Client.Core.Tests/SerializerTest.cs b/tests/NATS.Client.Core.Tests/SerializerTest.cs new file mode 100644 index 000000000..834f978ea --- /dev/null +++ b/tests/NATS.Client.Core.Tests/SerializerTest.cs @@ -0,0 +1,72 @@ +using System.Buffers; + +namespace NATS.Client.Core.Tests; + +public class SerializerTest +{ + private readonly ITestOutputHelper _output; + + public SerializerTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Serializer_exceptions() + { + await using var server = NatsServer.Start(); + await using var nats = server.CreateClientConnection(); + + await Assert.ThrowsAsync(async () => + { + var signal = new WaitSignal(); + + var opts = new NatsPubOpts + { + Serializer = new TestSerializer(), + WaitUntilSent = false, + ErrorHandler = e => + { + signal.Pulse(e); + }, + }; + + await nats.PublishAsync( + "foo", + 0, + opts: opts); + + throw await signal; + }); + + await Assert.ThrowsAsync(async () => + { + await nats.PublishAsync( + "foo", + 0, + opts: new NatsPubOpts { Serializer = new TestSerializer(), WaitUntilSent = true }); + }); + + // Check that our connection isn't affected by the exceptions + await using var sub = await nats.SubscribeAsync("foo"); + + var rtt = await nats.PingAsync(); + Assert.True(rtt > TimeSpan.Zero); + + await nats.PublishAsync("foo", 1); + + var result = (await sub.Msgs.ReadAsync()).Data; + + Assert.Equal(1, result); + } +} + +public class TestSerializer : INatsSerializer +{ + public int Serialize(ICountableBufferWriter bufferWriter, T? value) => throw new TestSerializerException(); + + public T? Deserialize(in ReadOnlySequence buffer) => throw new TestSerializerException(); + + public object? Deserialize(in ReadOnlySequence buffer, Type type) => throw new TestSerializerException(); +} + +public class TestSerializerException : Exception +{ +}