From 44c7cfd3fcd2fa257fe129ae00150d31f2e36f32 Mon Sep 17 00:00:00 2001 From: Havret Date: Wed, 12 Jun 2024 12:04:58 +0200 Subject: [PATCH] Extend logging --- .../AutoRecoveringAnonymousProducer.cs | 4 +- .../AutoRecoveringConnection.cs | 48 +++++++++++--- .../AutoRecovering/AutoRecoveringConsumer.cs | 32 ++++++--- .../AutoRecovering/AutoRecoveringProducer.cs | 6 +- .../AutoRecoveringProducerBase.cs | 40 ++++++++---- .../AutoRecoveringRequestReplyClient.cs | 16 ++--- .../AutoRecovering/ConnectCommand.cs | 2 +- src/ArtemisNetClient/Consumer.cs | 65 ++++++++++++------- 8 files changed, 142 insertions(+), 71 deletions(-) diff --git a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringAnonymousProducer.cs b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringAnonymousProducer.cs index c873f103..5824ad44 100644 --- a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringAnonymousProducer.cs +++ b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringAnonymousProducer.cs @@ -32,7 +32,7 @@ public async Task SendAsync(string address, RoutingType? routingType, Message me { HandleProducerClosed(); await WaitAsync(cancellationToken).ConfigureAwait(false); - Log.RetryingSendAsync(Logger); + Log.RetryingSendAsync(Logger, address); } } } @@ -52,7 +52,7 @@ public void Send(string address, RoutingType? routingType, Message message, Canc { HandleProducerClosed(); Wait(cancellationToken); - Log.RetryingSendAsync(Logger); + Log.RetryingSendAsync(Logger, address); } } } diff --git a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs index 183f478e..72013449 100644 --- a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs +++ b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs @@ -78,7 +78,7 @@ private AsyncRetryPolicy CreateConnectionRetryPolicy(IRecoveryPolic }, (result, _, context) => { var retryCount = context.GetRetryCount(); - var endpoint =context.GetEndpoint(); + var endpoint = context.GetEndpoint(); if (result.Exception != null) { Log.FailedToEstablishConnection(_logger, endpoint, retryCount, result.Exception); @@ -159,9 +159,13 @@ private void OnConnectionClosed(object sender, ConnectionClosedEventArgs args) { if (args.ClosedByPeer) { - Log.ConnectionClosed(_logger, args.Error); + Log.ConnectionClosedByPeer(_logger, args.Error); _writer.TryWrite(ConnectCommand.Instance); } + else + { + Log.ConnectionClosed(_logger); + } ConnectionClosed?.Invoke(sender, args); } @@ -182,12 +186,10 @@ private Task CreateConnection(CancellationToken cancellationToken) var endpoint = GetNextEndpoint(retryCount); context.SetEndpoint(endpoint); var connectionBuilder = new ConnectionBuilder(_loggerFactory, _messageIdPolicyFactory, _clientIdFactory, _sslSettings, _tcpSettings); + + Log.TryingToEstablishedConnection(_logger, endpoint, retryCount); var connection = await connectionBuilder.CreateAsync(endpoint, ct).ConfigureAwait(false); - - if (retryCount > 0) - { - Log.ConnectionEstablished(_logger, endpoint, retryCount); - } + Log.ConnectionEstablished(_logger, endpoint, retryCount); return connection; }, ctx, cancellationToken); @@ -298,16 +300,26 @@ private static class Log 0, "Main recovery loop threw unexpected exception."); - private static readonly Action _connectionClosed = LoggerMessage.Define( + private static readonly Action _connectionClosedByPeer = LoggerMessage.Define( LogLevel.Warning, 0, "Connection closed due to {error}. Reconnect scheduled."); + + private static readonly Action _connectionClosed = LoggerMessage.Define( + LogLevel.Information, + 0, + "Connection closed. Reconnect won't be scheduled."); private static readonly Action _connectionRecovered = LoggerMessage.Define( LogLevel.Information, 0, "Connection recovered."); + private static readonly Action _tryingToEstablishedConnection = LoggerMessage.Define( + LogLevel.Information, + 0, + "Trying to establish connection to {endpoint}. Attempt: {attempt}."); + public static void ConnectionEstablished(ILogger logger, Endpoint endpoint, int attempt) { if (logger.IsEnabled(LogLevel.Information)) @@ -332,11 +344,19 @@ public static void MainRecoveryLoopException(ILogger logger, Exception e) } } - public static void ConnectionClosed(ILogger logger, string error) + public static void ConnectionClosedByPeer(ILogger logger, string error) { if (logger.IsEnabled(LogLevel.Warning)) { - _connectionClosed(logger, error, null); + _connectionClosedByPeer(logger, error, null); + } + } + + public static void ConnectionClosed(ILogger logger) + { + if (logger.IsEnabled(LogLevel.Information)) + { + _connectionClosed(logger, null); } } @@ -347,6 +367,14 @@ public static void ConnectionRecovered(ILogger logger) _connectionRecovered(logger, null); } } + + public static void TryingToEstablishedConnection(ILogger logger, Endpoint endpoint, int attempt) + { + if (logger.IsEnabled(LogLevel.Information)) + { + _tryingToEstablishedConnection(logger, endpoint.ToString(), attempt, null); + } + } } } } \ No newline at end of file diff --git a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConsumer.cs b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConsumer.cs index d4b76365..592808d9 100644 --- a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConsumer.cs +++ b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringConsumer.cs @@ -13,7 +13,7 @@ internal class AutoRecoveringConsumer : IConsumer, IRecoverable { private readonly ILogger _logger; private readonly ConsumerConfiguration _configuration; - private readonly AsyncManualResetEvent _manualResetEvent = new AsyncManualResetEvent(true); + private readonly AsyncManualResetEvent _manualResetEvent = new(true); private bool _closed; private volatile Exception _failureCause; private volatile IConsumer _consumer; @@ -125,6 +125,7 @@ public async Task TerminateAsync(Exception exception) _closed = true; _failureCause = exception; _manualResetEvent.Set(); + Log.ConsumerTerminated(_logger, exception); await DisposeUnderlyingConsumerSafe(_consumer).ConfigureAwait(false); } @@ -161,28 +162,33 @@ private static async ValueTask DisposeUnderlyingConsumer(IConsumer consumer) private static class Log { private static readonly Action _retryingConsumeAsync = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Warning, 0, "Retrying receive after Consumer reestablished."); private static readonly Action _consumerRecovered = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Information, 0, "Consumer recovered."); private static readonly Action _consumerSuspended = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Warning, 0, "Consumer suspended."); private static readonly Action _consumerResumed = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Information, 0, "Consumer resumed."); + private static readonly Action _consumerTerminated = LoggerMessage.Define( + LogLevel.Error, + 0, + "Consumer terminated."); + public static void RetryingReceiveAsync(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Warning)) { _retryingConsumeAsync(logger, null); } @@ -190,7 +196,7 @@ public static void RetryingReceiveAsync(ILogger logger) public static void ConsumerRecovered(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Information)) { _consumerRecovered(logger, null); } @@ -198,7 +204,7 @@ public static void ConsumerRecovered(ILogger logger) public static void ConsumerSuspended(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Warning)) { _consumerSuspended(logger, null); } @@ -206,11 +212,19 @@ public static void ConsumerSuspended(ILogger logger) public static void ConsumerResumed(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Information)) { _consumerResumed(logger, null); } } + + public static void ConsumerTerminated(ILogger logger, Exception exception) + { + if (logger.IsEnabled(LogLevel.Error)) + { + _consumerTerminated(logger, exception); + } + } } } } \ No newline at end of file diff --git a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducer.cs b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducer.cs index 7f7746a8..0d17c145 100644 --- a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducer.cs +++ b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducer.cs @@ -33,14 +33,14 @@ public async Task SendAsync(Message message, Transaction transaction, Cancellati { await TerminateAsync(e).ConfigureAwait(false); - // Producer does not have have permissions to send on specified address + // Producer does not have permissions to send on specified address throw; } catch (ProducerClosedException) { HandleProducerClosed(); await WaitAsync(cancellationToken).ConfigureAwait(false); - Log.RetryingSendAsync(Logger); + Log.RetryingSendAsync(Logger, _configuration.Address); } } } @@ -60,7 +60,7 @@ public void Send(Message message, CancellationToken cancellationToken) { HandleProducerClosed(); Wait(cancellationToken); - Log.RetryingSendAsync(Logger); + Log.RetryingSendAsync(Logger, _configuration.Address); } } } diff --git a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducerBase.cs b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducerBase.cs index 3e2bc5e1..0dc9f1f3 100644 --- a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducerBase.cs +++ b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducerBase.cs @@ -10,7 +10,7 @@ namespace ActiveMQ.Artemis.Client.AutoRecovering internal abstract class AutoRecoveringProducerBase : IRecoverable { protected readonly ILogger Logger; - private readonly AsyncManualResetEvent _manualResetEvent = new AsyncManualResetEvent(true); + private readonly AsyncManualResetEvent _manualResetEvent = new(true); private bool _closed; private Exception _failureCause; @@ -78,6 +78,7 @@ public async Task TerminateAsync(Exception exception) _closed = true; _failureCause = exception; _manualResetEvent.Set(); + Log.ProducerTerminated(Logger, exception); await DisposeResourceSafe(UnderlyingResource).ConfigureAwait(false); } @@ -127,37 +128,42 @@ protected void CheckState() protected static class Log { - private static readonly Action _retryingProduceAsync = LoggerMessage.Define( - LogLevel.Trace, + private static readonly Action _retryingProduceAsync = LoggerMessage.Define( + LogLevel.Warning, 0, - "Retrying send after Producer reestablished."); + "Retrying send to address {0} after Producer reestablished."); private static readonly Action _producerRecovered = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Information, 0, "Producer recovered."); private static readonly Action _producerSuspended = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Warning, 0, "Producer suspended."); private static readonly Action _producerResumed = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Information, 0, "Producer resumed."); + + private static readonly Action _producerTerminated = LoggerMessage.Define( + LogLevel.Error, + 0, + "Producer terminated."); - public static void RetryingSendAsync(ILogger logger) + public static void RetryingSendAsync(ILogger logger, string address) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Warning)) { - _retryingProduceAsync(logger, null); + _retryingProduceAsync(logger, address, null); } } public static void ProducerRecovered(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Information)) { _producerRecovered(logger, null); } @@ -165,7 +171,7 @@ public static void ProducerRecovered(ILogger logger) public static void ProducerSuspended(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Warning)) { _producerSuspended(logger, null); } @@ -173,11 +179,19 @@ public static void ProducerSuspended(ILogger logger) public static void ProducerResumed(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Information)) { _producerResumed(logger, null); } } + + public static void ProducerTerminated(ILogger logger, Exception exception) + { + if (logger.IsEnabled(LogLevel.Error)) + { + _producerTerminated(logger, exception); + } + } } } } \ No newline at end of file diff --git a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringRequestReplyClient.cs b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringRequestReplyClient.cs index b8b273e1..a7819df7 100644 --- a/src/ArtemisNetClient/AutoRecovering/AutoRecoveringRequestReplyClient.cs +++ b/src/ArtemisNetClient/AutoRecovering/AutoRecoveringRequestReplyClient.cs @@ -135,28 +135,28 @@ private static async ValueTask DisposeUnderlyingRpcClient(IRequestReplyClient re private static class Log { private static readonly Action _retryingSendAsync = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Warning, 0, "Retrying send after RequestReplyClient reestablished."); private static readonly Action _requestReplyClientRecovered = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Information, 0, "RequestReplyClient recovered."); private static readonly Action _requestReplyClientClientSuspended = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Warning, 0, "RequestReplyClient suspended."); private static readonly Action _requestReplyClientClientResumed = LoggerMessage.Define( - LogLevel.Trace, + LogLevel.Information, 0, "RequestReplyClient resumed."); public static void RetryingSendAsync(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Warning)) { _retryingSendAsync(logger, null); } @@ -164,7 +164,7 @@ public static void RetryingSendAsync(ILogger logger) public static void RequestReplyClientRecovered(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Information)) { _requestReplyClientRecovered(logger, null); } @@ -172,7 +172,7 @@ public static void RequestReplyClientRecovered(ILogger logger) public static void RequestReplyClientSuspended(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Warning)) { _requestReplyClientClientSuspended(logger, null); } @@ -180,7 +180,7 @@ public static void RequestReplyClientSuspended(ILogger logger) public static void RequestReplyClientResumed(ILogger logger) { - if (logger.IsEnabled(LogLevel.Trace)) + if (logger.IsEnabled(LogLevel.Information)) { _requestReplyClientClientResumed(logger, null); } diff --git a/src/ArtemisNetClient/AutoRecovering/ConnectCommand.cs b/src/ArtemisNetClient/AutoRecovering/ConnectCommand.cs index d5f9a42a..692a6cbd 100644 --- a/src/ArtemisNetClient/AutoRecovering/ConnectCommand.cs +++ b/src/ArtemisNetClient/AutoRecovering/ConnectCommand.cs @@ -3,6 +3,6 @@ internal class ConnectCommand { private ConnectCommand() { } - public static readonly ConnectCommand Instance = new ConnectCommand(); + public static readonly ConnectCommand Instance = new(); } } \ No newline at end of file diff --git a/src/ArtemisNetClient/Consumer.cs b/src/ArtemisNetClient/Consumer.cs index 317da49a..0deac838 100644 --- a/src/ArtemisNetClient/Consumer.cs +++ b/src/ArtemisNetClient/Consumer.cs @@ -35,11 +35,11 @@ public Consumer(ILoggerFactory loggerFactory, ReceiverLink receiverLink, Transac var message = new Message(m); if (_writer.TryWrite(message)) { - Log.MessageBuffered(_logger, message); + Log.MessageBuffered(_logger, message, _receiverLink); } else { - Log.FailedToBufferMessage(_logger, message); + Log.FailedToBufferMessage(_logger, message, _receiverLink); } }); _receiverLink.Closed += OnClosed; @@ -51,7 +51,7 @@ public async ValueTask ReceiveAsync(CancellationToken cancellationToken try { - Log.ReceivingMessage(_logger); + Log.ReceivingMessage(_logger, _receiverLink); return await _reader.ReadAsync(cancellationToken).ConfigureAwait(false); } catch (ChannelClosedException e) when (e.InnerException is ConsumerClosedException) @@ -73,7 +73,7 @@ public async ValueTask AcceptAsync(Message message, Transaction transaction, Can ? (DeliveryState) new TransactionalState { Outcome = new Accepted(), TxnId = txnId } : new Accepted(); _receiverLink.Complete(message.InnerMessage, deliveryState); - Log.MessageAccepted(_logger, message); + Log.MessageAccepted(_logger, message, _receiverLink); } public void Reject(Message message, bool undeliverableHere) @@ -81,7 +81,7 @@ public void Reject(Message message, bool undeliverableHere) CheckState(); _receiverLink.Modify(message.InnerMessage, deliveryFailed: true, undeliverableHere: undeliverableHere); - Log.MessageRejected(_logger, message); + Log.MessageRejected(_logger, message, _receiverLink); } private void CheckState() @@ -100,6 +100,7 @@ private void CheckState() private void OnClosed(IAmqpObject sender, Error error) { var consumerClosedException = GetConsumerClosedException(error); + Log.ConsumerClosed(_logger, _receiverLink, consumerClosedException); _writer.TryComplete(consumerClosedException); } @@ -139,68 +140,82 @@ public async ValueTask DisposeAsync() private static class Log { - private static readonly Action _messageBuffered = LoggerMessage.Define( + private static readonly Action _messageBuffered = LoggerMessage.Define( LogLevel.Trace, 0, - "Message buffered. MessageId: '{0}'."); + "Message buffered. MessageId: '{0}' ConsumerName: '{1}'."); - private static readonly Action _failedToBufferMessage = LoggerMessage.Define( + private static readonly Action _failedToBufferMessage = LoggerMessage.Define( LogLevel.Warning, 0, - "Failed to buffer message. MessageId: '{0}'."); + "Failed to buffer message. MessageId: '{0}' ConsumerName: '{1}'."); - private static readonly Action _receivingMessage = LoggerMessage.Define( + private static readonly Action _receivingMessage = LoggerMessage.Define( LogLevel.Trace, 0, - "Receiving message."); + "Receiving message. ConsumerName: '{0}'."); - private static readonly Action _messageAccepted = LoggerMessage.Define( + private static readonly Action _messageAccepted = LoggerMessage.Define( LogLevel.Trace, 0, - "Message accepted. MessageId: '{0}'."); + "Message accepted. MessageId: '{0}' ConsumerName: '{1}'."); - private static readonly Action _messageRejected = LoggerMessage.Define( + private static readonly Action _messageRejected = LoggerMessage.Define( LogLevel.Trace, 0, - "Message rejected. MessageId: '{0}'."); + "Message rejected. MessageId: '{0}' ConsumerName: '{1}'."); - public static void MessageBuffered(ILogger logger, Message message) + private static readonly Action _consumerClosed = LoggerMessage.Define( + LogLevel.Information, + 0, + "Consumer closed. ConsumerName: '{0}'." + ); + + public static void MessageBuffered(ILogger logger, Message message, ReceiverLink receiverLink) { if (logger.IsEnabled(LogLevel.Trace)) { - _messageBuffered(logger, message.GetMessageId(), null); + _messageBuffered(logger, message.GetMessageId(), receiverLink.Name, null); } } - public static void FailedToBufferMessage(ILogger logger, Message message) + public static void FailedToBufferMessage(ILogger logger, Message message, ReceiverLink receiverLink) { if (logger.IsEnabled(LogLevel.Warning)) { - _failedToBufferMessage(logger, message.GetMessageId(), null); + _failedToBufferMessage(logger, message.GetMessageId(), receiverLink.Name, null); } } - public static void ReceivingMessage(ILogger logger) + public static void ReceivingMessage(ILogger logger, ReceiverLink receiverLink) { if (logger.IsEnabled(LogLevel.Trace)) { - _receivingMessage(logger, null); + _receivingMessage(logger, receiverLink.Name, null); } } - public static void MessageAccepted(ILogger logger, Message message) + public static void MessageAccepted(ILogger logger, Message message, ReceiverLink receiverLink) { if (logger.IsEnabled(LogLevel.Trace)) { - _messageAccepted(logger, message.GetMessageId(), null); + _messageAccepted(logger, message.GetMessageId(), receiverLink.Name, null); } } - public static void MessageRejected(ILogger logger, Message message) + public static void MessageRejected(ILogger logger, Message message, ReceiverLink receiverLink) { if (logger.IsEnabled(LogLevel.Trace)) { - _messageRejected(logger, message.GetMessageId(), null); + _messageRejected(logger, message.GetMessageId(), receiverLink.Name, null); + } + } + + public static void ConsumerClosed(ILogger logger, ReceiverLink receiverLink, ConsumerClosedException exception) + { + if (logger.IsEnabled(LogLevel.Information)) + { + _consumerClosed(logger, receiverLink.Name, exception); } } }