diff --git a/CHANGELOG.md b/CHANGELOG.md index d93e72c423..0d02ff4f65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.53.0] - 2024-04-09 +- add xDS server latency metric provider + ## [29.52.1] - 2024-04-03 - fix concurrent configuration resolution issue in the Gradle plugin in Gradle 8 and above @@ -5680,7 +5683,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.52.1...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.53.0...master +[29.53.0]: https://github.com/linkedin/rest.li/compare/v29.52.1...v29.53.0 [29.52.1]: https://github.com/linkedin/rest.li/compare/v29.52.0...v29.52.1 [29.52.0]: https://github.com/linkedin/rest.li/compare/v29.51.14...v29.52.0 [29.51.14]: https://github.com/linkedin/rest.li/compare/v29.51.13...v29.51.14 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index d30b44e957..e539cd0b95 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -48,6 +48,7 @@ import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter; import com.linkedin.d2.discovery.stores.zk.ZKPersistentConnection; import com.linkedin.d2.discovery.stores.zk.ZooKeeper; +import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.jmx.JmxManager; import com.linkedin.d2.jmx.NoOpJmxManager; import com.linkedin.r2.transport.common.TransportClientFactory; @@ -215,7 +216,8 @@ public D2Client build() _config.dualReadNewLbExecutor, _config.xdsChannelLoadBalancingPolicy, _config.xdsChannelLoadBalancingPolicyConfig, - _config.subscribeToUriGlobCollection + _config.subscribeToUriGlobCollection, + _config._xdsServerMetricsProvider ); final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ? @@ -717,6 +719,11 @@ public D2ClientBuilder setSubscribeToUriGlobCollection(boolean subscribeToUriGlo return this; } + public D2ClientBuilder setXdsServerMetricsProvider(XdsServerMetricsProvider xdsServerMetricsProvider) { + _config._xdsServerMetricsProvider = xdsServerMetricsProvider; + return this; + } + private Map createDefaultTransportClientFactories() { final Map clientFactories = new HashMap<>(); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index 97fadb6449..a3c289c971 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -37,7 +37,9 @@ import com.linkedin.d2.discovery.stores.zk.ZKPersistentConnection; import com.linkedin.d2.discovery.stores.zk.ZooKeeper; import com.linkedin.d2.discovery.stores.zk.ZooKeeperStore; +import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.jmx.JmxManager; +import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider; import com.linkedin.d2.jmx.NoOpJmxManager; import com.linkedin.r2.transport.common.TransportClientFactory; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; @@ -135,6 +137,7 @@ public class D2ClientConfig public String xdsChannelLoadBalancingPolicy = null; public Map xdsChannelLoadBalancingPolicyConfig = null; public boolean subscribeToUriGlobCollection = false; + public XdsServerMetricsProvider _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider(); public D2ClientConfig() { @@ -210,7 +213,8 @@ public D2ClientConfig() ExecutorService dualReadNewLbExecutor, String xdsChannelLoadBalancingPolicy, Map xdsChannelLoadBalancingPolicyConfig, - boolean subscribeToUriGlobCollection + boolean subscribeToUriGlobCollection, + XdsServerMetricsProvider xdsServerMetricsProvider ) { this.zkHosts = zkHosts; @@ -284,5 +288,6 @@ public D2ClientConfig() this.xdsChannelLoadBalancingPolicy = xdsChannelLoadBalancingPolicy; this.xdsChannelLoadBalancingPolicyConfig = xdsChannelLoadBalancingPolicyConfig; this.subscribeToUriGlobCollection = subscribeToUriGlobCollection; + this._xdsServerMetricsProvider = xdsServerMetricsProvider; } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/NoOpXdsServerMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/NoOpXdsServerMetricsProvider.java new file mode 100644 index 0000000000..d5237f5fe7 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/NoOpXdsServerMetricsProvider.java @@ -0,0 +1,41 @@ +package com.linkedin.d2.jmx; + + +/** + * NoOp implementation of {@link XdsServerMetricsProvider} + */ +public class NoOpXdsServerMetricsProvider implements XdsServerMetricsProvider { + @Override + public long getLatencyMin() { + return 0; + } + + @Override + public double getLatencyAverage() { + return 0; + } + + @Override + public long getLatency50Pct() { + return 0; + } + + @Override + public long getLatency99Pct() { + return 0; + } + + @Override + public long getLatency99_9Pct() { + return 0; + } + + @Override + public long getLatencyMax() { + return 0; + } + + @Override + public void trackLatency(long latency) { + } +} diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java index 382785b0d1..4c151bfd35 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java @@ -20,7 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger; -public class XdsClientJmx implements XdsClientJmxMBean { +public class XdsClientJmx implements XdsClientJmxMBean +{ private final AtomicInteger _connectionLostCount = new AtomicInteger(); private final AtomicInteger _connectionClosedCount = new AtomicInteger(); @@ -28,6 +29,19 @@ public class XdsClientJmx implements XdsClientJmxMBean { private final AtomicBoolean _isConnected = new AtomicBoolean(); private final AtomicInteger _resourceNotFoundCount = new AtomicInteger(); + private final XdsServerMetricsProvider _xdsServerMetricsProvider; + + @Deprecated + public XdsClientJmx() + { + this(new NoOpXdsServerMetricsProvider()); + } + + public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider) + { + _xdsServerMetricsProvider = xdsServerMetricsProvider == null ? + new NoOpXdsServerMetricsProvider() : xdsServerMetricsProvider; + } @Override public int getConnectionLostCount() @@ -53,6 +67,39 @@ public int getResourceNotFoundCount() return _resourceNotFoundCount.get(); } + @Override + public long getXdsServerLatencyMin() { + return _xdsServerMetricsProvider.getLatencyMin(); + } + + @Override + public double getXdsServerLatencyAverage() + { + return _xdsServerMetricsProvider.getLatencyAverage(); + } + + @Override + public long getXdsServerLatency50Pct() + { + return _xdsServerMetricsProvider.getLatency50Pct(); + } + + @Override + public long getXdsServerLatency99Pct() + { + return _xdsServerMetricsProvider.getLatency99Pct(); + } + + @Override + public long getXdsServerLatency99_9Pct() { + return _xdsServerMetricsProvider.getLatency99_9Pct(); + } + + @Override + public long getXdsServerLatencyMax() { + return _xdsServerMetricsProvider.getLatencyMax(); + } + @Override public int isDisconnected() { diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmxMBean.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmxMBean.java index 6d9558e708..4744495a18 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmxMBean.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmxMBean.java @@ -34,4 +34,40 @@ public interface XdsClientJmxMBean { // when the resource is not found. int getResourceNotFoundCount(); + + /** + * Get minimum of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + long getXdsServerLatencyMin(); + + /** + * Get Avg of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + double getXdsServerLatencyAverage(); + + /** + * Get 50 Percentile of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + long getXdsServerLatency50Pct(); + + /** + * Get 90 Percentile of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + long getXdsServerLatency99Pct(); + + /** + * Get 99.9 Percentile of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + long getXdsServerLatency99_9Pct(); + + /** + * Get maximum of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + long getXdsServerLatencyMax(); } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsServerMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsServerMetricsProvider.java new file mode 100644 index 0000000000..e5ca4daf8d --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsServerMetricsProvider.java @@ -0,0 +1,49 @@ +package com.linkedin.d2.jmx; + + +/** + * Interface for providing metrics for Xds Server + */ +public interface XdsServerMetricsProvider { + /** + * Get minimum of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + long getLatencyMin(); + + /** + * Get Avg of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + double getLatencyAverage(); + + /** + * Get 50 Percentile of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + long getLatency50Pct(); + + /** + * Get 90 Percentile of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + long getLatency99Pct(); + + /** + * Get 99.9 Percentile of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + long getLatency99_9Pct(); + + /** + * Get maximum of Xds server latency, which is from when the resource is updated on the Xds server to when the + * client receives it. + */ + long getLatencyMax(); + + /** + * Track the latency of the Xds server. + * @param latency the latency to track + */ + void trackLatency(long latency); +} diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 3c0c5ea8d3..36068e72c8 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -19,11 +19,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Strings; +import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Code; +import com.linkedin.d2.jmx.XdsServerMetricsProvider; +import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider; import com.linkedin.d2.jmx.XdsClientJmx; import com.linkedin.d2.xds.GlobCollectionUtils.D2UriIdentifier; +import com.linkedin.util.clock.SystemClock; import indis.XdsD2; import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; @@ -58,6 +62,9 @@ import org.slf4j.LoggerFactory; +/** + * Implementation of {@link XdsClient} interface. + */ public class XdsClientImpl extends XdsClient { private static final Logger _log = LoggerFactory.getLogger(XdsClientImpl.class); @@ -80,6 +87,7 @@ public class XdsClientImpl extends XdsClient private final long _readyTimeoutMillis; private final XdsClientJmx _xdsClientJmx; + private final XdsServerMetricsProvider _serverMetricsProvider; @Deprecated public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService) @@ -94,8 +102,16 @@ public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutor this(node, managedChannel, executorService, readyTimeoutMillis, false); } + @Deprecated public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService, long readyTimeoutMillis, boolean subscribeToUriGlobCollection) + { + this(node, managedChannel, executorService, readyTimeoutMillis, subscribeToUriGlobCollection, + new NoOpXdsServerMetricsProvider()); + } + + public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService, + long readyTimeoutMillis, boolean subscribeToUriGlobCollection, XdsServerMetricsProvider serverMetricsProvider) { _readyTimeoutMillis = readyTimeoutMillis; _node = node; @@ -106,7 +122,9 @@ public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutor { _log.info("Glob collection support enabled"); } - _xdsClientJmx = new XdsClientJmx(); + + _xdsClientJmx = new XdsClientJmx(serverMetricsProvider); + _serverMetricsProvider = serverMetricsProvider == null ? new NoOpXdsServerMetricsProvider() : serverMetricsProvider; } @Override @@ -431,7 +449,7 @@ private void handleResourceUpdate(Map updates, ResourceSubscriber subscriber = subscribers.get(entry.getKey()); if (subscriber != null) { - subscriber.onData(entry.getValue()); + subscriber.onData(entry.getValue(), _serverMetricsProvider); } } } @@ -524,16 +542,17 @@ void addWatcher(ResourceWatcher watcher) } } - private void onData(ResourceUpdate data) + private void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider) { if (Objects.equals(_data, data)) { _log.debug("Received resource update data equal to the current data. Will not perform the update."); return; } + // null value guard to avoid overwriting the property with null if (data != null && data.isValid()) { - // null value guard to avoid overwriting the property with null + trackServerLatency(data, metricsProvider); // data updated, track xds server latency _data = data; } if (_data == null) @@ -547,6 +566,51 @@ private void onData(ResourceUpdate data) } } + // track rough estimate of latency spent on the xds server in millis = resource receipt time - resource modified time + private void trackServerLatency(ResourceUpdate resourceUpdate, XdsServerMetricsProvider metricsProvider) + { + if (!shouldTrackServerLatency()) + { + return; + } + + long now = SystemClock.instance().currentTimeMillis(); + if (resourceUpdate instanceof NodeUpdate) + { + XdsD2.Node nodeData = ((NodeUpdate) resourceUpdate).getNodeData(); + if (nodeData == null) + { + return; + } + metricsProvider.trackLatency(now - nodeData.getStat().getMtime()); + } + else if (resourceUpdate instanceof D2URIMapUpdate) + { + // only track server latency for the updated/new uris in the update + Map currentUriMap = ((D2URIMapUpdate) _data).getURIMap(); + MapDifference rawDiff = Maps.difference(((D2URIMapUpdate) resourceUpdate).getURIMap(), + currentUriMap == null ? Collections.emptyMap() : currentUriMap); + Map updatedUris = rawDiff.entriesDiffering().entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().leftValue()) // new data of updated uris + ); + trackServerLatencyForUris(updatedUris, metricsProvider, now); + trackServerLatencyForUris(rawDiff.entriesOnlyOnLeft(), metricsProvider, now); // newly added uris + } + } + + private boolean shouldTrackServerLatency() + { + return _data != null && _data.isValid(); // not initial update and there has been valid update before + } + + private void trackServerLatencyForUris(Map uriMap, XdsServerMetricsProvider metricsProvider, + long now) + { + uriMap.forEach((k, v) -> metricsProvider.trackLatency(now - v.getModifiedTime().getSeconds() * 1000)); + } + public ResourceType getType() { return _type; diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java index d265653c18..cdfd6365cb 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java @@ -39,7 +39,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -462,26 +461,29 @@ public void onChanged(XdsClient.D2URIMapUpdate update) try { updates = update.getURIMap().entrySet().stream().collect(Collectors.toMap( - // for ZK data, the uri name has a unique number suffix (e.g: ltx1-app2253-0000000554), - // but Kafka data uri name is just the uri string, - // appending the version number will differentiate announcements made - // for the same uri (in case that an uri was de-announced then re-announced quickly). - e -> e.getKey() + e.getValue().getVersion(), e -> - { - UriProperties d2Uri = toUriProperties(e.getKey(), e.getValue()); - return d2Uri == null ? null : new XdsAndD2Uris(e.getKey(), e.getValue(), d2Uri); - })); - updates.values().removeIf(Objects::isNull); + Map.Entry::getKey, e -> + new XdsAndD2Uris(e.getKey(), e.getValue(), toUriProperties(e.getKey(), e.getValue()))) + ); + updates.values().removeIf(u -> + { + if (u._d2Uri == null) + { + LOG.warn("Failed to parse D2 uri properties for uri: {} in cluster: {} from xDS D2URI: {}." + + " Removing it from the update.", + u._uriName, _clusterName, u._xdsUri); + } + return u._d2Uri == null; + }); } catch (Exception e) { - LOG.warn("Failed to parse D2 uri properties from xDS update. Cluster name: {}. Publishing null to event bus", + LOG.warn("Failed to parse D2 uri properties from xDS update. Cluster name: {}. Publishing null to event bus", _clusterName); _uriEventBus.publishInitialize(_clusterName, null); return; } - if (!isInit) + if (!isInit && !_currentData.isEmpty()) { emitSDStatusUpdateReceiptEvents(updates); } @@ -596,8 +598,14 @@ private void emitSDStatusUpdateReceiptEvents(Map updates) MapDifference mapDifference = Maps.difference(_currentData, updates); Map markedDownUris = mapDifference.entriesOnlyOnLeft(); Map markedUpUris = mapDifference.entriesOnlyOnRight(); + Map updatedUris = mapDifference.entriesDiffering().entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().rightValue() // new data in updated uris + )); emitSDStatusUpdateReceiptEvents(markedUpUris, true, timestamp); + emitSDStatusUpdateReceiptEvents(updatedUris, true, timestamp); emitSDStatusUpdateReceiptEvents(markedDownUris, false, timestamp); } @@ -618,7 +626,7 @@ private void emitSDStatusUpdateReceiptEvents(Map updates, true, _xdsClient.getXdsServerAuthority(), nodePath, - d2Uri.toString(), + xdsUri.toString(), (int) xdsUri.getVersion(), xdsUri.getTracingId(), timestamp) diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index f8bf574053..4e7f8d614b 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -57,7 +57,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) new XdsChannelFactory(config.grpcSslContext, config.xdsServer).createChannel(), executorService, xdsStreamReadyTimeout, - config.subscribeToUriGlobCollection + config.subscribeToUriGlobCollection, + config._xdsServerMetricsProvider ); d2ClientJmxManager.registerXdsClientJmx(xdsClient.getXdsClientJmx()); diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index 58fdce1268..2d12fd1ab2 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -4,6 +4,7 @@ import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.linkedin.d2.jmx.XdsClientJmx; +import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.xds.XdsClient.D2URIMapUpdate; import com.linkedin.d2.xds.XdsClient.ResourceType; import com.linkedin.d2.xds.XdsClientImpl.DiscoveryResponseData; @@ -14,6 +15,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.Assert; @@ -29,6 +31,7 @@ public class TestXdsClientImpl { private static final byte[] DATA = "data".getBytes(); + private static final byte[] DATA2 = "data2".getBytes(); private static final String SERVICE_RESOURCE_NAME = "/d2/services/FooService"; private static final String CLUSTER_NAME = "FooClusterMaster-prod-ltx1"; private static final String CLUSTER_RESOURCE_NAME = "/d2/uris/" + CLUSTER_NAME; @@ -38,29 +41,34 @@ public class TestXdsClientImpl private static final String VERSION2 = "2"; private static final String NONCE = "nonce"; private static final XdsD2.Node NODE_WITH_DATA = XdsD2.Node.newBuilder().setData(ByteString.copyFrom(DATA)).build(); + private static final XdsD2.Node NODE_WITH_DATA2 = XdsD2.Node.newBuilder().setData(ByteString.copyFrom(DATA2)).build(); private static final XdsD2.Node NODE_WITH_EMPTY_DATA = XdsD2.Node.newBuilder().build(); private static final Any PACKED_NODE_WITH_DATA = Any.pack(NODE_WITH_DATA); + private static final Any PACKED_NODE_WITH_DATA2 = Any.pack(NODE_WITH_DATA2); private static final Any PACKED_NODE_WITH_EMPTY_DATA = Any.pack(NODE_WITH_EMPTY_DATA); private static final XdsClient.NodeUpdate NODE_UPDATE1 = new XdsClient.NodeUpdate(NODE_WITH_DATA); - private static final XdsClient.NodeUpdate NODE_UPDATE2 = new XdsClient.NodeUpdate(NODE_WITH_DATA); + private static final XdsClient.NodeUpdate NODE_UPDATE2 = new XdsClient.NodeUpdate(NODE_WITH_DATA2); private static final List NODE_RESOURCES_WITH_DATA1 = Collections.singletonList( Resource.newBuilder().setVersion(VERSION1).setName(SERVICE_RESOURCE_NAME).setResource(PACKED_NODE_WITH_DATA).build()); private static final List NODE_RESOURCES_WITH_DATA2 = Collections.singletonList( - Resource.newBuilder().setVersion(VERSION2).setName(SERVICE_RESOURCE_NAME).setResource(PACKED_NODE_WITH_DATA).build()); + Resource.newBuilder().setVersion(VERSION2).setName(SERVICE_RESOURCE_NAME).setResource(PACKED_NODE_WITH_DATA2).build()); private static final List NODE_RESOURCES_WITH_NULL_RESOURCE_FILED = Collections.singletonList( Resource.newBuilder().setVersion(VERSION1).setName(SERVICE_RESOURCE_NAME).setResource(PACKED_NODE_WITH_EMPTY_DATA).build()); - private static final XdsD2.D2URI.Builder URI_BUILDER1 = - XdsD2.D2URI.newBuilder().setVersion(Long.parseLong(VERSION1)).setClusterName(CLUSTER_NAME).setUri(URI1); - private static final XdsD2.D2URI.Builder URI_BUILDER2 = - XdsD2.D2URI.newBuilder().setVersion(Long.parseLong(VERSION1)).setClusterName(CLUSTER_NAME).setUri(URI2); - private static final XdsD2.D2URIMap.Builder D2_URI_MAP_BUILDER = XdsD2.D2URIMap.newBuilder(); + private static final XdsD2.D2URI D2URI_1 = + XdsD2.D2URI.newBuilder().setVersion(Long.parseLong(VERSION1)).setClusterName(CLUSTER_NAME).setUri(URI1).build(); + private static final XdsD2.D2URI D2URI_1_1 = + XdsD2.D2URI.newBuilder().setVersion(Long.parseLong(VERSION2)).setClusterName(CLUSTER_NAME).setUri(URI1) + .putPartitionDesc(0, 2.0).build(); + private static final XdsD2.D2URI D2URI_2 = + XdsD2.D2URI.newBuilder().setVersion(Long.parseLong(VERSION1)).setClusterName(CLUSTER_NAME).setUri(URI2).build(); private static final XdsD2.D2URIMap D2_URI_MAP_WITH_EMPTY_DATA = XdsD2.D2URIMap.newBuilder().build(); - private static final XdsD2.D2URIMap D2_URI_MAP_WITH_DATA1 = - D2_URI_MAP_BUILDER.putUris(URI1, URI_BUILDER1.build()).build(); - private static final XdsD2.D2URIMap D2_URI_MAP_WITH_DATA2 = - D2_URI_MAP_BUILDER.putUris(URI2, URI_BUILDER2.build()).build(); + private static final XdsD2.D2URIMap D2_URI_MAP_WITH_DATA1 = XdsD2.D2URIMap.newBuilder() + .putUris(URI1, D2URI_1).build(); + private static final XdsD2.D2URIMap D2_URI_MAP_WITH_DATA2 = XdsD2.D2URIMap.newBuilder() + .putUris(URI1, D2URI_1_1) // updated uri1 + .putUris(URI2, D2URI_2).build(); // added ur2 private static final D2URIMapUpdate D2_URI_MAP_UPDATE_WITH_DATA1 = new D2URIMapUpdate(D2_URI_MAP_WITH_DATA1.getUrisMap()); private static final D2URIMapUpdate D2_URI_MAP_UPDATE_WITH_DATA2 = @@ -150,14 +158,23 @@ public void testHandleD2NodeResponseWithData() fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); + verifyZeroInteractions(fixture._serverMetricsProvider); // initial update should not track latency XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // subscriber data should be updated to NODE_UPDATE1 - Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE1.getNodeData()); + Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); + + // subscriber original data is invalid, xds server latency won't be tracked + fixture._nodeSubscriber.setData(new XdsClient.NodeUpdate(null)); + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); + fixture.verifyAckSent(2); + verify(fixture._resourceWatcher, times(2)).onChanged(eq(NODE_UPDATE1)); + verifyZeroInteractions(fixture._serverMetricsProvider); // subscriber data should be updated to NODE_UPDATE2 fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA2); actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE2)); + verify(fixture._serverMetricsProvider).trackLatency(anyLong()); Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE2.getNodeData()); } @@ -187,7 +204,7 @@ public void testHandleD2NodeUpdateWithBadData(DiscoveryResponseData badData, boo fixture.verifyAckOrNack(nackExpected, 1); verify(fixture._resourceWatcher).onChanged(eq(NODE.emptyData())); XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); - Assert.assertEquals(actualData.getNodeData(), null); + Assert.assertNull(Objects.requireNonNull(actualData).getNodeData()); fixture._nodeSubscriber.setData(NODE_UPDATE1); fixture._xdsClientImpl.handleResponse(badData); @@ -209,7 +226,7 @@ public void testHandleD2NodeResponseWithRemoval() verify(fixture._nodeSubscriber).onRemoval(); XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // removed resource will not overwrite the original valid data - Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE1.getNodeData()); + Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); } @Test @@ -217,20 +234,29 @@ public void testHandleD2URIMapResponseWithData() { XdsClientImplFixture fixture = new XdsClientImplFixture(); // subscriber original data is null - fixture._nodeSubscriber.setData(null); + fixture._clusterSubscriber.setData(null); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA1 - Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + + // subscriber original data is invalid, xds server latency won't be tracked + fixture._clusterSubscriber.setData(new XdsClient.D2URIMapUpdate(null)); + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); + verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verifyZeroInteractions(fixture._serverMetricsProvider); + fixture.verifyAckSent(2); - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA2); + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA2); // updated uri1, added uri2 actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA2 verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA2)); + verify(fixture._serverMetricsProvider, times(2)).trackLatency(anyLong()); Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA2.getURIMap()); - fixture.verifyAckSent(2); + fixture.verifyAckSent(3); } @Test @@ -265,6 +291,7 @@ public void testHandleD2URIMapUpdateWithBadData(DiscoveryResponseData badData, b : new D2URIMapUpdate(Collections.emptyMap()); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); verify(fixture._clusterSubscriber).setData(eq(null)); + verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); Assert.assertEquals(actualData, expectedUpdate); @@ -272,6 +299,7 @@ public void testHandleD2URIMapUpdateWithBadData(DiscoveryResponseData badData, b fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(invalidData, 2); actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); + Objects.requireNonNull(actualData); if (invalidData) { verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); // bad data will not overwrite the original valid data @@ -281,6 +309,7 @@ public void testHandleD2URIMapUpdateWithBadData(DiscoveryResponseData badData, b // But an empty cluster should clear the data Assert.assertEquals(actualData.getURIMap(), Collections.emptyMap()); } + verifyZeroInteractions(fixture._serverMetricsProvider); } @Test @@ -292,9 +321,10 @@ public void testHandleD2URIMapResponseWithRemoval() fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verify(fixture._clusterSubscriber).onRemoval(); + verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // removed resource will not overwrite the original valid data - Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); } @Test @@ -304,33 +334,43 @@ public void testHandleD2URICollectionResponseWithData() Resource.newBuilder() .setVersion(VERSION1) .setName(URI_URN1) - .setResource(Any.pack(URI_BUILDER1.build())) + .setResource(Any.pack(D2URI_1)) .build() ), null, NONCE, null); XdsClientImplFixture fixture = new XdsClientImplFixture(); // subscriber original data is null - fixture._nodeSubscriber.setData(null); + fixture._clusterSubscriber.setData(null); fixture._xdsClientImpl.handleResponse(createUri1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA1 - Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + + // subscriber original data is invalid, xds server latency won't be tracked + fixture._clusterSubscriber.setData(new D2URIMapUpdate(null)); + fixture._xdsClientImpl.handleResponse(createUri1); + fixture.verifyAckSent(2); + verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verifyZeroInteractions(fixture._serverMetricsProvider); DiscoveryResponseData createUri2Delete1 = new DiscoveryResponseData(D2_URI, Collections.singletonList( Resource.newBuilder() .setVersion(VERSION1) .setName(URI_URN2) - .setResource(Any.pack(URI_BUILDER2.build())) + .setResource(Any.pack(D2URI_2)) .build() ), Collections.singletonList(URI_URN1), NONCE, null); fixture._xdsClientImpl.handleResponse(createUri2Delete1); actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA2 - D2URIMapUpdate expectedUpdate = new D2URIMapUpdate(Collections.singletonMap(URI2, URI_BUILDER2.build())); + D2URIMapUpdate expectedUpdate = new D2URIMapUpdate(Collections.singletonMap(URI2, D2URI_2)); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + // track latency only for updated/new uri (not for deletion) + verify(fixture._serverMetricsProvider).trackLatency(anyLong()); Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); - fixture.verifyAckSent(2); + fixture.verifyAckSent(3); // Finally sanity check that the client correctly handles the deletion of the final URI in the collection DiscoveryResponseData deleteUri2 = @@ -340,8 +380,9 @@ public void testHandleD2URICollectionResponseWithData() // subscriber data should be updated to empty map expectedUpdate = new D2URIMapUpdate(Collections.emptyMap()); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + verifyNoMoreInteractions(fixture._serverMetricsProvider); Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); - fixture.verifyAckSent(3); + fixture.verifyAckSent(4); } @Test @@ -370,15 +411,17 @@ public void testHandleD2URICollectionUpdateWithBadData() fixture._xdsClientImpl.handleResponse(badData); fixture.verifyNackSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP.emptyData())); + verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); - Assert.assertEquals(actualData.getURIMap(), null); + Assert.assertNull(Objects.requireNonNull(actualData).getURIMap()); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(badData); fixture.verifyNackSent(2); - // Due to teh way glob collection updates are handled, bad data is dropped rather than showing any visible side + // Due to the way glob collection updates are handled, bad data is dropped rather than showing any visible side // effects other than NACKing the response. verify(fixture._resourceWatcher, times(0)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verifyZeroInteractions(fixture._serverMetricsProvider); } @Test @@ -393,6 +436,7 @@ public void testHandleD2URICollectionResponseWithRemoval() fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verify(fixture._clusterSubscriber).onRemoval(); + verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // removed resource will not overwrite the original valid data Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); @@ -408,6 +452,8 @@ private static class XdsClientImplFixture Map> _subscribers = new HashMap<>(); @Mock XdsClient.ResourceWatcher _resourceWatcher; + @Mock + XdsServerMetricsProvider _serverMetricsProvider; XdsClientImplFixture() { @@ -425,7 +471,9 @@ private static class XdsClientImplFixture _subscribers.put(subscriber.getType(), Collections.singletonMap(subscriber.getResource(), subscriber)); } - _xdsClientImpl = spy(new XdsClientImpl(null, null, null, 0, useGlobCollections)); + doNothing().when(_serverMetricsProvider).trackLatency(anyLong()); + _xdsClientImpl = spy(new XdsClientImpl(null, null, null, 0, + useGlobCollections, _serverMetricsProvider)); doNothing().when(_xdsClientImpl).sendAckOrNack(any(), any(), any()); when(_xdsClientImpl.getXdsClientJmx()).thenReturn(_xdsClientJmx); when(_xdsClientImpl.getResourceSubscriberMap(any())) diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java index 9a98802b6b..8286a0e91a 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java @@ -32,6 +32,7 @@ import com.linkedin.d2.discovery.PropertySerializationException; import com.linkedin.d2.discovery.event.PropertyEventBus; import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter; +import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter.StatusUpdateActionType; import com.linkedin.d2.xds.XdsClient.D2URIMapResourceWatcher; import com.linkedin.d2.xds.XdsClient.NodeResourceWatcher; import indis.XdsD2; @@ -61,15 +62,21 @@ public class TestXdsToD2PropertiesAdaptor { private static final String URI_SYMLINK_RESOURCE_NAME = URI_NODE_PREFIX + SYMLINK_NAME; private static final String PRIMARY_URI_RESOURCE_NAME = URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME; private static final long VERSION = 123; + private static final long VERSION_2 = 124; private static final String LOCAL_HOST = "localhost"; private static final int PORT = 8443; private static final URI LOCAL_HOST_URI = URI.create("https://" + LOCAL_HOST + ":" + PORT); private static final String TRACING_ID = "5678"; private static final String XDS_SERVER = "dummy-observer-host"; - private static final String URI_NAME = "ltx1-dummyhost456"; + private static final String URI_NAME = "https://ltx1-dummyhost1:8443"; + private static final String URI_NAME_2 = "https://ltx1-dummyhost2:8443"; + private static final String URI_NAME_3 = "https://ltx1-dummyhost3:8443"; + private static final String HOST_1 = "ltx1-dummyhost1"; + private static final String HOST_2 = "ltx1-dummyhost2"; + private static final String HOST_3 = "ltx1-dummyhost3"; private static final String SERVICE_NAME = "FooService"; - private final UriPropertiesJsonSerializer uriSerializer = new UriPropertiesJsonSerializer(); + private final UriPropertiesJsonSerializer _uriSerializer = new UriPropertiesJsonSerializer(); private static final XdsClient.NodeUpdate EMPTY_NODE_DATA = new XdsClient.NodeUpdate(null); private static final XdsClient.D2URIMapUpdate EMPTY_DATA_URI_MAP = new XdsClient.D2URIMapUpdate(null); @@ -200,7 +207,39 @@ public void testListenToNormalUri() throws PropertySerializationException fixture.getSpiedAdaptor().listenToUris(PRIMARY_CLUSTER_NAME); verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher()); - verifyUriUpdate(fixture, PRIMARY_CLUSTER_NAME, null); + XdsD2.D2URI protoUri = getD2URI(PRIMARY_CLUSTER_NAME, URI_NAME, VERSION); + Map uriMap = new HashMap<>(Collections.singletonMap(URI_NAME, protoUri)); + fixture._uriMapWatcher.onChanged(new XdsClient.D2URIMapUpdate(uriMap)); + verify(fixture._uriEventBus).publishInitialize(PRIMARY_CLUSTER_NAME, _uriSerializer.fromProto(protoUri)); + verify(fixture._eventEmitter).emitSDStatusInitialRequestEvent( + eq(PRIMARY_CLUSTER_NAME), eq(true), anyLong(), eq(true)); + // no status update receipt event emitted for initial update + verify(fixture._eventEmitter, never()).emitSDStatusUpdateReceiptEvent( + any(), any(), anyInt(), any(), anyBoolean(), any(), any(), any(), any(), any(), anyLong()); + + // add uri 2 + uriMap.put(URI_NAME_2, getD2URI(PRIMARY_CLUSTER_NAME, URI_NAME_2, VERSION)); + fixture._uriMapWatcher.onChanged(new XdsClient.D2URIMapUpdate(uriMap)); + verify(fixture._eventEmitter).emitSDStatusInitialRequestEvent( + eq(PRIMARY_CLUSTER_NAME), eq(true), anyLong(), eq(true)); // no more initial request event emitted + verify(fixture._eventEmitter).emitSDStatusUpdateReceiptEvent( // status update receipt event emitted for added uri + any(), eq(HOST_2), anyInt(), eq(ServiceDiscoveryEventEmitter.StatusUpdateActionType.MARK_READY), anyBoolean(), + any(), any(), any(), eq((int) VERSION), any(), anyLong()); + + // update uri 1, remove uri2, add uri3 + uriMap.clear(); + uriMap.put(URI_NAME, getD2URI(PRIMARY_CLUSTER_NAME, URI_NAME, VERSION_2)); + uriMap.put(URI_NAME_3, getD2URI(PRIMARY_CLUSTER_NAME, URI_NAME_3, VERSION)); + fixture._uriMapWatcher.onChanged(new XdsClient.D2URIMapUpdate(uriMap)); + verify(fixture._eventEmitter).emitSDStatusUpdateReceiptEvent( + any(), eq(HOST_1), anyInt(), eq(ServiceDiscoveryEventEmitter.StatusUpdateActionType.MARK_READY), anyBoolean(), + any(), any(), any(), eq((int) VERSION_2), any(), anyLong()); + verify(fixture._eventEmitter).emitSDStatusUpdateReceiptEvent( + any(), eq(HOST_2), anyInt(), eq(StatusUpdateActionType.MARK_DOWN), anyBoolean(), + any(), any(), any(), eq((int) VERSION), any(), anyLong()); + verify(fixture._eventEmitter).emitSDStatusUpdateReceiptEvent( + any(), eq(HOST_3), anyInt(), eq(ServiceDiscoveryEventEmitter.StatusUpdateActionType.MARK_READY), anyBoolean(), + any(), any(), any(), eq((int) VERSION), any(), anyLong()); } @Test @@ -237,13 +276,14 @@ public void testListenToUriSymlink() throws PropertySerializationException // if the old primary cluster gets an update, it will be published under its original cluster name // since the symlink points to the new primary cluster now. - XdsD2.D2URI protoUri = getD2URI(PRIMARY_CLUSTER_NAME); + XdsD2.D2URI protoUri = getD2URI(PRIMARY_CLUSTER_NAME, LOCAL_HOST_URI.toString(), VERSION); UriProperties uriProps = new UriPropertiesJsonSerializer().fromProto(protoUri); watcher.onChanged(new XdsClient.D2URIMapUpdate(Collections.singletonMap(URI_NAME, protoUri))); verify(fixture._uriEventBus).publishInitialize(PRIMARY_CLUSTER_NAME, uriProps); - verify(fixture._eventEmitter).emitSDStatusUpdateReceiptEvent( + // no status update receipt event emitted when data was empty before the update + verify(fixture._eventEmitter, never()).emitSDStatusUpdateReceiptEvent( eq(PRIMARY_CLUSTER_NAME), eq(LOCAL_HOST), eq(PORT), @@ -251,7 +291,7 @@ public void testListenToUriSymlink() throws PropertySerializationException eq(true), eq(XDS_SERVER), eq(URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME + "/" + URI_NAME), - eq(uriProps.toString()), + eq(protoUri.toString()), eq((int) VERSION), eq(TRACING_ID), anyLong() @@ -261,7 +301,8 @@ public void testListenToUriSymlink() throws PropertySerializationException @Test public void testURIPropertiesDeserialization() throws PropertySerializationException { - UriProperties properties = new UriPropertiesJsonSerializer().fromProto(getD2URI(PRIMARY_CLUSTER_NAME)); + UriProperties properties = new UriPropertiesJsonSerializer().fromProto( + getD2URI(PRIMARY_CLUSTER_NAME, LOCAL_HOST_URI.toString(), VERSION)); Assert.assertEquals(properties.getClusterName(), PRIMARY_CLUSTER_NAME); Assert.assertEquals(properties.getVersion(), VERSION); Assert.assertEquals(properties.getUriSpecificProperties(), @@ -298,11 +339,11 @@ public void testOnChangedWithEmptyUpdate() verify(fixture._uriEventBus).publishInitialize(PRIMARY_CLUSTER_NAME, null); } - private XdsD2.D2URI getD2URI(String clusterName) + private XdsD2.D2URI getD2URI(String clusterName, String uri, long version) { return XdsD2.D2URI.newBuilder() - .setVersion(VERSION) - .setUri(LOCAL_HOST_URI.toString()) + .setVersion(version) + .setUri(uri) .setClusterName(clusterName) .setUriSpecificProperties(Struct.newBuilder() .putFields("foo", Value.newBuilder().setStringValue("bar").build()) @@ -353,13 +394,13 @@ private void verifyUriUpdate(XdsToD2PropertiesAdaptorFixture fixture, String clu throws PropertySerializationException { D2URIMapResourceWatcher watcher = fixture._uriMapWatcher; - XdsD2.D2URI protoUri = getD2URI(clusterName); + XdsD2.D2URI protoUri = getD2URI(clusterName, LOCAL_HOST_URI.toString(), VERSION); watcher.onChanged(new XdsClient.D2URIMapUpdate(Collections.singletonMap(URI_NAME, protoUri))); - verify(fixture._uriEventBus).publishInitialize(clusterName, uriSerializer.fromProto(protoUri)); + verify(fixture._uriEventBus).publishInitialize(clusterName, _uriSerializer.fromProto(protoUri)); if (symlinkName != null) { verify(fixture._uriEventBus).publishInitialize(symlinkName, - uriSerializer.fromProto(getD2URI(symlinkName))); + _uriSerializer.fromProto(getD2URI(symlinkName, LOCAL_HOST_URI.toString(), VERSION))); } } diff --git a/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java b/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java index c3741000ce..52cde627b1 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java +++ b/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java @@ -5,6 +5,7 @@ import com.linkedin.d2.balancer.dualread.DualReadStateManager; import com.linkedin.d2.jmx.D2ClientJmxManager; import com.linkedin.d2.jmx.JmxManager; +import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider; import com.linkedin.d2.xds.util.SslContextUtil; import com.linkedin.util.clock.SystemClock; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; @@ -89,7 +90,8 @@ public static void main(String[] args) throws Exception XdsChannelFactory xdsChannelFactory = new XdsChannelFactory(sslContext, xdsServer); XdsClient xdsClient = new XdsClientImpl(node, xdsChannelFactory.createChannel(), - Executors.newSingleThreadScheduledExecutor(), XdsClientImpl.DEFAULT_READY_TIMEOUT_MILLIS, false); + Executors.newSingleThreadScheduledExecutor(), XdsClientImpl.DEFAULT_READY_TIMEOUT_MILLIS, false, + new NoOpXdsServerMetricsProvider()); DualReadStateManager dualReadStateManager = new DualReadStateManager( () -> DualReadModeProvider.DualReadMode.DUAL_READ, diff --git a/gradle.properties b/gradle.properties index 7777341688..2252185e2a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.52.1 +version=29.53.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true