diff --git a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java index c6be9c1e0d..3b79c9c8e1 100644 --- a/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java +++ b/foundations/foundation-common/src/main/java/org/apache/servicecomb/foundation/common/NamedThreadFactory.java @@ -43,7 +43,9 @@ public NamedThreadFactory(String prefix) { */ @Override public Thread newThread(Runnable r) { - return new Thread(r, prefix + "-" + threadNumber.getAndIncrement()); + Thread thread = new Thread(r, prefix + "-" + threadNumber.getAndIncrement()); + thread.setDaemon(true); + return thread; } public String getPrefix() { diff --git a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java index e5c6b80476..256b392de2 100644 --- a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java +++ b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/DiscoveryManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.servicecomb.foundation.common.NamedThreadFactory; import org.apache.servicecomb.foundation.common.cache.VersionedCache; import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx; import org.apache.servicecomb.foundation.common.utils.LambdaUtils; @@ -57,6 +58,8 @@ void onInstancesChanged(String registryName, String application, String serviceN private final InstancePing ping; + private final HealthCheckTask healthCheckTask = new HealthCheckTask(); + // application:serviceName:instanceId private final Map>> allInstances = new ConcurrentHashMapEx<>(); @@ -76,91 +79,13 @@ public DiscoveryManager(List> discoveryLi discovery.setInstanceChangedListener(this::onInstancesChanged); } this.ping = pings.get(0); - task = Executors.newScheduledThreadPool(1, (runnable) -> { - Thread thread = new Thread(runnable, "discovery-manager-task") { - @Override - public void run() { - try { - runnable.run(); - } catch (Throwable e) { - // This would never happen, because Worker will catch Throwable and mute all tasks. - LOGGER.error("discovery manager task error, not allowed please fix. ", e); - } - } - }; - thread.setPriority(Thread.MIN_PRIORITY); - thread.setDaemon(true); - return thread; - }); + task = Executors.newScheduledThreadPool(1, new NamedThreadFactory("discovery-manager-task")); } public Discovery getPrimaryDiscovery() { return this.discoveryList.get(0); } - private void doTask() { - // doTask can not throw exception or will mute all tasks. - try { - doTaskImpl(); - } catch (Throwable e) { - LOGGER.error("discovery manager task error. ", e); - } - } - - private void doTaskImpl() { - Map>> removed = new HashMap<>(); - for (Entry>> apps : allInstances.entrySet()) { - for (Entry> services : apps.getValue().entrySet()) { - boolean changed = false; - for (StatefulDiscoveryInstance instance : services.getValue().values()) { - // check isolated time - if (instance.getIsolationStatus() == IsolationStatus.ISOLATED && - instance.getIsolatedTime() + instance.getIsolateDuration() < System.currentTimeMillis()) { - instance.setIsolationStatus(IsolationStatus.NORMAL); - changed = true; - } - // check ping status - if (System.currentTimeMillis() - instance.getPingTime() > 60_000L) { - boolean pingResult = ping.ping(instance); - if (pingResult && instance.getPingStatus() != PingStatus.OK) { - instance.setPingStatus(PingStatus.OK); - changed = true; - } else if (!pingResult && instance.getPingStatus() != PingStatus.FAIL) { - instance.setPingStatus(PingStatus.FAIL); - changed = true; - } - instance.setPingTime(System.currentTimeMillis()); - } - // check unused - if (instance.getHistoryStatus() == HistoryStatus.HISTORY) { - if (instance.getStatus() != MicroserviceInstanceStatus.UP || - instance.getPingStatus() == PingStatus.FAIL || - instance.getIsolationStatus() == IsolationStatus.ISOLATED) { - removed.computeIfAbsent(apps.getKey(), k -> new HashMap<>()) - .computeIfAbsent(services.getKey(), k -> new ArrayList<>()).add(instance.getInstanceId()); - LOGGER.info("Remove instance {}/{}/{}/{}/{}/{}/{}/{}", - apps.getKey(), services.getKey(), instance.getRegistryName(), - instance.getInstanceId(), instance.getHistoryStatus(), - instance.getStatus(), instance.getPingStatus(), instance.getIsolationStatus()); - changed = true; - } - } - } - if (changed) { - rebuildVersionCache(apps.getKey(), services.getKey()); - } - } - } - // remove unused - for (Entry>> apps : removed.entrySet()) { - for (Entry> services : apps.getValue().entrySet()) { - for (String instance : services.getValue()) { - allInstances.get(apps.getKey()).get(services.getKey()).remove(instance); - } - } - } - } - private void onInstancesChanged(String application, String serviceName, List instances) { onInstancesChanged(null, application, serviceName, instances); @@ -326,7 +251,7 @@ public void destroy() { @Override public void run() { discoveryList.forEach(LifeCycle::run); - task.scheduleWithFixedDelay(this::doTask, 3, 3, TimeUnit.SECONDS); + task.schedule(healthCheckTask, 3, TimeUnit.SECONDS); } @Override @@ -345,4 +270,68 @@ public String info() { }); return result.toString(); } + + class HealthCheckTask implements Runnable { + + @Override + public void run() { + try { + Map>> removed = new HashMap<>(); + for (Entry>> apps : DiscoveryManager.this.allInstances.entrySet()) { + for (Entry> services : apps.getValue().entrySet()) { + boolean changed = false; + for (StatefulDiscoveryInstance instance : services.getValue().values()) { + // check isolated time + if (instance.getIsolationStatus() == IsolationStatus.ISOLATED && + instance.getIsolatedTime() + instance.getIsolateDuration() < System.currentTimeMillis()) { + instance.setIsolationStatus(IsolationStatus.NORMAL); + changed = true; + } + // check ping status + if (System.currentTimeMillis() - instance.getPingTime() > 60_000L) { + boolean pingResult = DiscoveryManager.this.ping.ping(instance); + if (pingResult && instance.getPingStatus() != PingStatus.OK) { + instance.setPingStatus(PingStatus.OK); + changed = true; + } else if (!pingResult && instance.getPingStatus() != PingStatus.FAIL) { + instance.setPingStatus(PingStatus.FAIL); + changed = true; + } + instance.setPingTime(System.currentTimeMillis()); + } + // check unused + if (instance.getHistoryStatus() == HistoryStatus.HISTORY) { + if (instance.getStatus() != MicroserviceInstanceStatus.UP || + instance.getPingStatus() == PingStatus.FAIL || + instance.getIsolationStatus() == IsolationStatus.ISOLATED) { + removed.computeIfAbsent(apps.getKey(), k -> new HashMap<>()) + .computeIfAbsent(services.getKey(), k -> new ArrayList<>()).add(instance.getInstanceId()); + LOGGER.info("Remove instance {}/{}/{}/{}/{}/{}/{}/{}", + apps.getKey(), services.getKey(), instance.getRegistryName(), + instance.getInstanceId(), instance.getHistoryStatus(), + instance.getStatus(), instance.getPingStatus(), instance.getIsolationStatus()); + changed = true; + } + } + } + if (changed) { + rebuildVersionCache(apps.getKey(), services.getKey()); + } + } + } + // remove unused + for (Entry>> apps : removed.entrySet()) { + for (Entry> services : apps.getValue().entrySet()) { + for (String instance : services.getValue()) { + DiscoveryManager.this.allInstances.get(apps.getKey()).get(services.getKey()).remove(instance); + } + } + } + } catch (Throwable e) { + LOGGER.error("discovery manager task error. ", e); + } finally { + DiscoveryManager.this.task.schedule(this, 3, TimeUnit.SECONDS); + } + } + } }