Skip to content

Commit

Permalink
[SCB-2804]Fix instance not found when instances frequently change enh…
Browse files Browse the repository at this point in the history
…ancement: keep cache versions
  • Loading branch information
liubao68 committed Oct 7, 2023
1 parent 2fb4b6e commit 7af2380
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.foundation.common.event.EventManager;
import org.apache.servicecomb.registry.api.MicroserviceKey;
import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,13 +70,13 @@ public MicroserviceVersions getOrCreateMicroserviceVersions(String appId, String
return microserviceManager.getOrCreateMicroserviceVersions(microserviceName);
}

public void onMicroserviceInstancesChanged(MicroserviceKey microserviceKey) {
MicroserviceManager microserviceManager = apps.get(microserviceKey.getAppId());
public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) {
MicroserviceManager microserviceManager = apps.get(changedEvent.getKey().getAppId());
if (microserviceManager == null) {
return;
}

microserviceManager.onMicroserviceInstancesChanged(microserviceKey);
microserviceManager.onMicroserviceInstanceChanged(changedEvent);
}

public void pullInstances() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
package org.apache.servicecomb.registry.consumer;

import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;

import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.foundation.vertx.executor.SinglePoolBlockingExecutor;
import org.apache.servicecomb.registry.api.MicroserviceKey;
import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -129,15 +128,12 @@ public void pullInstances() {
/**
* Update instance information triggered by event, called when instance list changed.
*/
public void onMicroserviceInstancesChanged(MicroserviceKey microserviceKey) {
public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) {
synchronized (lock) {
for (Entry<String, MicroserviceVersions> item : versionsByName.entrySet()) {
if (item.getKey().equals(microserviceKey.getServiceName())) {
versionsByName.remove(item.getKey());
item.getValue().destroy();
LOGGER.info("remove microservice version when instance changed, appId={}, microserviceName={}.",
appId, item.getKey());
}
for (MicroserviceVersions microserviceVersions : versionsByName.values()) {
microserviceVersions.onMicroserviceInstanceChanged(changedEvent);

tryRemoveInvalidMicroservice(microserviceVersions);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.servicecomb.registry.DiscoveryManager;
import org.apache.servicecomb.registry.api.event.CreateMicroserviceEvent;
import org.apache.servicecomb.registry.api.event.DestroyMicroserviceEvent;
import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstanceStatus;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstances;
Expand Down Expand Up @@ -232,13 +233,19 @@ private void setInstances(List<MicroserviceInstance> pulledInstances, String rev
synchronized (lock) {
MergedInstances mergedInstances = mergeInstances(pulledInstances, instances);
instances = mergedInstances.instanceIdMap.values();
// clear cache
versions.forEach((key, value) -> value.setInstances(new ArrayList<>()));

// set instances to empty for no instance versions
versions.forEach((key, value) -> {
if (!mergedInstances.microserviceIdMap.containsKey(key)) {
value.setInstances(new ArrayList<>());
}
});

// update instances
for (Entry<String, List<MicroserviceInstance>> entry : mergedInstances.microserviceIdMap.entrySet()) {
// always update microservice versions, because we allow microservice info override, like schema info
MicroserviceVersion newVersion = createMicroserviceVersion(entry.getKey(), entry.getValue());
newVersion.setInstances(entry.getValue());
versions.put(entry.getKey(), newVersion);
versions.computeIfAbsent(entry.getKey(),
microserviceId -> createMicroserviceVersion(microserviceId, entry.getValue()))
.setInstances(entry.getValue());
}

for (MicroserviceVersionRule microserviceVersionRule : versionRules.values()) {
Expand Down Expand Up @@ -297,6 +304,32 @@ protected MicroserviceVersionRule createAndInitMicroserviceVersionRule(String st
return microserviceVersionRule;
}

public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) {
if (!isEventAccept(changedEvent)) {
return;
}
// pull instances always replace old instances, not append
//
// pull result and watch event sequence is not defined even inside SC.
// it's not safe to trust the event, so we just send a new pull request
//
// CREATE/UPDATE:
// if pull 1/2/3, and then add 4, but "add 4" received before pull result, will lost 4.
// DELETE:
// if pull 1/2/3, and then delete 3, but "delete 3" received before pull result, will have wrong 3.
// EXPIRE::
// black/white config in SC changed, we must refresh all data from sc.
pullInstances();
}

protected boolean isEventAccept(MicroserviceInstanceChangedEvent changedEvent) {
return (appId.equals(changedEvent.getKey().getAppId()) &&
microserviceName.equals(changedEvent.getKey().getServiceName())) ||
microserviceName.equals(
changedEvent.getKey().getAppId() + DefinitionConst.APP_SERVICE_SEPARATOR + changedEvent.getKey()
.getServiceName());
}

public void destroy() {
synchronized (lock) {
for (MicroserviceVersion microserviceVersion : versions.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.servicecomb.foundation.common.event.EventManager;
import org.apache.servicecomb.foundation.common.utils.BeanUtils;
import org.apache.servicecomb.registry.DiscoveryManager;
import org.apache.servicecomb.registry.api.MicroserviceKey;
import org.apache.servicecomb.registry.api.event.MicroserviceInstanceRegisteredEvent;
import org.apache.servicecomb.registry.api.registry.FindInstancesResponse;
import org.apache.servicecomb.registry.api.registry.Microservice;
Expand Down Expand Up @@ -98,12 +97,7 @@ private static void initAggregateServiceRegistryCache() {
executeOnEachServiceRegistry(serviceRegistries::add);
aggregateServiceRegistryCache = new AggregateServiceRegistryCache(serviceRegistries);
aggregateServiceRegistryCache
.setCacheRefreshedWatcher(refreshedCaches -> {
MicroserviceKey microserviceKey = new MicroserviceKey();
microserviceKey.setAppId(refreshedCaches.get(0).getKey().getAppId());
microserviceKey.setServiceName(refreshedCaches.get(0).getKey().getServiceName());
DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstancesChanged(microserviceKey);
});
.setCacheRefreshedWatcher(refreshedCaches -> DiscoveryManager.INSTANCE.getAppManager().pullInstances());

executeOnEachServiceRegistry(
serviceRegistry -> serviceRegistry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.commons.configuration.Configuration;
import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
import org.apache.servicecomb.registry.DiscoveryManager;
import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
import org.apache.servicecomb.registry.api.registry.BasePath;
import org.apache.servicecomb.registry.api.registry.Microservice;
Expand Down Expand Up @@ -276,6 +277,7 @@ public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent chang
executorService.execute(new SuppressedRunnableWrapper(
() -> {
serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent);
DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstanceChanged(changedEvent);
}));
} catch (Exception e) {
LOGGER.info("instance changed event ignored, {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.servicecomb.registry.DiscoveryManager;
import org.apache.servicecomb.registry.RegistrationManager;
import org.apache.servicecomb.registry.api.MicroserviceKey;
import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
import org.apache.servicecomb.registry.api.registry.Microservice;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
import org.apache.servicecomb.registry.consumer.MicroserviceVersion;
Expand Down Expand Up @@ -82,9 +83,12 @@ public void watchDeleteEvent() {
mockNotExist();

MicroserviceKey key = new MicroserviceKey();
MicroserviceInstanceChangedEvent event = new MicroserviceInstanceChangedEvent();
event.setKey(key);

key.setAppId(appId);
key.setServiceName(serviceName);
microserviceManager.onMicroserviceInstancesChanged(key);
eventBus.post(event);
long begin = System.currentTimeMillis();
while (microserviceManager.getVersionsByName().size() > 0 && System.currentTimeMillis() - begin < 1000) {
Thread.yield();
Expand Down

0 comments on commit 7af2380

Please sign in to comment.