Skip to content

Commit

Permalink
optionally prevent role/originalPrincipal logging
Browse files Browse the repository at this point in the history
  • Loading branch information
KannarFr committed Oct 1, 2024
1 parent 5b98d37 commit 7e68070
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private String clusterName;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Prevent broker from logging role and originalAuthRole, default is false"
)
private Boolean preventRoleLogging = false;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RedactedRole;
import org.apache.pulsar.common.util.StringInterner;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
Expand Down Expand Up @@ -217,6 +218,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private AuthData originalAuthDataCopy;
private boolean pendingAuthChallengeResponse = false;
private ScheduledFuture<?> authRefreshTask;
private boolean preventRoleLogging = false;

// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
// control done by a single producer might not be enough to prevent write spikes on the broker.
Expand Down Expand Up @@ -351,6 +353,7 @@ public ServerCnx(PulsarService pulsar, String listenerName) {
enableSubscriptionPatternEvaluation, maxSubscriptionPatternLength);
this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null;
this.throttleTracker = new ServerCnxThrottleTracker(this);
this.preventRoleLogging = conf.isPreventRoleLogging();
}

@Override
Expand Down Expand Up @@ -829,13 +832,16 @@ private void completeConnect(int clientProtoVersion, String clientVersion) {
log.info("[{}] connected with clientVersion={}, clientProtocolVersion={}, proxyVersion={}", remoteAddress,
clientVersion, clientProtoVersion, proxyVersion);
} else if (originalPrincipal != null) {
String maybeAnonymizedAuthRole = RedactedRole.anonymize(authRole, preventRoleLogging);
String maybeAnonymizedOriginalPrincipal = RedactedRole.anonymize(originalPrincipal, preventRoleLogging);
log.info("[{}] connected role={} and originalAuthRole={} using authMethod={}, clientVersion={}, "
+ "clientProtocolVersion={}, proxyVersion={}", remoteAddress, authRole, originalPrincipal,
authMethod, clientVersion, clientProtoVersion, proxyVersion);
+ "clientProtocolVersion={}, proxyVersion={}", remoteAddress, maybeAnonymizedAuthRole,
maybeAnonymizedOriginalPrincipal, authMethod, clientVersion, clientProtoVersion, proxyVersion);
} else {
String maybeAnonymizedAuthRole = RedactedRole.anonymize(authRole, preventRoleLogging);
log.info("[{}] connected with role={} using authMethod={}, clientVersion={}, clientProtocolVersion={}, "
+ "proxyVersion={}", remoteAddress, authRole, authMethod, clientVersion, clientProtoVersion,
proxyVersion);
+ "proxyVersion={}", remoteAddress, maybeAnonymizedAuthRole, authMethod, clientVersion,
clientProtoVersion, proxyVersion);
}
if (brokerInterceptor != null) {
brokerInterceptor.onConnectionCreated(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.pulsar.common.util;

public final class RedactedRole {
private static final String REDACTED = "[REDACTED]";

public static String anonymize(String role, Boolean preventLogging) {
return preventLogging ? REDACTED : role;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ public class ProxyConfiguration implements PulsarConfiguration {
+ "is enabled.")
private Boolean webServiceLogDetailedAddresses;

@FieldContext(category = CATEGORY_SERVER, doc =
"Prevent broker from logging role and originalAuthRole, default is false.")
private Boolean preventRoleLogging = false;

@FieldContext(category = CATEGORY_SERVER,
doc = "Enables zero-copy transport of data across network interfaces using the spice. "
+ "Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.RedactedRole;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
Expand Down Expand Up @@ -343,16 +344,18 @@ protected static boolean isTlsChannel(Channel channel) {

private synchronized void completeConnect() throws PulsarClientException {
checkArgument(state == State.Connecting);
Boolean preventRoleLogging = service.getConfiguration().getPreventRoleLogging();
String role = RedactedRole.anonymize(clientAuthRole, preventRoleLogging);
LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
remoteAddress, authMethod, role, hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
// Optimize proxy connection to fail-fast if the target broker isn't active
// Pulsar client will retry connecting after a back off timeout
if (service.getConfiguration().isCheckActiveBrokers()
&& !isBrokerActive(proxyToBrokerUrl)) {
state = State.Closing;
LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole);
remoteAddress, proxyToBrokerUrl, authMethod, role);
final ByteBuf msg = Commands.newError(-1,
ServerError.ServiceNotReady, "Target broker isn't available.");
writeAndFlushAndClose(msg);
Expand All @@ -371,10 +374,10 @@ private synchronized void completeConnect() throws PulsarClientException {

LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(),
authMethod, clientAuthRole);
authMethod, role);
} else {
LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole, throwable);
remoteAddress, proxyToBrokerUrl, authMethod, role, throwable);
}
final ByteBuf msg = Commands.newError(-1, ServerError.ServiceNotReady,
"Target broker cannot be validated.");
Expand All @@ -401,7 +404,7 @@ private synchronized void completeConnect() throws PulsarClientException {
Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())), null);
} else {
LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
remoteAddress, state, clientAuthRole);
remoteAddress, state, role);
}

state = State.ProxyLookupRequests;
Expand Down

0 comments on commit 7e68070

Please sign in to comment.