Skip to content

Commit

Permalink
Add option to configure Kafka read isolation level
Browse files Browse the repository at this point in the history
  • Loading branch information
SoerenHenning committed Dec 19, 2023
1 parent 1ab593c commit 5753c8d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package com.dynatrace.research.shufflebench;

import com.sun.net.httpserver.HttpServer;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.eclipse.microprofile.config.Config;
Expand All @@ -26,9 +22,8 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LatencyExporter {
Expand All @@ -39,6 +34,8 @@ public class LatencyExporter {

private final String kafkaTopic;

private final String kafkaIsolationLevel;

private final String prometheusExporterPath;

private final int prometheusExporterPort;
Expand All @@ -51,6 +48,7 @@ public LatencyExporter() {
prometheusExporterPort = config.getValue("prometheus.exporter.port", Integer.class);
kafkaBootstrapServers = config.getValue("kafka.bootstrap.servers", String.class);
kafkaTopic = config.getValue("kafka.topic", String.class);
kafkaIsolationLevel = config.getOptionalValue("kafka.isolation.level", String.class).orElse(null);
}

public void startBlocking() {
Expand All @@ -77,6 +75,9 @@ public void startBlocking() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaBootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "shufflebench-latency-exporter");
if (this.kafkaIsolationLevel != null) {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, this.kafkaIsolationLevel);
}
try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new ByteArrayDeserializer())) {
consumer.subscribe(List.of(this.kafkaTopic));
while (keepRunning) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
kafka.bootstrap.servers=localhost:9092
kafka.topic=output
#kafka.isolation.level=read_uncommitted/read_committed
prometheus.exporter.path=/metrics
prometheus.exporter.port=8080

0 comments on commit 5753c8d

Please sign in to comment.