From 9e1175659999c3097f39ef695220b3ffdf0da9d4 Mon Sep 17 00:00:00 2001 From: dev-mlb <19797865+dev-mlb@users.noreply.github.com> Date: Thu, 16 Nov 2023 11:59:39 -0500 Subject: [PATCH] added enable/disable and the ability to provide custom rules --- .../core/{ => sentinel}/Sentinel.java | 166 +++++------------- .../emissary/core/sentinel/rules/Exit.java | 22 +++ .../emissary/core/sentinel/rules/Kill.java | 22 +++ .../emissary/core/sentinel/rules/Notify.java | 20 +++ .../emissary/core/sentinel/rules/Recover.java | 21 +++ .../emissary/core/sentinel/rules/Rule.java | 87 +++++++++ .../emissary/core/sentinel/rules/Stop.java | 22 +++ .../java/emissary/directory/EmissaryNode.java | 2 +- .../java/emissary/server/EmissaryServer.java | 2 +- src/main/resources/emissary/core/Sentinel.cfg | 12 -- .../emissary/core/sentinel/Sentinel.cfg | 12 ++ 11 files changed, 254 insertions(+), 134 deletions(-) rename src/main/java/emissary/core/{ => sentinel}/Sentinel.java (60%) create mode 100644 src/main/java/emissary/core/sentinel/rules/Exit.java create mode 100644 src/main/java/emissary/core/sentinel/rules/Kill.java create mode 100644 src/main/java/emissary/core/sentinel/rules/Notify.java create mode 100644 src/main/java/emissary/core/sentinel/rules/Recover.java create mode 100644 src/main/java/emissary/core/sentinel/rules/Rule.java create mode 100644 src/main/java/emissary/core/sentinel/rules/Stop.java delete mode 100644 src/main/resources/emissary/core/Sentinel.cfg create mode 100644 src/main/resources/emissary/core/sentinel/Sentinel.cfg diff --git a/src/main/java/emissary/core/Sentinel.java b/src/main/java/emissary/core/sentinel/Sentinel.java similarity index 60% rename from src/main/java/emissary/core/Sentinel.java rename to src/main/java/emissary/core/sentinel/Sentinel.java index 68b10f829f..4ea0fd5e73 100644 --- a/src/main/java/emissary/core/Sentinel.java +++ b/src/main/java/emissary/core/sentinel/Sentinel.java @@ -1,17 +1,20 @@ -package emissary.core; +package emissary.core.sentinel; import emissary.config.ConfigUtil; import emissary.config.Configurator; -import emissary.pool.AgentPool; +import emissary.core.Factory; +import emissary.core.IMobileAgent; +import emissary.core.Namespace; +import emissary.core.NamespaceException; +import emissary.core.sentinel.rules.Notify; +import emissary.core.sentinel.rules.Rule; import emissary.pool.MobileAgentFactory; -import emissary.server.EmissaryServer; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -20,9 +23,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static emissary.core.MobileAgent.NO_AGENT_ID; -import static emissary.core.Sentinel.Action.NOTIFY; - /** * Track mobile agents and log any suspicious behavior */ @@ -51,16 +51,22 @@ public class Sentinel implements Runnable { // Loop control protected boolean timeToQuit = false; + protected boolean enabled = false; + /** * Create a Sentinel - set it running and bind into the {@link Namespace} */ public Sentinel() { configure(); - final Thread thread = new Thread(this, DEFAULT_NAMESPACE_NAME); - thread.setPriority(Thread.NORM_PRIORITY); - thread.setDaemon(true); - Namespace.bind(DEFAULT_NAMESPACE_NAME, this); - thread.start(); + if (this.enabled) { + final Thread thread = new Thread(this, DEFAULT_NAMESPACE_NAME); + thread.setPriority(Thread.NORM_PRIORITY); + thread.setDaemon(true); + Namespace.bind(DEFAULT_NAMESPACE_NAME, this); + thread.start(); + } else { + logger.info("Sentinel is disabled"); + } } /** @@ -117,25 +123,34 @@ protected void configure() { } catch (IOException e) { logger.warn("Cannot read Sentinel.cfg, taking default values"); } - - if (!this.rules.containsKey(DEFAULT_RULE)) { - this.rules.put(DEFAULT_RULE, new Rule(DEFAULT_RULE, 60L, 1.0, NOTIFY)); - } } /** * Initialize rule set */ protected void init() { - for (String rule : config.findEntries("RULE")) { - try { - Map map = config.findStringMatchMap(rule + "_"); - this.rules.put(rule, new Rule(rule, map.get("TIME_LIMIT"), map.get("THRESHOLD"), map.get("ACTION"))); - } catch (Exception e) { - logger.warn("Unable to configure Sentinel for: {}", rule); + this.enabled = config.findBooleanEntry("ENABLED", false); + if (this.enabled) { + this.pollingInterval = config.findIntEntry("POLLING_INTERVAL", 5); + + logger.trace("Loading rules..."); + for (String ruleId : config.findEntries("RULE_ID")) { + try { + Map map = config.findStringMatchMap(ruleId + "_"); + String rule = map.getOrDefault("RULE", Notify.class.getName()); + Rule ruleImpl = (Rule) Factory.create(rule, ruleId, map.get("TIME_LIMIT"), map.get("THRESHOLD")); + this.rules.put(ruleId, ruleImpl); + } catch (Exception e) { + logger.warn("Unable to configure Sentinel for: {}", ruleId); + } + } + + // if no rules, create a default one + if (!this.rules.containsKey(DEFAULT_RULE)) { + logger.warn("Default rule not found, creating one..."); + this.rules.put(DEFAULT_RULE, new Notify(DEFAULT_RULE, 60L, 1.0)); } } - this.pollingInterval = config.findIntEntry("POLLING_INTERVAL", 5); } /** @@ -160,6 +175,7 @@ protected void watch() throws NamespaceException { * @throws NamespaceException if there is a problem looking up resources in the {@link Namespace} */ protected void watch(String agentKey) throws NamespaceException { + logger.trace("Searching for agent [{}]", agentKey); IMobileAgent mobileAgent = (IMobileAgent) Namespace.lookup(agentKey); Tracker trackedAgent = tracker.computeIfAbsent(mobileAgent.getName(), Tracker::new); if (mobileAgent.isInUse()) { @@ -172,10 +188,11 @@ protected void watch(String agentKey) throws NamespaceException { trackedAgent.incrementTimer(pollingInterval); String placeSimpleName = getPlaceSimpleName(mobileAgent.getLastPlaceProcessed()); counter.put(placeSimpleName, counter.getOrDefault(placeSimpleName, 0) + 1); + logger.debug("Agent acquired {}", trackedAgent); } else { trackedAgent.clear(); + logger.debug("Agent not in use [{}]", agentKey); } - logger.debug("{}", trackedAgent); } /** @@ -184,11 +201,11 @@ protected void watch(String agentKey) throws NamespaceException { * @throws NamespaceException if there is a problem looking up resources in the {@link Namespace} */ protected void check() throws NamespaceException { - logger.debug("MobileAgents in {}", counter); + logger.debug("Checking agents {}", counter); for (Map.Entry item : counter.entrySet()) { Rule rule = rules.getOrDefault(item.getKey(), rules.get(DEFAULT_RULE)); - logger.debug("Found {} for {}", rule, item.getKey()); - rule.run(item.getKey(), item.getValue()); + logger.trace("Found {} for {}", rule, item.getKey()); + rule.run(tracker, item.getKey(), item.getValue()); } } @@ -196,7 +213,7 @@ protected static String getPlaceSimpleName(String lastPlaceProcessed) { return StringUtils.substringAfterLast(lastPlaceProcessed, "/"); } - static class Tracker { + public static class Tracker { private final String agentName; private String agentId; private String shortName; @@ -216,7 +233,7 @@ public String getAgentId() { } public void setAgentId(String agentId) { - if (StringUtils.contains(agentId, NO_AGENT_ID)) { + if (StringUtils.contains(agentId, "No_AgentID_Set")) { this.agentId = ""; this.shortName = ""; } else { @@ -277,95 +294,4 @@ public String toString() { .toString(); } } - - enum Action { - NOTIFY, RECOVER, STOP, KILL, EXIT - } - - class Rule { - - private final String place; - - // how long to wait before alerting on stuck agents - private final long timeLimit; - - // percentage of mobile agents that are stuck on the same place before sounding the alarm - private final double threshold; - - // what to do in the case of rule hitting limits - private final Action action; - - public Rule(String place, long timeLimit, double threshold, Action action) { - this.place = place; - this.timeLimit = timeLimit; - this.threshold = threshold; - this.action = action; - } - - public Rule(String place, String timeLimit, String threshold, String action) { - this(place, StringUtils.isBlank(timeLimit) ? 60L : Long.parseLong(timeLimit), - StringUtils.isBlank(threshold) ? 1.0 : Double.parseDouble(threshold), - StringUtils.isBlank(threshold) ? NOTIFY : Action.valueOf(action.toUpperCase())); - } - - public String getPlace() { - return place; - } - - public long getTimeLimit() { - return timeLimit; - } - - public double getThreshold() { - return threshold; - } - - public Action getAction() { - return action; - } - - public void run(String placeSimpleName, Integer counter) throws NamespaceException { - if (overThreshold(counter) && overTimeLimit(placeSimpleName)) { - if (action == Action.STOP) { - logger.error("Sentinel detected unrecoverable agent(s) running [{}], initiating graceful shut down...", placeSimpleName); - EmissaryServer.stopServer(); - } else if (action == Action.KILL) { - logger.error("Sentinel detected unrecoverable agent(s) running [{}], initiating forceful shutdown...", placeSimpleName); - EmissaryServer.stopServerForce(); - } else if (action == Action.EXIT) { - logger.error("Sentinel detected unrecoverable agent(s) running [{}], exiting now!!", placeSimpleName); - System.exit(1); - } else if (action == Action.RECOVER) { - logger.warn("Sentinel attempting recovery mode..."); - throw new UnsupportedOperationException("Recovery unavailable"); - } else { - logger.warn("Sentinel detected locked agent(s) running [{}]", placeSimpleName); - } - } - } - - private boolean overThreshold(Integer counter) throws NamespaceException { - int poolSize = AgentPool.lookup().getCurrentPoolSize(); - logger.debug("Testing threshold for place={}, counter={}, poolSize={}, threshold={}", getPlace(), counter, poolSize, - getThreshold()); - return (double) counter / poolSize >= getThreshold(); - } - - private boolean overTimeLimit(String placeSimpleName) { - long maxTimeInPlace = tracker.values().stream() - .filter(t -> StringUtils.equalsIgnoreCase(t.getPlaceSimpleName(), placeSimpleName)) - .map(Tracker::getTimer) - .max(Comparator.naturalOrder()).orElse(0L); - logger.debug("Testing time limit for place={}, timeLimit={}, maxTimeInPlace={}", getPlace(), maxTimeInPlace, getTimeLimit()); - return maxTimeInPlace >= getTimeLimit(); - } - - @Override - public String toString() { - return new StringJoiner(", ", "Rule:" + place + "[", "]") - .add("timeLimit=" + timeLimit) - .add("threshold=" + threshold) - .toString(); - } - } } diff --git a/src/main/java/emissary/core/sentinel/rules/Exit.java b/src/main/java/emissary/core/sentinel/rules/Exit.java new file mode 100644 index 0000000000..db3918600b --- /dev/null +++ b/src/main/java/emissary/core/sentinel/rules/Exit.java @@ -0,0 +1,22 @@ +package emissary.core.sentinel.rules; + +import emissary.core.sentinel.Sentinel; + +import java.util.Map; + +public class Exit extends Rule { + + public Exit(String place, long timeLimit, double threshold) { + super(place, timeLimit, threshold); + } + + public Exit(String place, String timeLimit, String threshold) { + super(place, timeLimit, threshold); + } + + @Override + public void action(Map tracker, String placeSimpleName, Integer counter) { + logger.error("Sentinel detected unrecoverable agent(s) running [{}], exiting now!!", placeSimpleName); + System.exit(1); + } +} diff --git a/src/main/java/emissary/core/sentinel/rules/Kill.java b/src/main/java/emissary/core/sentinel/rules/Kill.java new file mode 100644 index 0000000000..b90869d7e9 --- /dev/null +++ b/src/main/java/emissary/core/sentinel/rules/Kill.java @@ -0,0 +1,22 @@ +package emissary.core.sentinel.rules; + +import emissary.core.sentinel.Sentinel; +import emissary.server.EmissaryServer; + +import java.util.Map; + +public class Kill extends Rule { + public Kill(String place, long timeLimit, double threshold) { + super(place, timeLimit, threshold); + } + + public Kill(String place, String timeLimit, String threshold) { + super(place, timeLimit, threshold); + } + + @Override + public void action(Map tracker, String placeSimpleName, Integer counter) { + logger.error("Sentinel detected unrecoverable agent(s) running [{}], initiating forceful shutdown...", placeSimpleName); + EmissaryServer.stopServerForce(); + } +} diff --git a/src/main/java/emissary/core/sentinel/rules/Notify.java b/src/main/java/emissary/core/sentinel/rules/Notify.java new file mode 100644 index 0000000000..ae2f8a4b66 --- /dev/null +++ b/src/main/java/emissary/core/sentinel/rules/Notify.java @@ -0,0 +1,20 @@ +package emissary.core.sentinel.rules; + +import emissary.core.sentinel.Sentinel; + +import java.util.Map; + +public class Notify extends Rule { + public Notify(String place, long timeLimit, double threshold) { + super(place, timeLimit, threshold); + } + + public Notify(String place, String timeLimit, String threshold) { + super(place, timeLimit, threshold); + } + + @Override + public void action(Map tracker, String placeSimpleName, Integer counter) { + logger.warn("Sentinel detected locked agent(s) running [{}]", placeSimpleName); + } +} diff --git a/src/main/java/emissary/core/sentinel/rules/Recover.java b/src/main/java/emissary/core/sentinel/rules/Recover.java new file mode 100644 index 0000000000..4ad21b1f9b --- /dev/null +++ b/src/main/java/emissary/core/sentinel/rules/Recover.java @@ -0,0 +1,21 @@ +package emissary.core.sentinel.rules; + +import emissary.core.sentinel.Sentinel; + +import java.util.Map; + +public class Recover extends Rule { + public Recover(String place, long timeLimit, double threshold) { + super(place, timeLimit, threshold); + } + + public Recover(String place, String timeLimit, String threshold) { + super(place, timeLimit, threshold); + } + + @Override + public void action(Map tracker, String placeSimpleName, Integer counter) { + logger.warn("Sentinel attempting recovery mode..."); + throw new UnsupportedOperationException("Recovery unavailable"); + } +} diff --git a/src/main/java/emissary/core/sentinel/rules/Rule.java b/src/main/java/emissary/core/sentinel/rules/Rule.java new file mode 100644 index 0000000000..1706a2cab6 --- /dev/null +++ b/src/main/java/emissary/core/sentinel/rules/Rule.java @@ -0,0 +1,87 @@ +package emissary.core.sentinel.rules; + +import emissary.core.NamespaceException; +import emissary.core.sentinel.Sentinel; +import emissary.pool.AgentPool; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Comparator; +import java.util.Map; +import java.util.StringJoiner; + +public abstract class Rule { + + protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final String place; + + // how long to wait before alerting on stuck agents + private final long timeLimit; + + // percentage of mobile agents that are stuck on the same place before sounding the alarm + private final double threshold; + + public Rule(String place, long timeLimit, double threshold) { + this.place = place; + this.timeLimit = timeLimit; + this.threshold = threshold; + logger.trace("Loaded {}", this); + } + + public Rule(String place, String timeLimit, String threshold) { + this(place, StringUtils.isBlank(timeLimit) ? 60L : Long.parseLong(timeLimit), + StringUtils.isBlank(threshold) ? 1.0 : Double.parseDouble(threshold)); + } + + public String getPlace() { + return place; + } + + public long getTimeLimit() { + return timeLimit; + } + + public double getThreshold() { + return threshold; + } + + public void run(Map tracker, String placeSimpleName, Integer counter) throws NamespaceException { + if (condition(tracker, placeSimpleName, counter)) { + action(tracker, placeSimpleName, counter); + } + } + + public boolean condition(Map tracker, String placeSimpleName, Integer counter) throws NamespaceException { + return overThreshold(counter) && overTimeLimit(tracker, placeSimpleName); + } + + public abstract void action(Map tracker, String placeSimpleName, Integer counter); + + protected boolean overThreshold(Integer counter) throws NamespaceException { + int poolSize = AgentPool.lookup().getCurrentPoolSize(); + logger.trace("Testing threshold for place={}, counter={}, poolSize={}, threshold={}", place, counter, poolSize, + threshold); + return (double) counter / poolSize >= getThreshold(); + } + + protected boolean overTimeLimit(Map tracker, String placeSimpleName) { + long maxTimeInPlace = tracker.values().stream() + .filter(t -> StringUtils.equalsIgnoreCase(t.getPlaceSimpleName(), placeSimpleName)) + .map(Sentinel.Tracker::getTimer) + .max(Comparator.naturalOrder()).orElse(0L); + logger.trace("Testing time limit for place={}, timeLimit={}, maxTimeInPlace={}", place, maxTimeInPlace, timeLimit); + return maxTimeInPlace >= getTimeLimit(); + } + + @Override + public String toString() { + return new StringJoiner(", ", "Rule:" + place + "[", "]") + .add("timeLimit=" + timeLimit) + .add("threshold=" + threshold) + .toString(); + } +} diff --git a/src/main/java/emissary/core/sentinel/rules/Stop.java b/src/main/java/emissary/core/sentinel/rules/Stop.java new file mode 100644 index 0000000000..74226ea86f --- /dev/null +++ b/src/main/java/emissary/core/sentinel/rules/Stop.java @@ -0,0 +1,22 @@ +package emissary.core.sentinel.rules; + +import emissary.core.sentinel.Sentinel; +import emissary.server.EmissaryServer; + +import java.util.Map; + +public class Stop extends Rule { + public Stop(String place, long timeLimit, double threshold) { + super(place, timeLimit, threshold); + } + + public Stop(String place, String timeLimit, String threshold) { + super(place, timeLimit, threshold); + } + + @Override + public void action(Map tracker, String placeSimpleName, Integer counter) { + logger.error("Sentinel detected unrecoverable agent(s) running [{}], initiating graceful shut down...", placeSimpleName); + EmissaryServer.stopServer(); + } +} diff --git a/src/main/java/emissary/directory/EmissaryNode.java b/src/main/java/emissary/directory/EmissaryNode.java index fd5c4ce2d5..39df329c90 100644 --- a/src/main/java/emissary/directory/EmissaryNode.java +++ b/src/main/java/emissary/directory/EmissaryNode.java @@ -7,7 +7,7 @@ import emissary.core.EmissaryException; import emissary.core.MetricsManager; import emissary.core.ResourceWatcher; -import emissary.core.Sentinel; +import emissary.core.sentinel.Sentinel; import emissary.pool.AgentPool; import emissary.pool.MobileAgentFactory; import emissary.pool.MoveSpool; diff --git a/src/main/java/emissary/server/EmissaryServer.java b/src/main/java/emissary/server/EmissaryServer.java index 67ae4712d7..20b0c7b6f6 100644 --- a/src/main/java/emissary/server/EmissaryServer.java +++ b/src/main/java/emissary/server/EmissaryServer.java @@ -12,7 +12,7 @@ import emissary.core.Namespace; import emissary.core.NamespaceException; import emissary.core.ResourceWatcher; -import emissary.core.Sentinel; +import emissary.core.sentinel.Sentinel; import emissary.directory.DirectoryPlace; import emissary.directory.EmissaryNode; import emissary.place.IServiceProviderPlace; diff --git a/src/main/resources/emissary/core/Sentinel.cfg b/src/main/resources/emissary/core/Sentinel.cfg deleted file mode 100644 index aa2c52f133..0000000000 --- a/src/main/resources/emissary/core/Sentinel.cfg +++ /dev/null @@ -1,12 +0,0 @@ -POLLING_INTERVAL = 5 - -RULE = "DEFAULT" -DEFAULT_TIME_LIMIT = "60" -DEFAULT_THRESHOLD = "1.0" -DEFAULT_ACTION = "notify" - -# Sample rule -# RULE = "DelayPlace" # the place name to watch -# DelayPlace_TIME_LIMIT = "5" # max time in minutes -# DelayPlace_THRESHOLD = "0.75" # percentage of same places -# DelayPlace_ACTION = "kill" # what to do if rule matches diff --git a/src/main/resources/emissary/core/sentinel/Sentinel.cfg b/src/main/resources/emissary/core/sentinel/Sentinel.cfg new file mode 100644 index 0000000000..23e853a444 --- /dev/null +++ b/src/main/resources/emissary/core/sentinel/Sentinel.cfg @@ -0,0 +1,12 @@ +ENABLED = true +POLLING_INTERVAL = 1 + +RULE_ID = "DEFAULT" +DEFAULT_TIME_LIMIT = "60" +DEFAULT_THRESHOLD = "1.0" + +# Sample rule +# RULE_ID = "DelayPlace" +# DelayPlace_RULE= "emissary.core.sentinel.rules.Kill" +# DelayPlace_TIME_LIMIT = "5" +# DelayPlace_THRESHOLD = "0.75"