Skip to content

Commit

Permalink
Use try with resources when creating a producer
Browse files Browse the repository at this point in the history
  • Loading branch information
pramithas committed Jan 23, 2025
1 parent 8000d04 commit 5399ac8
Showing 1 changed file with 101 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -468,16 +468,14 @@ public void testMetricsReporterAutoGeneratedClientId() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());

assertEquals(2, producer.metrics.reporters().size());

MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream()
.filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
assertEquals(2, producer.metrics.reporters().size());

producer.close();
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream()
.filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
}
}

@Test
Expand All @@ -486,9 +484,9 @@ public void testDisableJmxAndClientTelemetryReporter() {
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertTrue(producer.metrics.reporters().isEmpty());
producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
assertTrue(producer.metrics.reporters().isEmpty());
}
}

@Test
Expand All @@ -497,21 +495,21 @@ public void testExplicitlyOnlyEnableJmxReporter() {
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertEquals(1, producer.metrics.reporters().size());
assertInstanceOf(JmxReporter.class, producer.metrics.reporters().get(0));
producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
assertEquals(1, producer.metrics.reporters().size());
assertInstanceOf(JmxReporter.class, producer.metrics.reporters().get(0));
}
}

@Test
public void testExplicitlyOnlyEnableClientTelemetryReporter() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
assertEquals(1, producer.metrics.reporters().size());
assertInstanceOf(ClientTelemetryReporter.class, producer.metrics.reporters().get(0));
producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
assertEquals(1, producer.metrics.reporters().size());
assertInstanceOf(ClientTelemetryReporter.class, producer.metrics.reporters().get(0));
}
}

@Test
Expand All @@ -523,15 +521,26 @@ public void testConstructorWithSerializers() {

@Test
public void testNoSerializerProvided() {

// Test 1: No serializer provided in producerProps
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
assertThrows(ConfigException.class, () -> new KafkaProducer(producerProps));

assertThrows(ConfigException.class, () -> {
try (KafkaProducer<?, ?> producer = new KafkaProducer<>(producerProps)) {
// KafkaProducer will be closed automatically after the block
}
});

// Test 2: Invalid config (missing serializer)
final Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

// Invalid value null for configuration key.serializer: must be non-null.
assertThrows(ConfigException.class, () -> new KafkaProducer<String, String>(configs));
assertThrows(ConfigException.class, () -> {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
// KafkaProducer will be closed automatically after the block
}
});
}

@Test
Expand Down Expand Up @@ -604,15 +613,14 @@ public void testInterceptorConstructClose() {
props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");

KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());

// Cluster metadata will only be updated on calling onSend.
assertNull(MockProducerInterceptor.CLUSTER_META.get());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());

producer.close();
// Cluster metadata will only be updated on calling onSend.
assertNull(MockProducerInterceptor.CLUSTER_META.get());
}
assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
} finally {
Expand Down Expand Up @@ -652,12 +660,12 @@ public void testPartitionerClose() {
MockPartitioner.resetCounters();
props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
assertEquals(1, MockPartitioner.INIT_COUNT.get());
assertEquals(0, MockPartitioner.CLOSE_COUNT.get());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
assertEquals(1, MockPartitioner.INIT_COUNT.get());
assertEquals(0, MockPartitioner.CLOSE_COUNT.get());
}

producer.close();
assertEquals(1, MockPartitioner.INIT_COUNT.get());
assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
} finally {
Expand Down Expand Up @@ -1875,11 +1883,10 @@ public void testClientInstanceIdInvalidTimeout() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
Exception exception = assertThrows(IllegalArgumentException.class, () -> producer.clientInstanceId(Duration.ofMillis(-1)));
assertEquals("The timeout cannot be negative.", exception.getMessage());

producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
Exception exception = assertThrows(IllegalArgumentException.class, () -> producer.clientInstanceId(Duration.ofMillis(-1)));
assertEquals("The timeout cannot be negative.", exception.getMessage());
}
}

@Test
Expand All @@ -1888,11 +1895,10 @@ public void testClientInstanceIdNoTelemetryReporterRegistered() {
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");

KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
Exception exception = assertThrows(IllegalStateException.class, () -> producer.clientInstanceId(Duration.ofMillis(0)));
assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());

producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
Exception exception = assertThrows(IllegalStateException.class, () -> producer.clientInstanceId(Duration.ofMillis(0)));
assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage());
}
}

private void verifyInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
Expand Down Expand Up @@ -2168,15 +2174,14 @@ public void testProducerJmxPrefix() throws Exception {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.put("client.id", "client-1");

KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());

MBeanServer server = ManagementFactory.getPlatformMBeanServer();
MetricName testMetricName = producer.metrics.metricName("test-metric",
"grp1", "test metric");
producer.metrics.addMetric(testMetricName, new Avg());
assertNotNull(server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1")));
producer.close();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
MetricName testMetricName = producer.metrics.metricName("test-metric",
"grp1", "test metric");
producer.metrics.addMetric(testMetricName, new Avg());
assertNotNull(server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1")));
}
}

private static ProducerMetadata newMetadata(long refreshBackoffMs, long refreshBackoffMaxMs, long expirationMs) {
Expand Down Expand Up @@ -2587,27 +2592,27 @@ public void testSubscribingCustomMetricsDoesntAffectProducerMetrics() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());

Map<MetricName, KafkaMetric> customMetrics = customMetrics();
customMetrics.forEach((name, metric) -> producer.registerMetricForSubscription(metric));
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
Map<MetricName, KafkaMetric> customMetrics = customMetrics();
customMetrics.forEach((name, metric) -> producer.registerMetricForSubscription(metric));

Map<MetricName, ? extends Metric> producerMetrics = producer.metrics();
customMetrics.forEach((name, metric) -> assertFalse(producerMetrics.containsKey(name)));
Map<MetricName, ? extends Metric> producerMetrics = producer.metrics();
customMetrics.forEach((name, metric) -> assertFalse(producerMetrics.containsKey(name)));
}
}

@Test
public void testUnSubscribingNonExisingMetricsDoesntCauseError() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());

Map<MetricName, KafkaMetric> customMetrics = customMetrics();
//Metrics never registered but removed should not cause an error
customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> producer.unregisterMetricFromSubscription(metric)));
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
Map<MetricName, KafkaMetric> customMetrics = customMetrics();
//Metrics never registered but removed should not cause an error
customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> producer.unregisterMetricFromSubscription(metric)));
}
}

@Test
Expand All @@ -2616,12 +2621,13 @@ public void testSubscribingCustomMetricsWithSameNameDoesntAffectProducerMetrics(
appender.setClassLogger(KafkaProducer.class, Level.DEBUG);
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
KafkaMetric existingMetricToAdd = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.registerMetricForSubscription(existingMetricToAdd);
final String expectedMessage = String.format("Skipping registration for metric %s. Existing producer metrics cannot be overwritten.", existingMetricToAdd.metricName());
assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
KafkaMetric existingMetricToAdd = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.registerMetricForSubscription(existingMetricToAdd);
final String expectedMessage = String.format("Skipping registration for metric %s. Existing producer metrics cannot be overwritten.", existingMetricToAdd.metricName());
assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
}
}
}

Expand All @@ -2631,12 +2637,13 @@ public void testUnsubscribingCustomMetricWithSameNameAsExistingMetricDoesntAffec
appender.setClassLogger(KafkaProducer.class, Level.DEBUG);
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
KafkaMetric existingMetricToRemove = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.unregisterMetricFromSubscription(existingMetricToRemove);
final String expectedMessage = String.format("Skipping unregistration for metric %s. Existing producer metrics cannot be removed.", existingMetricToRemove.metricName());
assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
KafkaMetric existingMetricToRemove = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.unregisterMetricFromSubscription(existingMetricToRemove);
final String expectedMessage = String.format("Skipping unregistration for metric %s. Existing producer metrics cannot be removed.", existingMetricToRemove.metricName());
assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage)));
}
}
}

Expand All @@ -2649,12 +2656,13 @@ public void testShouldOnlyCallMetricReporterMetricChangeOnceWithExistingProducer

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.registerMetricForSubscription(existingMetric);
// This test would fail without the check as the exising metric is registered in the producer on startup
Mockito.verify(clientTelemetryReporter, atMostOnce()).metricChange(existingMetric);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.registerMetricForSubscription(existingMetric);
// This test would fail without the check as the exising metric is registered in the producer on startup
Mockito.verify(clientTelemetryReporter, atMostOnce()).metricChange(existingMetric);
}
}
}

Expand All @@ -2667,12 +2675,13 @@ public void testShouldNotCallMetricReporterMetricRemovalWithExistingProducerMetr

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer());
KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.unregisterMetricFromSubscription(existingMetric);
// This test would fail without the check as the exising metric is registered in the consumer on startup
Mockito.verify(clientTelemetryReporter, never()).metricRemoval(existingMetric);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
props, new StringSerializer(), new StringSerializer())) {
KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue();
producer.unregisterMetricFromSubscription(existingMetric);
// This test would fail without the check as the exising metric is registered in the consumer on startup
Mockito.verify(clientTelemetryReporter, never()).metricRemoval(existingMetric);
}
}
}

Expand Down

0 comments on commit 5399ac8

Please sign in to comment.