From a9d772ff973cbeebb95181d1293da193d5b12cc9 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 19 Sep 2023 13:26:51 -0400 Subject: [PATCH] GH-2662: Kafka binder DLQ root cause message - Becasue NestedRuntimeException from Spring Framework core 6.x removed the getMessage method that included the detailMessage with cause in it, the Kafka binder DLQ records no longer include the cause message. Fix this issue by including the cause in the exception message. See this comment for more details: - https://github.com/spring-cloud/spring-cloud-stream/issues/2662#issuecomment-1722849892 This fix is based on the following Spring Kafka commit. - https://github.com/spring-projects/spring-kafka/commit/6f585058a60090f2940c563bca0a4b9208b7e44e Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2662 --- .../kafka/KafkaMessageChannelBinder.java | 26 ++++++++++++++++--- .../stream/binder/kafka/KafkaBinderTests.java | 2 +- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index c06e4a958..f4fe0f877 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -1228,7 +1228,7 @@ private void handleRecordForDlq(ConsumerRecord record, ConsumerD if (message.getPayload() instanceof Throwable throwablePayload) { throwable = throwablePayload; - + String exceptionMessage = buildMessage(throwable, throwable.getCause()); HeaderMode headerMode = properties.getHeaderMode(); if (headerMode == null || HeaderMode.headers.equals(headerMode)) { @@ -1248,7 +1248,6 @@ private void handleRecordForDlq(ConsumerRecord record, ConsumerD .getBytes(StandardCharsets.UTF_8))); kafkaHeaders.add(new RecordHeader(X_EXCEPTION_FQCN, throwable .getClass().getName().getBytes(StandardCharsets.UTF_8))); - String exceptionMessage = throwable.getMessage(); if (exceptionMessage != null) { kafkaHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE, exceptionMessage.getBytes(StandardCharsets.UTF_8))); @@ -1271,8 +1270,7 @@ else if (HeaderMode.embeddedHeaders.equals(headerMode)) { record.timestampType().toString()); messageValues.put(X_EXCEPTION_FQCN, throwable.getClass().getName()); - messageValues.put(X_EXCEPTION_MESSAGE, - throwable.getMessage()); + messageValues.put(X_EXCEPTION_MESSAGE, exceptionMessage); messageValues.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(throwable)); @@ -1323,6 +1321,26 @@ else if (HeaderMode.embeddedHeaders.equals(headerMode)) { } } + @Nullable + private String buildMessage(Throwable exception, Throwable cause) { + String message = exception.getMessage(); + if (!exception.equals(cause)) { + if (message != null) { + message = message + "; "; + } + String causeMsg = cause.getMessage(); + if (causeMsg != null) { + if (message != null) { + message = message + causeMsg; + } + else { + message = causeMsg; + } + } + } + return message; + } + @SuppressWarnings("unchecked") @Nullable private KafkaAwareTransactionManager transactionManager(@Nullable String beanName) { diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index bb44daa9f..701af2102 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -1300,7 +1300,7 @@ else if (!HeaderMode.none.equals(headerMode)) { else { assertThat(new String((byte[]) receivedMessage.getHeaders() .get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))).startsWith( - "Dispatcher failed to deliver Message"); + "Dispatcher failed to deliver Message; fail"); } assertThat(receivedMessage.getHeaders()