Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#6 Added Error Metrics #12

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.deviceinsight.kafka.health.config.KafkaHealthProperties;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
Expand All @@ -18,6 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;

Expand All @@ -40,10 +43,13 @@
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import io.micrometer.core.instrument.MeterRegistry;

public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {

private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class);
private static final String CONSUMER_GROUP_PREFIX = "health-check-";
private static final String KAFKA_EXCEPTION = "kafka-exceptions";
antrix190 marked this conversation as resolved.
Show resolved Hide resolved

private final Consumer<String, String> consumer;

Expand All @@ -60,15 +66,18 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {

private KafkaCommunicationResult kafkaCommunicationResult;

@Autowired
antrix190 marked this conversation as resolved.
Show resolved Hide resolved
private MeterRegistry meterRegistry;

public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties) {
Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties,
@Autowired MeterRegistry meterRegistry) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The @Autowired annotation is also unnecessary here. This class should be initialized explicitly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what i'm trying to do here is create a bean in MetricsConfig and autowiring it here through construtor injection. Let me know if you want me to change this approach.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I understand. But the class should be instantiated as described in the README rather than by autowiring dependencies, since this might cause issues for some projects that have multiple MeterRegistry beans, for example.


logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
this.topic = kafkaHealthProperties.getTopic();
this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
this.pollTimeout = kafkaHealthProperties.getPollTimeout();
this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout();

Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);

setConsumerGroup(kafkaConsumerPropertiesCopy);
Expand All @@ -81,20 +90,20 @@ public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties

this.executor = Executors.newSingleThreadExecutor();
this.running = new AtomicBoolean(true);
this.cache =
Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();

this.kafkaCommunicationResult = KafkaCommunicationResult
.failure(new RejectedExecutionException("Kafka Health Check is starting."));

this.kafkaCommunicationResult =
KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
this.meterRegistry = meterRegistry;
}

@PostConstruct
void subscribeAndSendMessage() throws InterruptedException {
subscribeToTopic();

if (kafkaCommunicationResult.isFailure()) {
throw new BeanInitializationException("Kafka health check failed",
kafkaCommunicationResult.getException());
throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException());
}

executor.submit(() -> {
Expand Down Expand Up @@ -187,7 +196,7 @@ private String sendKafkaMessage() throws InterruptedException, ExecutionExceptio
protected void doHealthCheck(Health.Builder builder) {
String expectedMessage = sendMessage();
if (expectedMessage == null) {
goDown(builder);
goDown(builder, kafkaCommunicationResult.getException());
return;
}

Expand All @@ -201,19 +210,18 @@ protected void doHealthCheck(Health.Builder builder) {
} else if (System.currentTimeMillis() - startTime > sendReceiveTimeout.toMillis()) {

if (kafkaCommunicationResult.isFailure()) {
goDown(builder);
goDown(builder, kafkaCommunicationResult.getException());
} else {
builder.down(new TimeoutException(
"Sending and receiving took longer than " + sendReceiveTimeout ))
builder.down(new TimeoutException("Sending and receiving took longer than " + sendReceiveTimeout))
.withDetail("topic", topic);
}

return;
}
}
}

private void goDown(Health.Builder builder) {
builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
private void goDown(Health.Builder builder, Exception exception) {
builder.down(exception).withDetail("topic", topic);
meterRegistry.counter(KAFKA_EXCEPTION).increment();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.deviceinsight.kafka.health;
package com.deviceinsight.kafka.health.config;

import java.time.Duration;

Expand Down Expand Up @@ -43,7 +43,7 @@ public void setSubscriptionTimeout(Duration subscriptionTimeout) {

@Override
public String toString() {
return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}';
return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout
+ ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}';
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert the changes to this file as they're unnecessary.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this file to separate package. Health.config
Should this be reverted.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.deviceinsight.kafka.health.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;

@Configuration
public class MetricsConfig {

@Value("${host}")
private String host;

@Value("${service}")
private String service;

@Value("${region}")
private String region;

@Bean
MeterRegistryCustomizer<MeterRegistry> meterRegistry() {
return registry -> registry.config().commonTags("host", host, "service", service, "region", region)
.meterFilter(MeterFilter.deny(id -> {
String uri = id.getTag("uri");
return uri != null && uri.startsWith("/actuator");
})).meterFilter(MeterFilter.deny(id -> {
String uri = id.getTag("uri");
return uri != null && uri.contains("favicon");
}));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you filter out the /actuator and favicon URLs? This library doesn't have anything to do with spring-webmvc, so I guess this would be the task of the service that uses the library. Maybe the class can be deleted entirely?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay i will remove the favicon and actuator url filtering but this Class defines a bean configuration to return MetricRegistryCustomizer. Not required to get the host region etc as a default.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do the values for ${host}, ${service} and ${region} come from?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially from properties file.

}
}
antrix190 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
import static org.assertj.core.api.Assertions.assertThat;

import kafka.server.KafkaServer;
import com.deviceinsight.kafka.health.config.KafkaHealthProperties;
import com.deviceinsight.kafka.health.config.MetricsConfig;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.awaitility.Awaitility;
Expand All @@ -20,15 +22,22 @@
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import kafka.server.KafkaServer;

@ExtendWith(SpringExtension.class)
@EmbeddedKafka(topics = TOPIC)
@ContextConfiguration(classes = {MetricsConfig.class})
public class KafkaConsumingHealthIndicatorTest {

static final String TOPIC = "health-checks";
Expand All @@ -38,6 +47,8 @@ public class KafkaConsumingHealthIndicatorTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;

private MeterRegistry meterRegistry;

@BeforeEach
public void setUp() {
Map<String, Object> consumerConfigs =
Expand All @@ -46,6 +57,7 @@ public void setUp() {
new StringDeserializer()).createConsumer();
consumer.subscribe(Collections.singletonList(TOPIC));
consumer.poll(Duration.ofSeconds(1));
meterRegistry = new SimpleMeterRegistry();
}

@AfterEach
Expand All @@ -66,7 +78,7 @@ public void kafkaIsDown() throws Exception {

final KafkaConsumingHealthIndicator healthIndicator =
new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(),
kafkaProperties.buildProducerProperties());
kafkaProperties.buildProducerProperties(), meterRegistry);
healthIndicator.subscribeAndSendMessage();

Health health = healthIndicator.health();
Expand Down