Skip to content

Commit

Permalink
GH-3033: Register ObservationRegistry for Dynamic MessageChannels
Browse files Browse the repository at this point in the history
Fixes: gh-3033

* ensure `ObservationRegistry` is registered on dynamically created `MessageChannel` instances in `StreamBridge`
  • Loading branch information
agustino-lim committed Nov 8, 2024
1 parent e14bc1c commit 0ebc94c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,21 @@ void dynamicProducerDestination() {
assertThat(new String(message.getPayload())).isEqualTo("JOHN DOE");
}

@Test
void test_3033() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
EmptyConfiguration.class)).web(WebApplicationType.NONE).run(
"--spring.cloud.stream.source=outputA",
"--spring.jmx.enabled=false")) {
StreamBridge streamBridge = context.getBean(StreamBridge.class);
streamBridge.send("outputA", MessageBuilder.withPayload("A").build());

OutputDestination output = context.getBean(OutputDestination.class);
assertThat(output.receive(1000, "outputA").getHeaders().containsKey("traceparent")).isTrue();
}
}

@EnableAutoConfiguration
public static class DynamicProducerDestinationConfig {
@Bean
Expand Down
8 changes: 8 additions & 0 deletions core/spring-cloud-stream-test-binder/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Arrays;
import java.util.List;

import io.micrometer.observation.ObservationRegistry;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.binder.Binder;
Expand Down Expand Up @@ -107,4 +109,8 @@ public TestChannelBinderProvisioner springIntegrationProvisioner() {
return new TestChannelBinderProvisioner();
}

@Bean
public ObservationRegistry observationRegistry() {
return ObservationRegistry.create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.stream.StreamSupport;

import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
Expand All @@ -46,6 +47,7 @@

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand Down Expand Up @@ -151,8 +153,10 @@ public class FunctionConfiguration {
@Bean
public StreamBridge streamBridgeUtils(FunctionCatalog functionCatalog,
BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext,
@Nullable NewDestinationBindingCallback callback) {
return new StreamBridge(functionCatalog, bindingServiceProperties, applicationContext, callback);
@Nullable NewDestinationBindingCallback callback,
ObjectProvider<ObservationRegistry> observationRegistries) {
return new StreamBridge(functionCatalog, bindingServiceProperties, applicationContext, callback,
observationRegistries);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@

import io.micrometer.context.ContextExecutorService;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
Expand Down Expand Up @@ -122,6 +124,7 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi

private static final ReentrantLock lock = new ReentrantLock();

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
/**
*
* @param functionCatalog instance of {@link FunctionCatalog}
Expand All @@ -130,7 +133,7 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi
*/
@SuppressWarnings("serial")
StreamBridge(FunctionCatalog functionCatalog, BindingServiceProperties bindingServiceProperties,
ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback) {
ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback, ObjectProvider<ObservationRegistry> observationRegistries) {
this.executorService = Executors.newCachedThreadPool();
Assert.notNull(functionCatalog, "'functionCatalog' must not be null");
Assert.notNull(applicationContext, "'applicationContext' must not be null");
Expand All @@ -155,6 +158,7 @@ protected boolean removeEldestEntry(Map.Entry<String, MessageChannel> eldest) {
};
this.functionInvocationHelper = applicationContext.getBean(FunctionInvocationHelper.class);
this.streamBridgeFunctionCache = new HashMap<>();
observationRegistries.ifAvailable(registry -> this.observationRegistry = registry);
}

@Override
Expand Down Expand Up @@ -291,6 +295,7 @@ MessageChannel resolveDestination(String destinationName, ProducerProperties pro
messageChannel = this.isAsync() ? new ExecutorChannel(this.executorService) : new DirectWithAttributesChannel();
((AbstractSubscribableChannel) messageChannel).setApplicationContext(applicationContext);
((AbstractSubscribableChannel) messageChannel).setComponentName(destinationName);
((AbstractSubscribableChannel) messageChannel).registerObservationRegistry(observationRegistry);
if (this.destinationBindingCallback != null) {
Object extendedProducerProperties = this.bindingService
.getExtendedProducerProperties(messageChannel, destinationName);
Expand Down

0 comments on commit 0ebc94c

Please sign in to comment.