From 3b5bb2e0d9a89c3b9ae9152c99268460ca7ed442 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 26 Jul 2024 15:57:12 -0400 Subject: [PATCH] Remove redundant `delaySubscription` from `FunctionConfiguration` Related to: https://github.com/spring-projects/spring-integration/issues/9362 After the fix in Spring Integration: https://github.com/spring-projects/spring-integration/commit/bdcb856a9081bc091d56be16d51f0f4d561bc9ce we ended up in a deadlock situation with a `beginPublishingTrigger` in the `FunctionConfiguration` used for the `delaySubscription()` on an original `Publisher`. The `FluxMessageChannel` uses its own `delaySubscription()` until the channel has its subscribers. Apparently the logic before was with errors, so the `FluxMessageChannel` was marked as active even if its subscriber is not ready yet, leading to famous `Dispatcher does not have subscribers` error. So, looks like this `beginPublishingTrigger` was introduced back in days in Spring Cloud Stream to mitigate that situation until we really emit a `BindingCreatedEvent`. The deadlock (and the flaw, respectively) is with the `setupBindingTrigger()` method implementation where `FluxMessageChannel` now "really" delays a subscription to the provided `Publisher`, therefore not triggering that `Mono.create()` fulfilment immediately. The `BindingCreatedEvent` arrives earlier, than we have a subscriber on the channel, but `triggerRef.get()` is `null`, so we don't `success()` it and in the end don't subscribe to an original `Publisher` since `delaySubscription()` on it is never completed. Since `FunctionConfiguration` fully relies on `IntegrationFlow.from(Publisher)`, which ends up with the mentioned `FluxMessageChannel.subscribeTo()` and its own `delaySubscription()` (which, in turn, apparently fixed now), we don't need our own `delaySubscription()` any more. Therefore the fix in this PR is to propose to remove `beginPublishingTrigger` logic altogether. --- .../function/FunctionConfiguration.java | 32 ++++--------------- 1 file changed, 6 insertions(+), 26 deletions(-) diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java index 888a7de28..c051e777b 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java @@ -40,7 +40,6 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoSink; import reactor.util.function.Tuples; import org.springframework.beans.BeansException; @@ -65,7 +64,6 @@ import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.stream.binder.BinderFactory; import org.springframework.cloud.stream.binder.BinderHeaders; -import org.springframework.cloud.stream.binder.BindingCreatedEvent; import org.springframework.cloud.stream.binder.ConsumerProperties; import org.springframework.cloud.stream.binder.PartitionHandler; import org.springframework.cloud.stream.binder.ProducerProperties; @@ -129,6 +127,7 @@ * @author Byungjun You * @author Ivan Shapoval * @author Patrik Péter Süli + * @author Artem Bilan * @since 2.1 */ @Lazy(false) @@ -224,8 +223,6 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc functionWrapper = functionCatalog.lookup(proxyFactory.getFunctionDefinition(), contentTypes.toArray(new String[0])); } - Publisher beginPublishingTrigger = setupBindingTrigger(context); - if (!functionProperties.isComposeFrom() && !functionProperties.isComposeTo()) { String integrationFlowName = proxyFactory.getFunctionDefinition() + "_integrationflow"; @@ -239,7 +236,7 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc if (functionWrapper != null) { IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(functionWrapper, context, producerProperties), - beginPublishingTrigger, pollable, context, taskScheduler, producerProperties, outputName) + pollable, context, taskScheduler, producerProperties, outputName) .route(Message.class, message -> { if (message.getHeaders().get("spring.cloud.stream.sendto.destination") != null) { String destinationName = (String) message.getHeaders().get("spring.cloud.stream.sendto.destination"); @@ -253,7 +250,7 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc } else { IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(supplier, context, producerProperties), - beginPublishingTrigger, pollable, context, taskScheduler, producerProperties, outputName) + pollable, context, taskScheduler, producerProperties, outputName) .channel(c -> c.direct()) .fluxTransform((Function>, ? extends Publisher>) function) .route(Message.class, message -> { @@ -274,26 +271,9 @@ InitializingBean supplierInitializer(FunctionCatalog functionCatalog, StreamFunc }; } - - /* - * Creates a publishing trigger to ensure Supplier does not begin publishing until binding is created - */ - private Publisher setupBindingTrigger(GenericApplicationContext context) { - AtomicReference> triggerRef = new AtomicReference<>(); - Publisher beginPublishingTrigger = Mono.create(triggerRef::set); - context.addApplicationListener(event -> { - if (event instanceof BindingCreatedEvent) { - if (triggerRef.get() != null) { - triggerRef.get().success(); - } - } - }); - return beginPublishingTrigger; - } - @SuppressWarnings({ "rawtypes", "unchecked" }) private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier supplier, - Publisher beginPublishingTrigger, PollableBean pollable, GenericApplicationContext context, + PollableBean pollable, GenericApplicationContext context, TaskScheduler taskScheduler, ProducerProperties producerProperties, String bindingName) { IntegrationFlowBuilder integrationFlowBuilder; @@ -309,8 +289,8 @@ private IntegrationFlowBuilder integrationFlowFromProvidedSupplier(Supplier s if (pollable == null && reactive) { Publisher publisher = (Publisher) supplier.get(); publisher = publisher instanceof Mono - ? ((Mono) publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary) - : ((Flux) publisher).delaySubscription(beginPublishingTrigger).map(this::wrapToMessageIfNecessary); + ? ((Mono) publisher).map(this::wrapToMessageIfNecessary) + : ((Flux) publisher).map(this::wrapToMessageIfNecessary); integrationFlowBuilder = IntegrationFlow.from(publisher);