Skip to content

Commit

Permalink
Optimize discoveryManager to achieve code separation of concerns (#4571)
Browse files Browse the repository at this point in the history
  • Loading branch information
karsonto authored Oct 30, 2024
1 parent 19e2610 commit f76e208
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public NamedThreadFactory(String prefix) {
*/
@Override
public Thread newThread(Runnable r) {
// NOTE(review): this listing is a rendered diff without +/- markers. The hunk header
// (-43,7 +43,9) is consistent with the next `return` being the pre-change body and the
// three statements after it being its replacement -- confirm against the repository.
return new Thread(r, prefix + "-" + threadNumber.getAndIncrement());
// Thread name is "<prefix>-<n>", where n is the value threadNumber held before the increment.
Thread thread = new Thread(r, prefix + "-" + threadNumber.getAndIncrement());
// Flag the new thread as a daemon before returning it to the caller.
thread.setDaemon(true);
return thread;
}

public String getPrefix() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.servicecomb.foundation.common.NamedThreadFactory;
import org.apache.servicecomb.foundation.common.cache.VersionedCache;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.foundation.common.utils.LambdaUtils;
Expand Down Expand Up @@ -57,6 +58,8 @@ void onInstancesChanged(String registryName, String application, String serviceN

private final InstancePing ping;

private final HealthCheckTask healthCheckTask = new HealthCheckTask();

// application:serviceName:instanceId
private final Map<String, Map<String, Map<String, StatefulDiscoveryInstance>>>
allInstances = new ConcurrentHashMapEx<>();
Expand All @@ -76,91 +79,13 @@ public DiscoveryManager(List<Discovery<? extends DiscoveryInstance>> discoveryLi
discovery.setInstanceChangedListener(this::onInstancesChanged);
}
this.ping = pings.get(0);
task = Executors.newScheduledThreadPool(1, (runnable) -> {
Thread thread = new Thread(runnable, "discovery-manager-task") {
@Override
public void run() {
try {
runnable.run();
} catch (Throwable e) {
// This would never happen, because Worker will catch Throwable and mute all tasks.
LOGGER.error("discovery manager task error, not allowed please fix. ", e);
}
}
};
thread.setPriority(Thread.MIN_PRIORITY);
thread.setDaemon(true);
return thread;
});
task = Executors.newScheduledThreadPool(1, new NamedThreadFactory("discovery-manager-task"));
}

/**
 * Returns the discovery implementation treated as primary.
 *
 * @return the element at index 0 of {@code discoveryList}
 */
public Discovery<? extends DiscoveryInstance> getPrimaryDiscovery() {
  return discoveryList.get(0);
}

/**
 * Exception-safe wrapper around {@link #doTaskImpl()}.
 *
 * <p>Every {@link Throwable} is logged here instead of being propagated, because a failure
 * escaping this method would mute all subsequent executions of the scheduled task.
 */
private void doTask() {
  try {
    doTaskImpl();
  } catch (Throwable failure) {
    // Deliberately swallowed after logging: see the method comment.
    LOGGER.error("discovery manager task error. ", failure);
  }
}

/**
 * Performs one maintenance sweep over {@code allInstances}.
 *
 * <p>For every instance, in this order: an expired isolation is reset to NORMAL, the ping
 * status is refreshed when the last ping is more than 60 seconds old, and instances whose
 * history status is HISTORY and that are not UP, failed the ping or are isolated are
 * recorded for removal. The version cache of a service is rebuilt when any of its instances
 * changed. Recorded instances are removed only after the whole sweep has finished.
 */
private void doTaskImpl() {
  // application -> serviceName -> ids of instances to drop once the sweep is complete
  Map<String, Map<String, List<String>>> pendingRemoval = new HashMap<>();

  for (Entry<String, Map<String, Map<String, StatefulDiscoveryInstance>>> appEntry : allInstances.entrySet()) {
    String application = appEntry.getKey();
    for (Entry<String, Map<String, StatefulDiscoveryInstance>> serviceEntry : appEntry.getValue().entrySet()) {
      String serviceName = serviceEntry.getKey();
      boolean dirty = false;

      for (StatefulDiscoveryInstance candidate : serviceEntry.getValue().values()) {
        // 1. isolation window elapsed -> back to NORMAL
        boolean isolationExpired = candidate.getIsolationStatus() == IsolationStatus.ISOLATED
            && candidate.getIsolatedTime() + candidate.getIsolateDuration() < System.currentTimeMillis();
        if (isolationExpired) {
          candidate.setIsolationStatus(IsolationStatus.NORMAL);
          dirty = true;
        }

        // 2. re-ping when the previous ping is older than 60 seconds
        if (System.currentTimeMillis() - candidate.getPingTime() > 60_000L) {
          if (ping.ping(candidate)) {
            if (candidate.getPingStatus() != PingStatus.OK) {
              candidate.setPingStatus(PingStatus.OK);
              dirty = true;
            }
          } else if (candidate.getPingStatus() != PingStatus.FAIL) {
            candidate.setPingStatus(PingStatus.FAIL);
            dirty = true;
          }
          candidate.setPingTime(System.currentTimeMillis());
        }

        // 3. only HISTORY instances are candidates for removal
        if (candidate.getHistoryStatus() != HistoryStatus.HISTORY) {
          continue;
        }
        boolean removable = candidate.getStatus() != MicroserviceInstanceStatus.UP
            || candidate.getPingStatus() == PingStatus.FAIL
            || candidate.getIsolationStatus() == IsolationStatus.ISOLATED;
        if (!removable) {
          continue;
        }
        pendingRemoval
            .computeIfAbsent(application, k -> new HashMap<>())
            .computeIfAbsent(serviceName, k -> new ArrayList<>())
            .add(candidate.getInstanceId());
        LOGGER.info("Remove instance {}/{}/{}/{}/{}/{}/{}/{}",
            application, serviceName, candidate.getRegistryName(),
            candidate.getInstanceId(), candidate.getHistoryStatus(),
            candidate.getStatus(), candidate.getPingStatus(), candidate.getIsolationStatus());
        dirty = true;
      }

      if (dirty) {
        rebuildVersionCache(application, serviceName);
      }
    }
  }

  // Deferred removal, so the maps are not modified while the sweep iterates over them.
  pendingRemoval.forEach((application, services) ->
      services.forEach((serviceName, instanceIds) ->
          instanceIds.forEach(id -> allInstances.get(application).get(serviceName).remove(id))));
}

private void onInstancesChanged(String application, String serviceName,
List<? extends DiscoveryInstance> instances) {
onInstancesChanged(null, application, serviceName, instances);
Expand Down Expand Up @@ -326,7 +251,7 @@ public void destroy() {
@Override
public void run() {
// Invoke run() on every entry of discoveryList before any maintenance work is scheduled.
discoveryList.forEach(LifeCycle::run);
// NOTE(review): rendered diff without +/- markers. The hunk header (-326,7 +251,7) is
// consistent with the scheduleWithFixedDelay line being the removed one and the schedule
// line below being its replacement -- confirm against the repository.
task.scheduleWithFixedDelay(this::doTask, 3, 3, TimeUnit.SECONDS);
// One-shot submission after 3 seconds; HealthCheckTask.run() re-submits itself when it finishes.
task.schedule(healthCheckTask, 3, TimeUnit.SECONDS);
}

@Override
Expand All @@ -345,4 +270,68 @@ public String info() {
});
return result.toString();
}

/**
 * Self-rescheduling maintenance task for the instances tracked by the enclosing
 * {@code DiscoveryManager}.
 *
 * <p>Each run sweeps {@code allInstances}: expired isolations are reset, stale ping results
 * are refreshed, and instances that are no longer usable are removed. When the run ends,
 * normally or with an error, the task submits itself again unless the executor has been
 * shut down.
 */
class HealthCheckTask implements Runnable {
  /** An instance is pinged again once its last ping is older than this many milliseconds. */
  private static final long PING_INTERVAL_MILLIS = 60_000L;

  /** Delay, in seconds, before the task runs again after finishing a sweep. */
  private static final long RESCHEDULE_DELAY_SECONDS = 3L;

  /**
   * Runs one sweep and schedules the next one.
   *
   * <p>Every {@link Throwable} raised by the sweep is logged and swallowed so that a single
   * failure cannot stop the periodic check.
   */
  @Override
  public void run() {
    try {
      removeInstances(checkAllInstances());
    } catch (Throwable e) {
      LOGGER.error("discovery manager task error. ", e);
    } finally {
      scheduleNext();
    }
  }

  /**
   * Updates the state of every tracked instance and rebuilds the version cache of each
   * service that had at least one change.
   *
   * @return application -> serviceName -> ids of the instances that should be removed
   */
  private Map<String, Map<String, List<String>>> checkAllInstances() {
    Map<String, Map<String, List<String>>> removed = new HashMap<>();
    for (Entry<String, Map<String, Map<String, StatefulDiscoveryInstance>>> apps
        : DiscoveryManager.this.allInstances.entrySet()) {
      for (Entry<String, Map<String, StatefulDiscoveryInstance>> services : apps.getValue().entrySet()) {
        boolean changed = false;
        for (StatefulDiscoveryInstance instance : services.getValue().values()) {
          // `|=` always evaluates its right-hand side, so both checks run for every instance.
          changed |= recoverIfIsolationExpired(instance);
          changed |= refreshPingStatus(instance);
          if (isUnused(instance)) {
            removed.computeIfAbsent(apps.getKey(), k -> new HashMap<>())
                .computeIfAbsent(services.getKey(), k -> new ArrayList<>()).add(instance.getInstanceId());
            LOGGER.info("Remove instance {}/{}/{}/{}/{}/{}/{}/{}",
                apps.getKey(), services.getKey(), instance.getRegistryName(),
                instance.getInstanceId(), instance.getHistoryStatus(),
                instance.getStatus(), instance.getPingStatus(), instance.getIsolationStatus());
            changed = true;
          }
        }
        if (changed) {
          rebuildVersionCache(apps.getKey(), services.getKey());
        }
      }
    }
    return removed;
  }

  /**
   * Resets an isolated instance to NORMAL once its isolation duration has elapsed.
   *
   * @return {@code true} when the isolation status was changed
   */
  private boolean recoverIfIsolationExpired(StatefulDiscoveryInstance instance) {
    if (instance.getIsolationStatus() == IsolationStatus.ISOLATED
        && instance.getIsolatedTime() + instance.getIsolateDuration() < System.currentTimeMillis()) {
      instance.setIsolationStatus(IsolationStatus.NORMAL);
      return true;
    }
    return false;
  }

  /**
   * Pings the instance when its last ping is older than {@link #PING_INTERVAL_MILLIS} and
   * records the result and the ping time on the instance.
   *
   * @return {@code true} when the stored ping status was changed
   */
  private boolean refreshPingStatus(StatefulDiscoveryInstance instance) {
    if (System.currentTimeMillis() - instance.getPingTime() <= PING_INTERVAL_MILLIS) {
      return false;
    }
    boolean pingResult = DiscoveryManager.this.ping.ping(instance);
    PingStatus latest = pingResult ? PingStatus.OK : PingStatus.FAIL;
    boolean changed = instance.getPingStatus() != latest;
    if (changed) {
      instance.setPingStatus(latest);
    }
    // The ping time is refreshed even when the status is unchanged.
    instance.setPingTime(System.currentTimeMillis());
    return changed;
  }

  /**
   * Tells whether an instance should be dropped: its history status is HISTORY and it is
   * not UP, failed its ping, or is isolated.
   */
  private boolean isUnused(StatefulDiscoveryInstance instance) {
    return instance.getHistoryStatus() == HistoryStatus.HISTORY
        && (instance.getStatus() != MicroserviceInstanceStatus.UP
        || instance.getPingStatus() == PingStatus.FAIL
        || instance.getIsolationStatus() == IsolationStatus.ISOLATED);
  }

  /**
   * Removes the collected instances from {@code allInstances}. Done after the sweep so the
   * maps are not modified while they are being iterated.
   */
  private void removeInstances(Map<String, Map<String, List<String>>> removed) {
    for (Entry<String, Map<String, List<String>>> apps : removed.entrySet()) {
      for (Entry<String, List<String>> services : apps.getValue().entrySet()) {
        for (String instance : services.getValue()) {
          DiscoveryManager.this.allInstances.get(apps.getKey()).get(services.getKey()).remove(instance);
        }
      }
    }
  }

  /**
   * Submits this task for its next run. Nothing is submitted once the executor is shut
   * down; a rejection caused by a shutdown racing with the check is logged, not thrown.
   */
  private void scheduleNext() {
    if (DiscoveryManager.this.task.isShutdown()) {
      return;
    }
    try {
      DiscoveryManager.this.task.schedule(this, RESCHEDULE_DELAY_SECONDS, TimeUnit.SECONDS);
    } catch (java.util.concurrent.RejectedExecutionException e) {
      LOGGER.info("discovery manager task not rescheduled, executor is shut down.");
    }
  }
}
}

0 comments on commit f76e208

Please sign in to comment.