From 5399ac828cd424b111a7df0a90a007137a40f6ad Mon Sep 17 00:00:00 2001 From: pramithas Date: Thu, 23 Jan 2025 23:11:36 +0545 Subject: [PATCH] Use try with resources when creating a producer --- .../clients/producer/KafkaProducerTest.java | 193 +++++++++--------- 1 file changed, 101 insertions(+), 92 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 0f497b7936d4c..af504454864e5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -468,16 +468,14 @@ public void testMetricsReporterAutoGeneratedClientId() { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); - KafkaProducer producer = new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - - assertEquals(2, producer.metrics.reporters().size()); - - MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream() - .filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get(); - assertEquals(producer.getClientId(), mockMetricsReporter.clientId); + try (KafkaProducer producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer())) { + assertEquals(2, producer.metrics.reporters().size()); - producer.close(); + MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream() + .filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get(); + assertEquals(producer.getClientId(), mockMetricsReporter.clientId); + } } @Test @@ -486,9 +484,9 @@ public void testDisableJmxAndClientTelemetryReporter() { props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ""); props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false"); - KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - assertTrue(producer.metrics.reporters().isEmpty()); - producer.close(); + try (KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) { + assertTrue(producer.metrics.reporters().isEmpty()); + } } @Test @@ -497,10 +495,10 @@ public void testExplicitlyOnlyEnableJmxReporter() { props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter"); props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false"); - KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - assertEquals(1, producer.metrics.reporters().size()); - assertInstanceOf(JmxReporter.class, producer.metrics.reporters().get(0)); - producer.close(); + try (KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) { + assertEquals(1, producer.metrics.reporters().size()); + assertInstanceOf(JmxReporter.class, producer.metrics.reporters().get(0)); + } } @Test @@ -508,10 +506,10 @@ public void testExplicitlyOnlyEnableClientTelemetryReporter() { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ""); - KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - assertEquals(1, producer.metrics.reporters().size()); - assertInstanceOf(ClientTelemetryReporter.class, producer.metrics.reporters().get(0)); - producer.close(); + try (KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) { + assertEquals(1, producer.metrics.reporters().size()); + assertInstanceOf(ClientTelemetryReporter.class, producer.metrics.reporters().get(0)); + } } @Test @@ -523,15 +521,26 @@ public void testConstructorWithSerializers() { @Test public void testNoSerializerProvided() { + + // Test 1: No serializer provided in producerProps Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); - assertThrows(ConfigException.class, () -> new KafkaProducer(producerProps)); + assertThrows(ConfigException.class, () -> { + try (KafkaProducer producer = new KafkaProducer<>(producerProps)) { + // KafkaProducer will be closed automatically after the block + } + }); + + // Test 2: Invalid config (missing serializer) final Map configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - // Invalid value null for configuration key.serializer: must be non-null. - assertThrows(ConfigException.class, () -> new KafkaProducer(configs)); + assertThrows(ConfigException.class, () -> { + try (KafkaProducer producer = new KafkaProducer<>(configs)) { + // KafkaProducer will be closed automatically after the block + } + }); } @Test @@ -604,15 +613,14 @@ public void testInterceptorConstructClose() { props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName()); props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something"); - KafkaProducer producer = new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); - assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get()); - - // Cluster metadata will only be updated on calling onSend. - assertNull(MockProducerInterceptor.CLUSTER_META.get()); + try (KafkaProducer producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer())) { + assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); + assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get()); - producer.close(); + // Cluster metadata will only be updated on calling onSend. + assertNull(MockProducerInterceptor.CLUSTER_META.get()); + } assertEquals(1, MockProducerInterceptor.INIT_COUNT.get()); assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get()); } finally { @@ -652,12 +660,12 @@ public void testPartitionerClose() { MockPartitioner.resetCounters(); props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName()); - KafkaProducer producer = new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - assertEquals(1, MockPartitioner.INIT_COUNT.get()); - assertEquals(0, MockPartitioner.CLOSE_COUNT.get()); + try (KafkaProducer producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer())) { + assertEquals(1, MockPartitioner.INIT_COUNT.get()); + assertEquals(0, MockPartitioner.CLOSE_COUNT.get()); + } - producer.close(); assertEquals(1, MockPartitioner.INIT_COUNT.get()); assertEquals(1, MockPartitioner.CLOSE_COUNT.get()); } finally { @@ -1875,11 +1883,10 @@ public void testClientInstanceIdInvalidTimeout() { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - Exception exception = assertThrows(IllegalArgumentException.class, () -> producer.clientInstanceId(Duration.ofMillis(-1))); - assertEquals("The timeout cannot be negative.", exception.getMessage()); - - producer.close(); + try (KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) { + Exception exception = assertThrows(IllegalArgumentException.class, () -> producer.clientInstanceId(Duration.ofMillis(-1))); + assertEquals("The timeout cannot be negative.", exception.getMessage()); + } } @Test @@ -1888,11 +1895,10 @@ public void testClientInstanceIdNoTelemetryReporterRegistered() { props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false"); - KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - Exception exception = assertThrows(IllegalStateException.class, () -> producer.clientInstanceId(Duration.ofMillis(0))); - assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage()); - - producer.close(); + try (KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) { + Exception exception = assertThrows(IllegalStateException.class, () -> producer.clientInstanceId(Duration.ofMillis(0))); + assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", exception.getMessage()); + } } private void verifyInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) { @@ -2168,15 +2174,14 @@ public void testProducerJmxPrefix() throws Exception { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.put("client.id", "client-1"); - KafkaProducer producer = new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - MetricName testMetricName = producer.metrics.metricName("test-metric", - "grp1", "test metric"); - producer.metrics.addMetric(testMetricName, new Avg()); - assertNotNull(server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1"))); - producer.close(); + try (KafkaProducer producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer())) { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + MetricName testMetricName = producer.metrics.metricName("test-metric", + "grp1", "test metric"); + producer.metrics.addMetric(testMetricName, new Avg()); + assertNotNull(server.getObjectInstance(new ObjectName("kafka.producer:type=grp1,client-id=client-1"))); + } } private static ProducerMetadata newMetadata(long refreshBackoffMs, long refreshBackoffMaxMs, long expirationMs) { @@ -2587,14 +2592,14 @@ public void testSubscribingCustomMetricsDoesntAffectProducerMetrics() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KafkaProducer producer = new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - - Map customMetrics = customMetrics(); - customMetrics.forEach((name, metric) -> producer.registerMetricForSubscription(metric)); + try (KafkaProducer producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer())) { + Map customMetrics = customMetrics(); + customMetrics.forEach((name, metric) -> producer.registerMetricForSubscription(metric)); - Map producerMetrics = producer.metrics(); - customMetrics.forEach((name, metric) -> assertFalse(producerMetrics.containsKey(name))); + Map producerMetrics = producer.metrics(); + customMetrics.forEach((name, metric) -> assertFalse(producerMetrics.containsKey(name))); + } } @Test @@ -2602,12 +2607,12 @@ public void testUnSubscribingNonExisingMetricsDoesntCauseError() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KafkaProducer producer = new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - - Map customMetrics = customMetrics(); - //Metrics never registered but removed should not cause an error - customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> producer.unregisterMetricFromSubscription(metric))); + try (KafkaProducer producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer())) { + Map customMetrics = customMetrics(); + //Metrics never registered but removed should not cause an error + customMetrics.forEach((name, metric) -> assertDoesNotThrow(() -> producer.unregisterMetricFromSubscription(metric))); + } } @Test @@ -2616,12 +2621,13 @@ public void testSubscribingCustomMetricsWithSameNameDoesntAffectProducerMetrics( appender.setClassLogger(KafkaProducer.class, Level.DEBUG); Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KafkaProducer producer = new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - KafkaMetric existingMetricToAdd = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue(); - producer.registerMetricForSubscription(existingMetricToAdd); - final String expectedMessage = String.format("Skipping registration for metric %s. Existing producer metrics cannot be overwritten.", existingMetricToAdd.metricName()); - assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage))); + try (KafkaProducer producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer())) { + KafkaMetric existingMetricToAdd = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue(); + producer.registerMetricForSubscription(existingMetricToAdd); + final String expectedMessage = String.format("Skipping registration for metric %s. Existing producer metrics cannot be overwritten.", existingMetricToAdd.metricName()); + assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage))); + } } } @@ -2631,12 +2637,13 @@ public void testUnsubscribingCustomMetricWithSameNameAsExistingMetricDoesntAffec appender.setClassLogger(KafkaProducer.class, Level.DEBUG); Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KafkaProducer producer = new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - KafkaMetric existingMetricToRemove = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue(); - producer.unregisterMetricFromSubscription(existingMetricToRemove); - final String expectedMessage = String.format("Skipping unregistration for metric %s. Existing producer metrics cannot be removed.", existingMetricToRemove.metricName()); - assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage))); + try (KafkaProducer producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer())) { + KafkaMetric existingMetricToRemove = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue(); + producer.unregisterMetricFromSubscription(existingMetricToRemove); + final String expectedMessage = String.format("Skipping unregistration for metric %s. Existing producer metrics cannot be removed.", existingMetricToRemove.metricName()); + assertTrue(appender.getMessages().stream().anyMatch(m -> m.contains(expectedMessage))); + } } } @@ -2649,12 +2656,13 @@ public void testShouldOnlyCallMetricReporterMetricChangeOnceWithExistingProducer Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KafkaProducer producer = new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue(); - producer.registerMetricForSubscription(existingMetric); - // This test would fail without the check as the exising metric is registered in the producer on startup - Mockito.verify(clientTelemetryReporter, atMostOnce()).metricChange(existingMetric); + try (KafkaProducer producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer())) { + KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue(); + producer.registerMetricForSubscription(existingMetric); + // This test would fail without the check as the exising metric is registered in the producer on startup + Mockito.verify(clientTelemetryReporter, atMostOnce()).metricChange(existingMetric); + } } } @@ -2667,12 +2675,13 @@ public void testShouldNotCallMetricReporterMetricRemovalWithExistingProducerMetr Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - KafkaProducer producer = new KafkaProducer<>( - props, new StringSerializer(), new StringSerializer()); - KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue(); - producer.unregisterMetricFromSubscription(existingMetric); - // This test would fail without the check as the exising metric is registered in the consumer on startup - Mockito.verify(clientTelemetryReporter, never()).metricRemoval(existingMetric); + try (KafkaProducer producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer())) { + KafkaMetric existingMetric = (KafkaMetric) producer.metrics().entrySet().iterator().next().getValue(); + producer.unregisterMetricFromSubscription(existingMetric); + // This test would fail without the check as the exising metric is registered in the consumer on startup + Mockito.verify(clientTelemetryReporter, never()).metricRemoval(existingMetric); + } } }