From 7af238000789a1c260d3e8bd9757dd31e9fa20f2 Mon Sep 17 00:00:00 2001 From: liubao Date: Sat, 7 Oct 2023 15:24:39 +0800 Subject: [PATCH] [SCB-2804]Fix instance not found when instances frequently change enhancement: keep cache versions --- .../registry/consumer/AppManager.java | 8 ++-- .../consumer/MicroserviceManager.java | 16 +++---- .../consumer/MicroserviceVersions.java | 45 ++++++++++++++++--- .../serviceregistry/RegistryUtils.java | 8 +--- .../registry/AbstractServiceRegistry.java | 2 + .../serviceregistry/TestConsumers.java | 6 ++- 6 files changed, 57 insertions(+), 28 deletions(-) diff --git a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/AppManager.java b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/AppManager.java index ef72228c2a..f149516e92 100644 --- a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/AppManager.java +++ b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/AppManager.java @@ -23,7 +23,7 @@ import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx; import org.apache.servicecomb.foundation.common.event.EventManager; -import org.apache.servicecomb.registry.api.MicroserviceKey; +import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,13 +70,13 @@ public MicroserviceVersions getOrCreateMicroserviceVersions(String appId, String return microserviceManager.getOrCreateMicroserviceVersions(microserviceName); } - public void onMicroserviceInstancesChanged(MicroserviceKey microserviceKey) { - MicroserviceManager microserviceManager = apps.get(microserviceKey.getAppId()); + public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) { + MicroserviceManager microserviceManager = apps.get(changedEvent.getKey().getAppId()); if (microserviceManager == null) { return; } - microserviceManager.onMicroserviceInstancesChanged(microserviceKey); + microserviceManager.onMicroserviceInstanceChanged(changedEvent); } public void pullInstances() { diff --git a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceManager.java b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceManager.java index 226bd73234..862ea506c3 100644 --- a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceManager.java +++ b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceManager.java @@ -18,12 +18,11 @@ package org.apache.servicecomb.registry.consumer; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx; import org.apache.servicecomb.foundation.vertx.executor.SinglePoolBlockingExecutor; -import org.apache.servicecomb.registry.api.MicroserviceKey; +import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,15 +128,12 @@ public void pullInstances() { /** * Update instance information triggered by event, called when instance list changed. */ - public void onMicroserviceInstancesChanged(MicroserviceKey microserviceKey) { + public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) { synchronized (lock) { - for (Entry item : versionsByName.entrySet()) { - if (item.getKey().equals(microserviceKey.getServiceName())) { - versionsByName.remove(item.getKey()); - item.getValue().destroy(); - LOGGER.info("remove microservice version when instance changed, appId={}, microserviceName={}.", - appId, item.getKey()); - } + for (MicroserviceVersions microserviceVersions : versionsByName.values()) { + microserviceVersions.onMicroserviceInstanceChanged(changedEvent); + + tryRemoveInvalidMicroservice(microserviceVersions); } } } diff --git a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceVersions.java b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceVersions.java index c2c2446ba4..4e665b2dbe 100644 --- a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceVersions.java +++ b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceVersions.java @@ -32,6 +32,7 @@ import org.apache.servicecomb.registry.DiscoveryManager; import org.apache.servicecomb.registry.api.event.CreateMicroserviceEvent; import org.apache.servicecomb.registry.api.event.DestroyMicroserviceEvent; +import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent; import org.apache.servicecomb.registry.api.registry.MicroserviceInstance; import org.apache.servicecomb.registry.api.registry.MicroserviceInstanceStatus; import org.apache.servicecomb.registry.api.registry.MicroserviceInstances; @@ -232,13 +233,19 @@ private void setInstances(List pulledInstances, String rev synchronized (lock) { MergedInstances mergedInstances = mergeInstances(pulledInstances, instances); instances = mergedInstances.instanceIdMap.values(); - // clear cache - versions.forEach((key, value) -> value.setInstances(new ArrayList<>())); + + // set instances to empty for no instance versions + versions.forEach((key, value) -> { + if (!mergedInstances.microserviceIdMap.containsKey(key)) { + value.setInstances(new ArrayList<>()); + } + }); + + // update instances for (Entry> entry : mergedInstances.microserviceIdMap.entrySet()) { - // always update microservice versions, because we allow microservice info override, like schema info - MicroserviceVersion newVersion = createMicroserviceVersion(entry.getKey(), entry.getValue()); - newVersion.setInstances(entry.getValue()); - versions.put(entry.getKey(), newVersion); + versions.computeIfAbsent(entry.getKey(), + microserviceId -> createMicroserviceVersion(microserviceId, entry.getValue())) + .setInstances(entry.getValue()); } for (MicroserviceVersionRule microserviceVersionRule : versionRules.values()) { @@ -297,6 +304,32 @@ protected MicroserviceVersionRule createAndInitMicroserviceVersionRule(String st return microserviceVersionRule; } + public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) { + if (!isEventAccept(changedEvent)) { + return; + } + // pull instances always replace old instances, not append + // + // pull result and watch event sequence is not defined even inside SC. + // it's not safe to trust the event, so we just send a new pull request + // + // CREATE/UPDATE: + // if pull 1/2/3, and then add 4, but "add 4" received before pull result, will lost 4. + // DELETE: + // if pull 1/2/3, and then delete 3, but "delete 3" received before pull result, will have wrong 3. + // EXPIRE:: + // black/white config in SC changed, we must refresh all data from sc. + pullInstances(); + } + + protected boolean isEventAccept(MicroserviceInstanceChangedEvent changedEvent) { + return (appId.equals(changedEvent.getKey().getAppId()) && + microserviceName.equals(changedEvent.getKey().getServiceName())) || + microserviceName.equals( + changedEvent.getKey().getAppId() + DefinitionConst.APP_SERVICE_SEPARATOR + changedEvent.getKey() + .getServiceName()); + } + public void destroy() { synchronized (lock) { for (MicroserviceVersion microserviceVersion : versions.values()) { diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java index fb398bba84..72022f6a18 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java @@ -36,7 +36,6 @@ import org.apache.servicecomb.foundation.common.event.EventManager; import org.apache.servicecomb.foundation.common.utils.BeanUtils; import org.apache.servicecomb.registry.DiscoveryManager; -import org.apache.servicecomb.registry.api.MicroserviceKey; import org.apache.servicecomb.registry.api.event.MicroserviceInstanceRegisteredEvent; import org.apache.servicecomb.registry.api.registry.FindInstancesResponse; import org.apache.servicecomb.registry.api.registry.Microservice; @@ -98,12 +97,7 @@ private static void initAggregateServiceRegistryCache() { executeOnEachServiceRegistry(serviceRegistries::add); aggregateServiceRegistryCache = new AggregateServiceRegistryCache(serviceRegistries); aggregateServiceRegistryCache - .setCacheRefreshedWatcher(refreshedCaches -> { - MicroserviceKey microserviceKey = new MicroserviceKey(); - microserviceKey.setAppId(refreshedCaches.get(0).getKey().getAppId()); - microserviceKey.setServiceName(refreshedCaches.get(0).getKey().getServiceName()); - DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstancesChanged(microserviceKey); - }); + .setCacheRefreshedWatcher(refreshedCaches -> DiscoveryManager.INSTANCE.getAppManager().pullInstances()); executeOnEachServiceRegistry( serviceRegistry -> serviceRegistry diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java index 0640a59330..cf32dde8c9 100644 --- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java +++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java @@ -26,6 +26,7 @@ import org.apache.commons.configuration.Configuration; import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper; +import org.apache.servicecomb.registry.DiscoveryManager; import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent; import org.apache.servicecomb.registry.api.registry.BasePath; import org.apache.servicecomb.registry.api.registry.Microservice; @@ -276,6 +277,7 @@ public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent chang executorService.execute(new SuppressedRunnableWrapper( () -> { serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent); + DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstanceChanged(changedEvent); })); } catch (Exception e) { LOGGER.info("instance changed event ignored, {}", e.getMessage()); diff --git a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java index 1458cf1e5f..977a5e2f24 100644 --- a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java +++ b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java @@ -23,6 +23,7 @@ import org.apache.servicecomb.registry.DiscoveryManager; import org.apache.servicecomb.registry.RegistrationManager; import org.apache.servicecomb.registry.api.MicroserviceKey; +import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent; import org.apache.servicecomb.registry.api.registry.Microservice; import org.apache.servicecomb.registry.api.registry.MicroserviceInstance; import org.apache.servicecomb.registry.consumer.MicroserviceVersion; @@ -82,9 +83,12 @@ public void watchDeleteEvent() { mockNotExist(); MicroserviceKey key = new MicroserviceKey(); + MicroserviceInstanceChangedEvent event = new MicroserviceInstanceChangedEvent(); + event.setKey(key); + key.setAppId(appId); key.setServiceName(serviceName); - microserviceManager.onMicroserviceInstancesChanged(key); + eventBus.post(event); long begin = System.currentTimeMillis(); while (microserviceManager.getVersionsByName().size() > 0 && System.currentTimeMillis() - begin < 1000) { Thread.yield();