Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend logging #493

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task SendAsync(string address, RoutingType? routingType, Message me
{
HandleProducerClosed();
await WaitAsync(cancellationToken).ConfigureAwait(false);
Log.RetryingSendAsync(Logger);
Log.RetryingSendAsync(Logger, address);
}
}
}
Expand All @@ -52,7 +52,7 @@ public void Send(string address, RoutingType? routingType, Message message, Canc
{
HandleProducerClosed();
Wait(cancellationToken);
Log.RetryingSendAsync(Logger);
Log.RetryingSendAsync(Logger, address);
}
}
}
Expand Down
48 changes: 38 additions & 10 deletions src/ArtemisNetClient/AutoRecovering/AutoRecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private AsyncRetryPolicy<IConnection> CreateConnectionRetryPolicy(IRecoveryPolic
}, (result, _, context) =>
{
var retryCount = context.GetRetryCount();
var endpoint =context.GetEndpoint();
var endpoint = context.GetEndpoint();
if (result.Exception != null)
{
Log.FailedToEstablishConnection(_logger, endpoint, retryCount, result.Exception);
Expand Down Expand Up @@ -159,9 +159,13 @@ private void OnConnectionClosed(object sender, ConnectionClosedEventArgs args)
{
if (args.ClosedByPeer)
{
Log.ConnectionClosed(_logger, args.Error);
Log.ConnectionClosedByPeer(_logger, args.Error);
_writer.TryWrite(ConnectCommand.Instance);
}
else
{
Log.ConnectionClosed(_logger);
}

ConnectionClosed?.Invoke(sender, args);
}
Expand All @@ -182,12 +186,10 @@ private Task<IConnection> CreateConnection(CancellationToken cancellationToken)
var endpoint = GetNextEndpoint(retryCount);
context.SetEndpoint(endpoint);
var connectionBuilder = new ConnectionBuilder(_loggerFactory, _messageIdPolicyFactory, _clientIdFactory, _sslSettings, _tcpSettings);

Log.TryingToEstablishedConnection(_logger, endpoint, retryCount);
var connection = await connectionBuilder.CreateAsync(endpoint, ct).ConfigureAwait(false);

if (retryCount > 0)
{
Log.ConnectionEstablished(_logger, endpoint, retryCount);
}
Log.ConnectionEstablished(_logger, endpoint, retryCount);

return connection;
}, ctx, cancellationToken);
Expand Down Expand Up @@ -298,16 +300,26 @@ private static class Log
0,
"Main recovery loop threw unexpected exception.");

private static readonly Action<ILogger, string, Exception> _connectionClosed = LoggerMessage.Define<string>(
private static readonly Action<ILogger, string, Exception> _connectionClosedByPeer = LoggerMessage.Define<string>(
LogLevel.Warning,
0,
"Connection closed due to {error}. Reconnect scheduled.");

private static readonly Action<ILogger, Exception> _connectionClosed = LoggerMessage.Define(
LogLevel.Information,
0,
"Connection closed. Reconnect won't be scheduled.");

private static readonly Action<ILogger, Exception> _connectionRecovered = LoggerMessage.Define(
LogLevel.Information,
0,
"Connection recovered.");

private static readonly Action<ILogger, string, int, Exception> _tryingToEstablishedConnection = LoggerMessage.Define<string, int>(
LogLevel.Information,
0,
"Trying to establish connection to {endpoint}. Attempt: {attempt}.");

public static void ConnectionEstablished(ILogger logger, Endpoint endpoint, int attempt)
{
if (logger.IsEnabled(LogLevel.Information))
Expand All @@ -332,11 +344,19 @@ public static void MainRecoveryLoopException(ILogger logger, Exception e)
}
}

public static void ConnectionClosed(ILogger logger, string error)
public static void ConnectionClosedByPeer(ILogger logger, string error)
{
if (logger.IsEnabled(LogLevel.Warning))
{
_connectionClosed(logger, error, null);
_connectionClosedByPeer(logger, error, null);
}
}

public static void ConnectionClosed(ILogger logger)
{
if (logger.IsEnabled(LogLevel.Information))
{
_connectionClosed(logger, null);
}
}

Expand All @@ -347,6 +367,14 @@ public static void ConnectionRecovered(ILogger logger)
_connectionRecovered(logger, null);
}
}

public static void TryingToEstablishedConnection(ILogger logger, Endpoint endpoint, int attempt)
{
if (logger.IsEnabled(LogLevel.Information))
{
_tryingToEstablishedConnection(logger, endpoint.ToString(), attempt, null);
}
}
}
}
}
32 changes: 23 additions & 9 deletions src/ArtemisNetClient/AutoRecovering/AutoRecoveringConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ internal class AutoRecoveringConsumer : IConsumer, IRecoverable
{
private readonly ILogger<AutoRecoveringConsumer> _logger;
private readonly ConsumerConfiguration _configuration;
private readonly AsyncManualResetEvent _manualResetEvent = new AsyncManualResetEvent(true);
private readonly AsyncManualResetEvent _manualResetEvent = new(true);
private bool _closed;
private volatile Exception _failureCause;
private volatile IConsumer _consumer;
Expand Down Expand Up @@ -125,6 +125,7 @@ public async Task TerminateAsync(Exception exception)
_closed = true;
_failureCause = exception;
_manualResetEvent.Set();
Log.ConsumerTerminated(_logger, exception);
await DisposeUnderlyingConsumerSafe(_consumer).ConfigureAwait(false);
}

Expand Down Expand Up @@ -161,56 +162,69 @@ private static async ValueTask DisposeUnderlyingConsumer(IConsumer consumer)
private static class Log
{
private static readonly Action<ILogger, Exception> _retryingConsumeAsync = LoggerMessage.Define(
LogLevel.Trace,
LogLevel.Warning,
0,
"Retrying receive after Consumer reestablished.");

private static readonly Action<ILogger, Exception> _consumerRecovered = LoggerMessage.Define(
LogLevel.Trace,
LogLevel.Information,
0,
"Consumer recovered.");

private static readonly Action<ILogger, Exception> _consumerSuspended = LoggerMessage.Define(
LogLevel.Trace,
LogLevel.Warning,
0,
"Consumer suspended.");

private static readonly Action<ILogger, Exception> _consumerResumed = LoggerMessage.Define(
LogLevel.Trace,
LogLevel.Information,
0,
"Consumer resumed.");

private static readonly Action<ILogger, Exception> _consumerTerminated = LoggerMessage.Define(
LogLevel.Error,
0,
"Consumer terminated.");

public static void RetryingReceiveAsync(ILogger logger)
{
if (logger.IsEnabled(LogLevel.Trace))
if (logger.IsEnabled(LogLevel.Warning))
{
_retryingConsumeAsync(logger, null);
}
}

public static void ConsumerRecovered(ILogger logger)
{
if (logger.IsEnabled(LogLevel.Trace))
if (logger.IsEnabled(LogLevel.Information))
{
_consumerRecovered(logger, null);
}
}

public static void ConsumerSuspended(ILogger logger)
{
if (logger.IsEnabled(LogLevel.Trace))
if (logger.IsEnabled(LogLevel.Warning))
{
_consumerSuspended(logger, null);
}
}

public static void ConsumerResumed(ILogger logger)
{
if (logger.IsEnabled(LogLevel.Trace))
if (logger.IsEnabled(LogLevel.Information))
{
_consumerResumed(logger, null);
}
}

public static void ConsumerTerminated(ILogger logger, Exception exception)
{
if (logger.IsEnabled(LogLevel.Error))
{
_consumerTerminated(logger, exception);
}
}
}
}
}
6 changes: 3 additions & 3 deletions src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ public async Task SendAsync(Message message, Transaction transaction, Cancellati
{
await TerminateAsync(e).ConfigureAwait(false);

// Producer does not have have permissions to send on specified address
// Producer does not have permissions to send on specified address
throw;
}
catch (ProducerClosedException)
{
HandleProducerClosed();
await WaitAsync(cancellationToken).ConfigureAwait(false);
Log.RetryingSendAsync(Logger);
Log.RetryingSendAsync(Logger, _configuration.Address);
}
}
}
Expand All @@ -60,7 +60,7 @@ public void Send(Message message, CancellationToken cancellationToken)
{
HandleProducerClosed();
Wait(cancellationToken);
Log.RetryingSendAsync(Logger);
Log.RetryingSendAsync(Logger, _configuration.Address);
}
}
}
Expand Down
40 changes: 27 additions & 13 deletions src/ArtemisNetClient/AutoRecovering/AutoRecoveringProducerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace ActiveMQ.Artemis.Client.AutoRecovering
internal abstract class AutoRecoveringProducerBase : IRecoverable
{
protected readonly ILogger Logger;
private readonly AsyncManualResetEvent _manualResetEvent = new AsyncManualResetEvent(true);
private readonly AsyncManualResetEvent _manualResetEvent = new(true);
private bool _closed;
private Exception _failureCause;

Expand Down Expand Up @@ -78,6 +78,7 @@ public async Task TerminateAsync(Exception exception)
_closed = true;
_failureCause = exception;
_manualResetEvent.Set();
Log.ProducerTerminated(Logger, exception);
await DisposeResourceSafe(UnderlyingResource).ConfigureAwait(false);
}

Expand Down Expand Up @@ -127,57 +128,70 @@ protected void CheckState()

protected static class Log
{
private static readonly Action<ILogger, Exception> _retryingProduceAsync = LoggerMessage.Define(
LogLevel.Trace,
private static readonly Action<ILogger, string, Exception> _retryingProduceAsync = LoggerMessage.Define<string>(
LogLevel.Warning,
0,
"Retrying send after Producer reestablished.");
"Retrying send to address {0} after Producer reestablished.");

private static readonly Action<ILogger, Exception> _producerRecovered = LoggerMessage.Define(
LogLevel.Trace,
LogLevel.Information,
0,
"Producer recovered.");

private static readonly Action<ILogger, Exception> _producerSuspended = LoggerMessage.Define(
LogLevel.Trace,
LogLevel.Warning,
0,
"Producer suspended.");

private static readonly Action<ILogger, Exception> _producerResumed = LoggerMessage.Define(
LogLevel.Trace,
LogLevel.Information,
0,
"Producer resumed.");

private static readonly Action<ILogger, Exception> _producerTerminated = LoggerMessage.Define(
LogLevel.Error,
0,
"Producer terminated.");

public static void RetryingSendAsync(ILogger logger)
public static void RetryingSendAsync(ILogger logger, string address)
{
if (logger.IsEnabled(LogLevel.Trace))
if (logger.IsEnabled(LogLevel.Warning))
{
_retryingProduceAsync(logger, null);
_retryingProduceAsync(logger, address, null);
}
}

public static void ProducerRecovered(ILogger logger)
{
if (logger.IsEnabled(LogLevel.Trace))
if (logger.IsEnabled(LogLevel.Information))
{
_producerRecovered(logger, null);
}
}

public static void ProducerSuspended(ILogger logger)
{
if (logger.IsEnabled(LogLevel.Trace))
if (logger.IsEnabled(LogLevel.Warning))
{
_producerSuspended(logger, null);
}
}

public static void ProducerResumed(ILogger logger)
{
if (logger.IsEnabled(LogLevel.Trace))
if (logger.IsEnabled(LogLevel.Information))
{
_producerResumed(logger, null);
}
}

public static void ProducerTerminated(ILogger logger, Exception exception)
{
if (logger.IsEnabled(LogLevel.Error))
{
_producerTerminated(logger, exception);
}
}
}
}
}
Loading
Loading