
The StreamTransformer must remove an IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE header from the output message after the resource is closed. #9792

Open
tasosz opened this issue Jan 27, 2025 · 4 comments

Comments


tasosz commented Jan 27, 2025

In what version(s) of Spring Integration are you seeing this issue?

Observed with 6.1.0 and 6.3.4

Describe the bug

The ObjectMapper from JacksonUtils.enhancedObjectMapper(), used by KafkaProducerMessageHandler (through new DefaultKafkaHeaderMapper()), does not honor spring.jackson.serialization.FAIL_ON_EMPTY_BEANS=false.

To Reproduce

Add spring.jackson.serialization.FAIL_ON_EMPTY_BEANS=false to the Spring Boot (tested with 3.1.0 & 3.3.4) application.properties.

Use the SFTP inbound adapter to read a file (with a StreamTransformer) and then produce output to a Kafka topic.
Because of the closeable session (for SFTP), the MessageHeaders passed to DefaultKafkaHeaderMapper.fromHeaders(MessageHeaders headers, Headers target) contain an extra non-String closeableResource header. This is expected per the Spring Integration documentation.
But this header object then fails to serialize!
In 6.1.0 it produces a debug log, but in 6.3.4 it produces an error log (both with a stack trace).
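For illustration, such a flow might be wired roughly like this in the Java DSL. This is a sketch only: the session factory, remote directory, poller interval, and topic name are placeholders, not the reporter's actual configuration.

```java
// Hypothetical sketch of the failing flow; directory, poller, and topic
// are assumptions, not the reporter's real configuration.
@Bean
IntegrationFlow sftpToKafka(SessionFactory<SftpClient.DirEntry> sessionFactory,
        KafkaTemplate<String, String> kafkaTemplate) {

    return IntegrationFlow
            .from(Sftp.inboundStreamingAdapter(new RemoteFileTemplate<>(sessionFactory))
                            .remoteDirectory("/in"),
                    e -> e.poller(p -> p.fixedDelay(5000)))
            // StreamTransformer reads the remote stream into the payload; the
            // message still carries the closeableResource header afterwards
            .transform(new StreamTransformer("UTF-8"))
            // DefaultKafkaHeaderMapper then tries to serialize that header and fails
            .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic("some-topic"))
            .get();
}
```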

Expected behavior

With the spring.jackson.serialization.FAIL_ON_EMPTY_BEANS=false application property set, the logic should have bypassed the exception, but this property has no effect.

Sample

Log entries from 6.3.4:

2025-01-24T12:19:30.405Z ERROR [hexapole-file-poller,,,] 11897 --- [hexapole-file-poller] [   scheduling-1] o.s.k.support.DefaultKafkaHeaderMapper   : Could not map closeableResource with type org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class java.lang.Object and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession["clientInstance"]->org.springframework.integration.sftp.session.DefaultSftpSessionFactory$ConcurrentSftpClient["clientSession"]->org.apache.sshd.client.session.ClientSessionImpl["futureLock"])
@tasosz added the status: waiting-for-triage and type: bug labels on Jan 27, 2025
@artembilan (Member)

That's correct.
That new DefaultKafkaHeaderMapper() has no knowledge about Spring Boot auto-configuration.
That's why the mentioned KafkaProducerMessageHandler has a setter to be able to inject any custom KafkaHeaderMapper:

	/**
	 * Set the header mapper to use.
	 * @param headerMapper the mapper; can be null to disable header mapping.
	 */
	public void setHeaderMapper(KafkaHeaderMapper headerMapper) {

where you can indeed declare a DefaultKafkaHeaderMapper with the respective constructor based on the auto-configured ObjectMapper:

	/**
	 * Construct an instance with the provided object mapper and default header patterns
	 * for outbound headers; all inbound headers are mapped. The patterns are applied in
	 * order, stopping on the first match (positive or negative). Patterns are negated by
	 * preceding them with "!". The default pattern list is
	 * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
	 * {@link KafkaHeaders} are never mapped as headers since they represent data in
	 * consumer/producer records.
	 * @param objectMapper the object mapper.
	 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
	 */
	public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) {

On the other hand, the mentioned Spring Integration closeableResource header is definitely not meant for serialization.
Consider using a HeaderFilter before publishing to Kafka.

I also think that the mentioned StreamTransformer could be improved to remove that header after using it.

Please confirm whether my suggestions work for you, and then we can repurpose this issue for the StreamTransformer improvement.
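The suggested HeaderFilter step could look like this in the Java DSL. A sketch only, assuming a streaming flow like the reporter's; the filter sits after the StreamTransformer (which consumes and closes the stream) and before the Kafka adapter, so the non-serializable header never reaches the header mapper.

```java
// Sketch: drop the closeableResource header once the StreamTransformer
// has consumed (and closed) the remote stream, before publishing to Kafka.
.transform(new StreamTransformer("UTF-8"))
.headerFilter(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)
.handle(Kafka.outboundChannelAdapter(kafkaTemplate))
```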

@artembilan added the status: waiting-for-reporter label and removed the status: waiting-for-triage label on Jan 27, 2025

tasosz commented Jan 27, 2025

Hi Artem,

Yes, I had seen setHeaderMapper, but it felt a bit involved to set this manually.

But your confirmation that the closeableResource header is definitely not for serialization, as well as the suggestion to use a HeaderFilter, were excellent.
I have now added a HeaderFilter before the StreamTransformer and a custom transformer I had before Kafka, and it worked really well!

For reference I have followed this: Transformer-Header Filter

Thanks!

@artembilan (Member)

but it felt a bit involved to manually set this.

Not sure why, because that is typical for setting any property on a bean.
In this case it is really not too much:

@Bean
@ServiceActivator
KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setHeaderMapper(new DefaultKafkaHeaderMapper(objectMapper));
    return handler;
}

Thank you for confirming about the HeaderFilter!
But you have to add it after that StreamTransformer, so the resource is closed when the stream has been processed over there.
Otherwise that may lead to leaks.

Either way, changing the subject of the issue to: remove the closeableResource header when the StreamTransformer has closed it.

@artembilan added this to the 6.5.0-M2 milestone on Jan 27, 2025
@artembilan changed the title from "spring.jackson.serialization.FAIL_ON_EMPTY_BEANS=false has no effect with KafkaProducerMessageHandler/DefaultKafkaHeaderMapper" to "The StreamTransformer must remove an IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE header from the output message after the resource is closed." on Jan 27, 2025

tasosz commented Jan 27, 2025

In this case that is really not too much:

Calling the setter isn't much, yes, if you just pass it any ObjectMapper.
What I meant was that replicating the ObjectMapper the way KafkaProducerMessageHandler does internally, then setting FAIL_ON_EMPTY_BEANS on it, and after that passing it to the setter, is a bit more involved and doesn't feel very productive.

But you have to add it after that StreamTransformer

Sorry, I had made a typo!
What I meant to type was: "I have now added a HeaderFilter between the StreamTransformer and a custom transformer"
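So the working ordering, sketched in the Java DSL; myCustomTransformer and the topic name are placeholders for the reporter's actual beans:

```java
// Sketch of the corrected ordering; myCustomTransformer and the topic
// are hypothetical placeholders.
.transform(new StreamTransformer("UTF-8"))          // remote stream is closed here
.headerFilter(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)
.transform(myCustomTransformer)                     // hypothetical custom step
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic("some-topic"))
```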
