Skip to content

Commit

Permalink
Add KafkaErrorEvent.Handled to prevent built-in logging [#83]
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed May 10, 2020
1 parent 47a8660 commit 4f10eff
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<Description>Silverback is a simple but feature-rich framework to build reactive/event-driven applications or microservices.</Description>
<License>MIT</License>
<Copyright>Copyright (c) 2020 Sergio Aquilini</Copyright>
<VersionSuffix>-rc.12</VersionSuffix>
<VersionSuffix>-rc.13</VersionSuffix>
<BaseVersion>2.1.1$(VersionSuffix)</BaseVersion>
<ProjectUrl>https://beagle1984.github.io/silverback/</ProjectUrl>
<RepositoryUrl>https://github.com/BEagle1984/silverback/</RepositoryUrl>
Expand Down
2 changes: 1 addition & 1 deletion docs/_docs/4-Kafka/403-kafka-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Event | Description
`KafkaPartitionsAssignedEvent` | The event fired when a new consumer group partition assignment has been received by a consumer. Corresponding to each of this events there will be a `KafkaPartitionsRevokedEvent`. This event is important if you need to manually set the start offset (see the sample code below in the samples section).
`KafkaPartitionsRevokedEvent` | The event fired prior to a group partition assignment being revoked. Corresponding to each of this events there will be a `KafkaPartitionsAssignedEvent`.
`KafkaOffsetsCommittedEvent` | The event fired to report the result of the offset commits.
`KafkaErrorEvent` | The event fired when an error is reported by the `Confluent.Kafka.Consumer` (e.g. connection failures or all brokers down). Note that the system (either the Kafka client itself or Silverback) will try to automatically recover from all errors automatically, so these errors have to be considered purely informational.
`KafkaErrorEvent` | The event fired when an error is reported by the `Confluent.Kafka.Consumer` (e.g. connection failures or all brokers down). Note that the system (either the Kafka client itself or Silverback) will try to automatically recover from all errors automatically, so these errors have to be considered purely informational. The subscriber can set the `Handled` property to prevent Silverback to log the error (useful if you are logging it already).
`KafkaStatisticsEvent` | The event fired when statistics are received. Statistics are provided as a JSON formatted string as defined in the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md). You can enable statistics and set the statistics interval using the `StatisticsIntervalMs` configuration parameter (disabled by default).


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

namespace Silverback.Messaging.Broker
{
// TODO: Test
internal class KafkaEventsHandler
{
private readonly IServiceProvider _serviceProvider;
Expand Down Expand Up @@ -96,15 +97,29 @@ public void SetConsumerEventsHandlers(
// Ignore errors if not consuming anymore
// (lidrdkafka randomly throws some "brokers are down"
// while disconnecting)
if (!ownerConsumer.IsConsuming) return;
if (!ownerConsumer.IsConsuming)
return;
var kafkaErrorEvent = new KafkaErrorEvent(error);
try
{
CreateScopeAndPublishEvent(kafkaErrorEvent);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in KafkaErrorEvent subscriber.");
}
if (kafkaErrorEvent.Handled)
return;
_logger.Log(
error.IsFatal ? LogLevel.Critical : LogLevel.Error,
"Error in Kafka consumer: {error} (topic(s): {topics})",
error,
ownerConsumer.Endpoint.Names);

This comment has been minimized.

Copy link
@msallin

msallin May 11, 2020

Collaborator

Now that I'm using this, I realize that I want to have the very same log message but with another log level. This is currently not possible because I have no access to "ownerConsumer". Can you set this on the KafkaErrorEvent?

This comment has been minimized.

Copy link
@msallin

msallin May 11, 2020

Collaborator

Furthermore, I can not distinguish between a consumer and a producer error. Subtype for each or a enum/flag on the Event?

This comment has been minimized.

Copy link
@BEagle1984

BEagle1984 May 11, 2020

Author Owner

Oh, that's easy, I'll do it.

(I actually thought I had it already in all events 🤔)

This comment has been minimized.

Copy link
@BEagle1984

BEagle1984 May 11, 2020

Author Owner

Furthermore, I can not distinguish between a consumer and a producer error. Subtype for each or a enum/flag on the Event?

This event is just for the consumer errors. The producer doesn’t fire them.

CreateScopeAndPublishEvent(new KafkaErrorEvent(error));
})
.SetStatisticsHandler((_, statistics) =>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

using Confluent.Kafka;

namespace Silverback.Messaging.Messages
{
/// <summary>
Expand All @@ -11,7 +13,7 @@ namespace Silverback.Messaging.Messages
/// </summary>
public class KafkaErrorEvent : IKafkaEvent
{
public KafkaErrorEvent(Confluent.Kafka.Error error)
public KafkaErrorEvent(Error error)
{
Error = error;
}
Expand All @@ -20,6 +22,12 @@ public KafkaErrorEvent(Confluent.Kafka.Error error)
/// Gets an <see cref="Confluent.Kafka.Error" /> instance representing the error that occured when
/// interacting with the Kafka broker or the librdkafka library.
/// </summary>
public Confluent.Kafka.Error Error { get; }
public Error Error { get; }

/// <summary>
/// Gets or sets a value indicating whether this error has been handled in the subscriber and doesn't need to be logged or
/// handled in any other way by Silverback.
/// </summary>
public bool Handled { get; set; }
}
}

0 comments on commit 4f10eff

Please sign in to comment.