Skip to content

Commit

Permalink
GH-2932: Sanitize sensitive data on bindings endpoint
Browse files Browse the repository at this point in the history
Resolves #2932

Spring Boot provides `SanitizingFunction` to allow the applicaitons to clear out
sensitive data when using certain actuator endpoints. This feature can be
extended to custom endpoints as well. Enable the bindings actuator endpoint
to sanitze sensitive data based on user-provided logic in `SantizingFuction`
beans in the application.
  • Loading branch information
sobychacko committed Apr 17, 2024
1 parent 3d679c4 commit 7845be5
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import io.micrometer.core.instrument.MeterRegistry;
Expand All @@ -27,6 +28,7 @@

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.SanitizingFunction;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.SpringBootTest;
Expand All @@ -37,6 +39,8 @@
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.cloud.stream.endpoint.BindingsEndpoint;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
Expand All @@ -63,6 +67,8 @@
properties = {
"spring.cloud.stream.bindings.input.group=" + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP,
"spring.cloud.stream.function.bindings.process-in-0=input",
"management.endpoints.web.exposure.include=bindings",
"spring.cloud.stream.kafka.bindings.input.consumer.configuration.sasl.jaas.config=secret",
"spring.cloud.stream.pollable-source=input"}
)
@DirtiesContext
Expand All @@ -77,6 +83,9 @@ class KafkaBinderActuatorTests {
@Autowired
private KafkaTemplate<?, byte[]> kafkaTemplate;

@Autowired
private ApplicationContext context;

@Test
void kafkaBinderMetricsExposed() {
this.kafkaTemplate.send("input", null, "foo".getBytes());
Expand All @@ -87,6 +96,36 @@ void kafkaBinderMetricsExposed() {
.value()).isGreaterThan(0);
}

@Test
@SuppressWarnings("unchecked")
void bindingsActuatorEndpointInKafkaBinderBasedApp() {

BindingsEndpoint controller = context.getBean(BindingsEndpoint.class);
List<Map<String, Object>> bindings = controller.queryStates();
Optional<Map<String, Object>> first = bindings.stream().filter(m -> m.get("bindingName").equals("input")).findFirst();
assertThat(first.isPresent()).isTrue();
Map<String, Object> inputBindingMap = first.get();

Map<String, Object> extendedInfo = (Map<String, Object>) inputBindingMap.get("extendedInfo");
Map<String, Object> extendedConsumerProperties = (Map<String, Object>) extendedInfo.get("ExtendedConsumerProperties");
Map<String, Object> extension = (Map<String, Object>) extendedConsumerProperties.get("extension");
Map<String, Object> configuration = (Map<String, Object>) extension.get("configuration");
String saslJaasConfig = (String) configuration.get("sasl.jaas.config");

assertThat(saslJaasConfig).isEqualTo("data-scrambled!!");

List<Binding<?>> input = controller.queryState("input");
// Since the above call goes through JSON serialization, we receive the type as a map of bindings.
// The above call goes through this serialization because we provide a sanitization function.
Map<String, Object> extendedInfo1 = (Map<String, Object>) ((Map<String, Object>) input.get(0)).get("extendedInfo");
Map<String, Object> extendedConsumerProperties1 = (Map<String, Object>) extendedInfo1.get("ExtendedConsumerProperties");
Map<String, Object> extension1 = (Map<String, Object>) extendedConsumerProperties1.get("extension");
Map<String, Object> configuration1 = (Map<String, Object>) extension1.get("configuration");
String saslJaasConfig1 = (String) configuration1.get("sasl.jaas.config");

assertThat(saslJaasConfig1).isEqualTo("data-scrambled!!");
}

@Test
@Disabled
void kafkaBinderMetricsWhenNoMicrometer() {
Expand Down Expand Up @@ -169,5 +208,17 @@ public Consumer<String> process() {
};
}

@Bean
public SanitizingFunction sanitizingFunction() {
return sd -> {
if (sd.getKey().equals("sasl.jaas.config")) {
return sd.withValue("data-scrambled!!");
}
else {
return sd;
}
};
}

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,7 +63,7 @@ void actuatorDoesNotCauseInfiniteRecursion() {

BindingsLifecycleController controller = context
.getBean(BindingsLifecycleController.class);
List<Map<?, ?>> bindings = controller.queryStates();
List<Map<String, Object>> bindings = controller.queryStates();
assertThat(bindings.size()).isEqualTo(1);
assertThat(bindings.get(0).get("bindingName")).isEqualTo("consume-in-0");
}
Expand Down Expand Up @@ -115,7 +115,7 @@ void whenTwoBindersFoundNoErrorIfBinderProvidedThroughBinding() throws Exception

BindingsLifecycleController controller = context
.getBean(BindingsLifecycleController.class);
List<Map<?, ?>> bindings = controller.queryStates();
List<Map<String, Object>> bindings = controller.queryStates();
assertThat(bindings.size()).isEqualTo(1);
assertThat(bindings.get(0).get("bindingName")).isEqualTo("consume-in-0");
assertThat(bindings.get(0).get("binderName")).isEqualTo("integration");
Expand All @@ -136,7 +136,7 @@ void whenTwoBindersFoundNoErrorWhenDefaultBinderIsProvided() throws Exception {

BindingsLifecycleController controller = context
.getBean(BindingsLifecycleController.class);
List<Map<?, ?>> bindings = controller.queryStates();
List<Map<String, Object>> bindings = controller.queryStates();
assertThat(bindings.size()).isEqualTo(1);
assertThat(bindings.get(0).get("bindingName")).isEqualTo("consume-in-0");
assertThat(bindings.get(0).get("binderName")).isEqualTo("integration1");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2021 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
* It is registered as a bean and once injected could be used to control the lifecycle f the bindings.
*
* @author Oleg Zhurakousky
* @author Soby Chacko
* @since 3.x
*/
public class BindingsLifecycleController {
Expand Down Expand Up @@ -71,6 +72,15 @@ public BindingsLifecycleController(List<InputBindingLifecycle> inputBindingLifec
}
}

/**
* Provide an accessor for the custom ObjectMapper created by this controller.
* @return {@link ObjectMapper}
* @since 4.1.2
*/
public ObjectMapper getObjectMapper() {
return objectMapper;
}

/**
* Convenience method to stop the binding with provided `bindingName`.
* @param bindingName the name of the binding.
Expand Down Expand Up @@ -129,7 +139,7 @@ public void changeState(String bindingName, State state) {
* @return the list of {@link Binding}s
*/
@SuppressWarnings("unchecked")
public List<Map<?, ?>> queryStates() {
public List<Map<String, Object>> queryStates() {
List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings());
bindings.addAll(gatherOutputBindings());
return this.objectMapper.convertValue(bindings, List.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,10 @@

package org.springframework.cloud.stream.config;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint;
import org.springframework.boot.actuate.endpoint.SanitizingFunction;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
Expand All @@ -29,6 +31,7 @@

/**
* @author Oleg Zhurakousky
* @author Soby Chacko
* @since 2.0
*/
@AutoConfiguration
Expand All @@ -40,8 +43,10 @@ public class BindingsEndpointAutoConfiguration {

@Bean
@ConditionalOnAvailableEndpoint
public BindingsEndpoint bindingsEndpoint(BindingsLifecycleController bindingsLifecycleController) {
return new BindingsEndpoint(bindingsLifecycleController);
public BindingsEndpoint bindingsEndpoint(BindingsLifecycleController bindingsLifecycleController,
ObjectProvider<SanitizingFunction> sanitizingFunctions) {
return new BindingsEndpoint(bindingsLifecycleController, sanitizingFunctions.orderedStream().toList(),
bindingsLifecycleController.getObjectMapper());
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,30 +16,58 @@

package org.springframework.cloud.stream.endpoint;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.boot.actuate.endpoint.SanitizableData;
import org.springframework.boot.actuate.endpoint.Sanitizer;
import org.springframework.boot.actuate.endpoint.SanitizingFunction;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.actuate.endpoint.annotation.Selector;
import org.springframework.boot.actuate.endpoint.annotation.WriteOperation;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binding.BindingsLifecycleController;
import org.springframework.cloud.stream.binding.BindingsLifecycleController.State;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;

/**
*
* Actuator endpoint for binding control.
*
* @author Oleg Zhurakousky
* @author Soby Chacko
* @since 2.0
*/
@Endpoint(id = "bindings")
public class BindingsEndpoint {

private final BindingsLifecycleController lifecycleController;

private final Sanitizer sanitizer;

private final ObjectMapper objectMapper;

public BindingsEndpoint(BindingsLifecycleController lifecycleController) {
this(lifecycleController, null, null);
}

/**
* @param lifecycleController {@link BindingsLifecycleController}
* @param sanitizingFunctions list of user provided {@link SanitizingFunction} beans
* @param objectMapper from {@link BindingsLifecycleController}
* @since 4.1.2
*/
public BindingsEndpoint(BindingsLifecycleController lifecycleController,
@Nullable Iterable<SanitizingFunction> sanitizingFunctions, @Nullable ObjectMapper objectMapper) {
this.lifecycleController = lifecycleController;
this.sanitizer = CollectionUtils.isEmpty((Collection<?>) sanitizingFunctions) ? null :
new Sanitizer(sanitizingFunctions);
this.objectMapper = objectMapper;
}

@WriteOperation
Expand All @@ -48,13 +76,45 @@ public void changeState(@Selector String name, State state) {
}

@ReadOperation
public List<?> queryStates() {
return this.lifecycleController.queryStates();
public List<Map<String, Object>> queryStates() {
List<Map<String, Object>> bindings = this.lifecycleController.queryStates();
if (this.sanitizer != null) {
for (Map<String, Object> binding : bindings) {
sanitizeSensitiveData(binding);
}
}
return bindings;
}

@ReadOperation
@SuppressWarnings("unchecked")
public List<Binding<?>> queryState(@Selector String name) {
return this.lifecycleController.queryState(name);
List<Binding<?>> bindings = this.lifecycleController.queryState(name);
if (this.sanitizer != null) {
List<Map<String, Object>> bindingsAsMap = this.objectMapper.convertValue(bindings, List.class);
for (Map<String, Object> binding : bindingsAsMap) {
sanitizeSensitiveData(binding);
}
// End users will get a list of map that contains information from the underlying Binding.
return this.objectMapper.convertValue(bindingsAsMap, List.class);
}
return bindings;
}

@SuppressWarnings("unchecked")
public void sanitizeSensitiveData(Map<String, Object> binding) {
for (String key : binding.keySet()) {
Object value = binding.get(key);
if (value != null && Map.class.isAssignableFrom(value.getClass())) {
// Recursive call since we encountered an inner map
sanitizeSensitiveData((Map<String, Object>) value);
}
else {
SanitizableData sanitizableData = new SanitizableData(null, key, value);
Object sanitized = this.sanitizer.sanitize(sanitizableData, true);
binding.put(key, sanitized);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,24 @@ You can also stop, start, pause, and resume individual bindings by posting to th
NOTE: `PAUSED` and `RESUMED` work only when the corresponding binder and its underlying technology supports it. Otherwise, you see the warning message in the logs.
Currently, only Kafka and [Solace](https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) binders supports the `PAUSED` and `RESUMED` states.

[[sanitize-sensitive-data]]
=== Sanitize Sensitive Data

When using the binding actuator endpoint, it is sometimes critical to sanitize any sensitive data such as user credentials, information about SSL keys, etc.
To achieve this, end user applications can provide a `SanitizingFunction` from Spring Boot as a bean in the application.
Here is an example to scramble the data when providing a value for Apache Kafka's `sasl.jaas.config` property.

```
@Bean
public SanitizingFunction sanitizingFunction() {
return sanitizableData -> {
if (sanitizableData.getKey().equals("sasl.jaas.config")) {
return sanitizableData.withValue("data-scrambled!!");
}
else {
return sanitizableData;
}
};
}
```

0 comments on commit 7845be5

Please sign in to comment.