Skip to content

Commit

Permalink
Merge branch 'master' into renovate/gradle-gradle-build-action-3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
yidongnan authored Apr 12, 2024
2 parents f7ec0f1 + bbe8def commit a653e17
Show file tree
Hide file tree
Showing 19 changed files with 387 additions and 63 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ establishing a direct line of communication. Your feedback is highly appreciated

README: [English](README.md) | [中文](README-zh-CN.md)

**Documentation:** [English](https://yidongnan.github.io/grpc-spring-boot-starter/en/) | [中文](https://yidongnan.github.io/grpc-spring-boot-starter/zh-CN/)
**Documentation:** [English](https://grpc-ecosystem.github.io/grpc-spring/en/) | [中文](https://grpc-ecosystem.github.io/grpc-spring/zh-CN/)

## Features

Expand Down
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ buildscript {
projectVersion = '3.0.0.RELEASE'

// https://github.com/grpc/grpc-java/releases
grpcVersion = '1.60.1'
grpcVersion = '1.63.0'

// https://github.com/google/guava/releases
guavaVersion = '33.0.0-jre'
guavaVersion = '33.1.0-jre'
// https://github.com/protocolbuffers/protobuf/releases
protobufVersion = '3.25.3'
protobufGradlePluginVersion = '0.9.4'

// https://github.com/spring-projects/spring-boot/releases
springBootVersion = '3.2.3'
springBootVersion = '3.2.4'
// https://github.com/spring-cloud/spring-cloud-release/releases
springCloudVersion = '2023.0.0'
// https://github.com/alibaba/spring-cloud-alibaba/releases
Expand Down Expand Up @@ -61,7 +61,7 @@ if (hasProperty('buildScan')) {
wrapper {
// Update using:
// ./gradlew wrapper --gradle-version=8.4 --distribution-type=bin
gradleVersion = '8.6'
gradleVersion = '8.7'
}

def buildTimeAndDate = OffsetDateTime.now()
Expand Down
2 changes: 1 addition & 1 deletion docs/en/client/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ There are a number of supported schemes, that you can use to determine the targe
- `discovery` (Prio 6): \
(Optional) Uses spring-cloud's `DiscoveryClient` to lookup appropriate targets. The connections will be refreshed
automatically during `HeartbeatEvent`s. Uses the `gRPC_port` metadata to determine the port, otherwise uses the
service port. \
service port. Uses the `gRPC_service_config` metadata to determine [service config](https://grpc.github.io/grpc/core/md_doc_service_config.html). \
Example: `discovery:///service-name`
- `self` (Prio 0): \
The self address or scheme is a keyword that is available, if you also use `grpc-server-spring-boot-starter` and
Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ protected void configure(final T builder, final String name) {
configureKeepAlive(builder, name);
configureSecurity(builder, name);
configureLimits(builder, name);
configureCompression(builder, name);
configureUserAgent(builder, name);
for (final GrpcChannelConfigurer channelConfigurer : this.channelConfigurers) {
channelConfigurer.accept(builder, name);
Expand Down Expand Up @@ -234,7 +233,7 @@ protected boolean isNonNullAndNonBlank(final String value) {
}

/**
* Configures limits such as max message sizes that should be used by the channel.
* Configures limits such as max message or metadata sizes that should be used by the channel.
*
* @param builder The channel builder to configure.
* @param name The name of the client to configure.
Expand All @@ -245,18 +244,9 @@ protected void configureLimits(final T builder, final String name) {
if (maxInboundMessageSize != null) {
builder.maxInboundMessageSize((int) maxInboundMessageSize.toBytes());
}
}

/**
* Configures the compression options that should be used by the channel.
*
* @param builder The channel builder to configure.
* @param name The name of the client to configure.
*/
protected void configureCompression(final T builder, final String name) {
final GrpcChannelProperties properties = getPropertiesFor(name);
if (properties.isFullStreamDecompression()) {
builder.enableFullStreamDecompression();
final DataSize maxInboundMetadataSize = properties.getMaxInboundMetadataSize();
if (maxInboundMetadataSize != null) {
builder.maxInboundMetadataSize((int) maxInboundMetadataSize.toBytes());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,35 +333,43 @@ public void setMaxInboundMessageSize(final DataSize maxInboundMessageSize) {
}
}

// --------------------------------------------------

private Boolean fullStreamDecompression;
private static final boolean DEFAULT_FULL_STREAM_DECOMPRESSION = false;
@DataSizeUnit(DataUnit.BYTES)
private DataSize maxInboundMetadataSize = null;

/**
* Gets whether full-stream decompression of inbound streams should be enabled.
* Sets the maximum size of metadata in bytes allowed to be received. If not set ({@code null}) then it will default
* to gRPC's default. The default is implementation-dependent, but is not generally less than 8 KiB and may be
* unlimited. If set to {@code -1} then it will use the highest possible limit (not recommended). Integer.MAX_VALUE
* disables the enforcement.
*
* @return True, if full-stream decompression of inbound streams should be enabled. False otherwise.
* @return The maximum size of metadata in bytes allowed to be received or null if the default should be used.
*
* @see #setFullStreamDecompression(Boolean)
* @see ManagedChannelBuilder#maxInboundMetadataSize(int) (int)
*/
public boolean isFullStreamDecompression() {
return this.fullStreamDecompression == null ? DEFAULT_FULL_STREAM_DECOMPRESSION : this.fullStreamDecompression;
public DataSize getMaxInboundMetadataSize() {
return maxInboundMetadataSize;
}

/**
* Sets whether full-stream decompression of inbound streams should be enabled. This will cause the channel's
* outbound headers to advertise support for GZIP compressed streams, and gRPC servers which support the feature may
* respond with a GZIP compressed stream.
* Sets the maximum size of metadata in bytes allowed to be received. If not set ({@code null}) then it will
* default.The default is implementation-dependent, but is not generally less than 8 KiB and may be unlimited. If
* set to {@code -1} then it will use the highest possible limit (not recommended). Integer.MAX_VALUE disables the
* enforcement.
*
* @param fullStreamDecompression Whether full stream decompression should be enabled or null to use the fallback.
* @param maxInboundMetadataSize The new maximum size of metadata in bytes allowed to be received. {@code -1} for
* max possible. Null to use the gRPC's default.
*
* @see ManagedChannelBuilder#enableFullStreamDecompression()
* @see ManagedChannelBuilder#maxInboundMetadataSize(int) (int)
*/
public void setFullStreamDecompression(final Boolean fullStreamDecompression) {
this.fullStreamDecompression = fullStreamDecompression;
public void setMaxInboundMetadataSize(DataSize maxInboundMetadataSize) {
if (maxInboundMetadataSize == null || maxInboundMetadataSize.toBytes() >= 0) {
this.maxInboundMetadataSize = maxInboundMetadataSize;
} else if (maxInboundMetadataSize.toBytes() == -1) {
this.maxInboundMetadataSize = DataSize.ofBytes(Integer.MAX_VALUE);
} else {
throw new IllegalArgumentException("Unsupported maxInboundMetadataSize: " + maxInboundMetadataSize);
}
}

// --------------------------------------------------

private NegotiationType negotiationType;
Expand Down Expand Up @@ -493,9 +501,6 @@ public void copyDefaultsFrom(final GrpcChannelProperties config) {
if (this.maxInboundMessageSize == null) {
this.maxInboundMessageSize = config.maxInboundMessageSize;
}
if (this.fullStreamDecompression == null) {
this.fullStreamDecompression = config.fullStreamDecompression;
}
if (this.negotiationType == null) {
this.negotiationType = config.negotiationType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.micrometer.core.instrument.Tags;
import net.devh.boot.grpc.common.util.Constants;

/**
* Provides factories for {@link io.grpc.StreamTracer} that records metrics.
Expand All @@ -47,6 +48,8 @@
final class MetricsClientStreamTracers {
private static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = Stopwatch::createUnstarted;
private final Supplier<Stopwatch> stopwatchSupplier;
private static final String INSTRUMENTATION_SOURCE_TAG_KEY = "instrumentation_source";
private static final String INSTRUMENTATION_VERSION_TAG_KEY = "instrumentation_version";

MetricsClientStreamTracers() {
this(STOPWATCH_SUPPLIER);
Expand Down Expand Up @@ -127,7 +130,10 @@ public void streamClosed(Status status) {

void recordFinishedAttempt() {
Tags attemptMetricTags =
Tags.of("grpc.method", fullMethodName, "grpc.status", statusCode.toString());
Tags.of("grpc.method", fullMethodName,
"grpc.status", statusCode.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION);
this.metricsClientMeters.getClientAttemptDuration()
.withTags(attemptMetricTags)
.record(attemptNanos, TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -168,7 +174,9 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory

// Record here in case newClientStreamTracer() would never be called.
this.metricsClientMeters.getAttemptCounter()
.withTags(Tags.of("grpc.method", fullMethodName))
.withTags(Tags.of("grpc.method", fullMethodName,
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION))
.increment();
}

Expand All @@ -188,7 +196,9 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
// attempt, as first attempt cannot be a transparent retry.
if (attemptsPerCall.get() > 0) {
this.metricsClientMeters.getAttemptCounter()
.withTags((Tags.of("grpc.method", fullMethodName)))
.withTags((Tags.of("grpc.method", fullMethodName,
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION)))
.increment();
}
if (!info.isTransparentRetry()) {
Expand Down Expand Up @@ -248,7 +258,10 @@ void recordFinishedCall() {
}
callLatencyNanos = clientCallStopWatch.elapsed(TimeUnit.NANOSECONDS);
Tags clientCallMetricTags =
Tags.of("grpc.method", this.fullMethodName, "grpc.status", status.getCode().toString());
Tags.of("grpc.method", this.fullMethodName,
"grpc.status", status.getCode().toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION);
this.metricsClientMeters.getClientCallDuration()
.withTags(clientCallMetricTags)
.record(callLatencyNanos, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_INSTANCE_ID_KEY;
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_SERVICE_NAME_KEY;
import static net.devh.boot.grpc.common.util.GrpcUtils.CLOUD_DISCOVERY_METADATA_PORT;
import static net.devh.boot.grpc.common.util.GrpcUtils.CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG;

import java.net.InetSocketAddress;
import java.util.List;
Expand All @@ -35,6 +36,8 @@
import org.springframework.util.CollectionUtils;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;

import io.grpc.Attributes;
import io.grpc.Attributes.Builder;
Expand All @@ -58,13 +61,15 @@ public class DiscoveryClientNameResolver extends NameResolver {
@Deprecated
private static final String LEGACY_CLOUD_DISCOVERY_METADATA_PORT = "gRPC.port";
private static final List<ServiceInstance> KEEP_PREVIOUS = null;
private static final Gson GSON = new Gson();

private final String name;
private final DiscoveryClient client;
private final SynchronizationContext syncContext;
private final Consumer<DiscoveryClientNameResolver> shutdownHook;
private final SharedResourceHolder.Resource<Executor> executorResource;
private final boolean usingExecutorResource;
private final ServiceConfigParser serviceConfigParser;

// The field must be accessed from syncContext, although the methods on an Listener2 can be called
// from any thread.
Expand Down Expand Up @@ -93,6 +98,7 @@ public DiscoveryClientNameResolver(final String name, final DiscoveryClient clie
this.executor = args.getOffloadExecutor();
this.usingExecutorResource = this.executor == null;
this.executorResource = executorResource;
this.serviceConfigParser = args.getServiceConfigParser();
}

/**
Expand Down Expand Up @@ -187,6 +193,55 @@ protected int getGrpcPort(final ServiceInstance instance) {
}
}

/**
* Extracts and parse gRPC service config from the given service instances.
*
* @param instances The list of instances to extract the service config from.
* @return Parsed gRPC service config or null.
*/
private ConfigOrError resolveServiceConfig(List<ServiceInstance> instances) {
final String serviceConfig = getServiceConfig(instances);
if (serviceConfig == null) {
return null;
}
log.debug("Found service config for {}", getName());
if (log.isTraceEnabled()) {
// This is to avoid blowing log into several lines if newlines present in service config string.
final String logStr = serviceConfig.replace("\r", "\\r").replace("\n", "\\n");
log.trace("Service config for {}: {}", getName(), logStr);
}
try {
@SuppressWarnings("unchecked")
Map<String, ?> parsedServiceConfig = GSON.fromJson(serviceConfig, Map.class);
return serviceConfigParser.parseServiceConfig(parsedServiceConfig);
} catch (JsonSyntaxException e) {
return ConfigOrError.fromError(
Status.UNKNOWN
.withDescription("Failed to parse grpc service config")
.withCause(e));
}
}

/**
* Extracts the gRPC service config string from the given service instances.
*
* @param instances The list of instances to extract the service config from.
* @return The gRPC service config or null.
*/
protected String getServiceConfig(final List<ServiceInstance> instances) {
for (final ServiceInstance inst : instances) {
final Map<String, String> metadata = inst.getMetadata();
if (metadata == null || metadata.isEmpty()) {
continue;
}
final String metaValue = metadata.get(CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG);
if (metaValue != null && !metaValue.isEmpty()) {
return metaValue;
}
}
return null;
}

/**
* Gets the attributes from the service instance for later use in a load balancer. Can be overwritten to convert
* custom attributes.
Expand Down Expand Up @@ -318,6 +373,7 @@ private List<ServiceInstance> resolveInternal() {
log.debug("Ready to update server list for {}", getName());
this.savedListener.onResult(ResolutionResult.newBuilder()
.setAddresses(toTargets(newInstanceList))
.setServiceConfig(resolveServiceConfig(newInstanceList))
.build());
log.info("Done updating server list for {}", getName());
return newInstanceList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,6 @@
"description": "Whether keepAlive should be enabled.",
"defaultValue": false
},
{
"name": "grpc.client.GLOBAL.full-stream-decompression",
"type": "java.lang.Boolean",
"sourceType": "net.devh.boot.grpc.client.config.GrpcChannelProperties",
"description": "Whether full-stream decompression of inbound streams should be enabled.",
"defaultValue": false
},
{
"name": "grpc.client.GLOBAL.keep-alive-time",
"type": "java.time.Duration",
Expand Down Expand Up @@ -94,6 +87,12 @@
"sourceType": "net.devh.boot.grpc.client.config.GrpcChannelProperties",
"description": "The maximum message size allowed to be received by the channel.\nIf not set (null) then it will default to gRPC's default.\nIf set to -1 then it will use the highest possible limit (not recommended)."
},
{
"name": "grpc.client.GLOBAL.max-inbound-metadata-size",
"type": "org.springframework.util.unit.DataSize",
"sourceType": "net.devh.boot.grpc.client.config.GrpcChannelProperties",
"description": "the maximum size of metadata in bytes allowed to be received. \nIf not set (null) then it will default to gRPC's default. \nIf set to {@code -1} then it will use the highest possible limit (not recommended)."
},
{
"name": "grpc.client.GLOBAL.negotiation-type",
"type": "net.devh.boot.grpc.client.config.NegotiationType",
Expand Down Expand Up @@ -158,4 +157,4 @@
"description": "The path to the trusted certificate collection.\nIf not set (null) it will use the system's default collection (Default).\nThis collection will be used to verify server certificates."
}
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
"grpc.client.test.keepAliveTime=42m",
"grpc.client.test.maxInboundMessageSize=5MB"
"grpc.client.test.maxInboundMessageSize=5MB",
"grpc.client.test.maxInboundMetadataSize=3MB"
})
class GrpcChannelPropertiesGivenUnitTest {

Expand All @@ -45,6 +46,7 @@ void test() {
final GrpcChannelProperties properties = this.grpcChannelsProperties.getChannel("test");
assertEquals(Duration.ofMinutes(42), properties.getKeepAliveTime());
assertEquals(DataSize.ofMegabytes(5), properties.getMaxInboundMessageSize());
assertEquals(DataSize.ofMegabytes(3), properties.getMaxInboundMetadataSize());
}

}
Loading

0 comments on commit a653e17

Please sign in to comment.