From 99c3990ea6376b893405e77727e83065941dae32 Mon Sep 17 00:00:00 2001 From: dev-mlb <19797865+dev-mlb@users.noreply.github.com> Date: Thu, 30 Nov 2023 09:17:37 -0500 Subject: [PATCH] protocols --- .../java/emissary/core/sentinel/Sentinel.java | 65 ++++++---- .../core/sentinel/protocols/Protocol.java | 117 +++++++++++------- .../sentinel/protocols/actions/Action.java | 11 +- .../core/sentinel/protocols/actions/Exit.java | 15 +-- .../core/sentinel/protocols/actions/Kill.java | 15 +-- .../sentinel/protocols/actions/Notify.java | 15 +-- .../sentinel/protocols/actions/Recover.java | 18 ++- .../core/sentinel/protocols/actions/Stop.java | 15 +-- .../sentinel/protocols/rules/AllMaxTime.java | 14 +-- .../sentinel/protocols/rules/AnyMaxTime.java | 14 +-- .../core/sentinel/protocols/rules/Rule.java | 45 ++++--- .../emissary/core/sentinel/Sentinel.cfg | 6 +- .../core/sentinel/protocols/DEFAULT.cfg | 14 --- .../sentinel/protocols/ProtocolSample.cfg | 23 ++++ 14 files changed, 209 insertions(+), 178 deletions(-) delete mode 100644 src/main/resources/emissary/core/sentinel/protocols/DEFAULT.cfg create mode 100644 src/main/resources/emissary/core/sentinel/protocols/ProtocolSample.cfg diff --git a/src/main/java/emissary/core/sentinel/Sentinel.java b/src/main/java/emissary/core/sentinel/Sentinel.java index 5e6206e4a0..711eda09d0 100644 --- a/src/main/java/emissary/core/sentinel/Sentinel.java +++ b/src/main/java/emissary/core/sentinel/Sentinel.java @@ -14,16 +14,18 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** - * Track mobile agents and log any suspicious behavior + * Track mobile agents and take action on suspicious behavior */ public class Sentinel implements Runnable { @@ -34,8 +36,10 @@ public class Sentinel implements Runnable { // key: agent name, value: how long Sentinel has observed the mobile agent protected final Map trackers = new ConcurrentHashMap<>(); - protected final Map protocols = new ConcurrentHashMap<>(); + // protocols contain an action to perform when the set of rule conditions are met + protected final Set protocols = new LinkedHashSet<>(); + // the default configuration Sentinel.cfg protected Configurator config; // how many minutes to sleep before checking the mobile agents @@ -44,6 +48,7 @@ public class Sentinel implements Runnable { // Loop control protected boolean timeToQuit = false; + // turn on/off sentinel protected boolean enabled = false; /** @@ -84,8 +89,7 @@ public void quit() { */ @Override public void run() { - logger.info("Sentinel is starting"); - + logger.info("Sentinel is watching"); while (!this.timeToQuit) { // Delay this loop try { @@ -98,12 +102,12 @@ public void run() { } } Namespace.unbind(DEFAULT_NAMESPACE_NAME); - logger.info("Sentinel stopped."); + logger.info("Sentinel stopped"); } @Override public String toString() { - return "Watching agents with " + protocols.values(); + return "Watching agents with " + protocols; } /** @@ -127,22 +131,22 @@ protected void init() { this.pollingInterval = config.findIntEntry("POLLING_INTERVAL_MINUTES", 5); logger.trace("Sentinel protocols initializing..."); - for (Map.Entry proto : config.findStringMatchMap("PROTOCOL_", true, true).entrySet()) { + for (String config : config.findEntries("PROTOCOL")) { try { - String protocolId = proto.getKey(); - String config = proto.getValue(); Protocol protocol = new Protocol(config); - logger.info("Sentinel initiated {}", protocol); if (protocol.isEnabled()) { - this.protocols.put(protocolId, protocol); + logger.info("Sentinel protocol initialized {}", protocol); + this.protocols.add(protocol); + } else { + logger.debug("Sentinel protocol disabled {}", protocol); } } catch (Exception e) { - logger.warn("Unable to configure Sentinel Protocol {}: {}", proto, e.getMessage()); + logger.warn("Unable to configure Sentinel Protocol[{}]: {}", config, e.getMessage()); } } if (this.protocols.isEmpty()) { this.enabled = false; - logger.warn("Sentinel initializing failed: no protocols found"); + logger.warn("Sentinel initialization failed: no protocols found"); } } } @@ -160,7 +164,7 @@ protected void watch() throws NamespaceException { for (String agentKey : agentKeys) { watch(agentKey); } - protocols.values().forEach(protocol -> protocol.run(trackers)); + protocols.forEach(protocol -> protocol.run(trackers)); } /** @@ -174,17 +178,17 @@ protected void watch(String agentKey) throws NamespaceException { IMobileAgent mobileAgent = (IMobileAgent) Namespace.lookup(agentKey); Tracker trackedAgent = trackers.computeIfAbsent(mobileAgent.getName(), Tracker::new); if (mobileAgent.isInUse()) { - if (!Objects.equals(mobileAgent.getLastPlaceProcessed(), trackedAgent.getPlaceName()) - && !Objects.equals(mobileAgent.agentID(), trackedAgent.getAgentId())) { + if (!Objects.equals(mobileAgent.agentID(), trackedAgent.getAgentId()) + || !Objects.equals(mobileAgent.getLastPlaceProcessed(), trackedAgent.getPlaceName())) { trackedAgent.setAgentId(mobileAgent.agentID()); trackedAgent.setPlaceName(mobileAgent.getLastPlaceProcessed()); trackedAgent.resetTimer(); } trackedAgent.incrementTimer(pollingInterval); - logger.debug("Agent acquired {}", trackedAgent); + logger.trace("Agent acquired {}", trackedAgent); } else { trackedAgent.clear(); - logger.debug("Agent not in use [{}]", agentKey); + logger.trace("Agent not in use [{}]", agentKey); } } @@ -214,7 +218,7 @@ public void setAgentId(String agentId) { } else { this.agentId = agentId; if (StringUtils.contains(agentId, "Agent-")) { - this.shortName = StringUtils.substringAfter(StringUtils.substringAfter(agentId, "Agent-"), "-"); + this.shortName = getShortName(agentId); } } } @@ -223,6 +227,10 @@ public String getShortName() { return shortName; } + public static String getShortName(String agentId) { + return StringUtils.substringAfter(StringUtils.substringAfter(agentId, "Agent-"), "-"); + } + public String getPlaceName() { return placeName; } @@ -232,7 +240,19 @@ public void setPlaceName(String placeName) { } public String getPlaceSimpleName() { - return Protocol.getPlaceSimpleName(this.placeName); + return getPlaceSimpleName(this.placeName); + } + + public String getPlaceAndShortName() { + return getPlaceAndShortName(this.placeName, this.shortName); + } + + public static String getPlaceSimpleName(String place) { + return StringUtils.defaultString(StringUtils.substringAfterLast(place, "/"), ""); + } + + public static String getPlaceAndShortName(String place, String shortName) { + return getPlaceSimpleName(place) + "/" + shortName; } public long getTimer() { @@ -260,11 +280,10 @@ public void clear() { @Override public String toString() { - return new StringJoiner(", ", Tracker.class.getSimpleName() + "[", "]") + return new StringJoiner(", ", "[", "]") .add("agentName='" + agentName + "'") - .add("agentId='" + agentId + "'") - .add("shortName='" + shortName + "'") .add("placeName='" + placeName + "'") + .add("shortName='" + shortName + "'") .add("timer=" + timer + " minute(s)") .toString(); } diff --git a/src/main/java/emissary/core/sentinel/protocols/Protocol.java b/src/main/java/emissary/core/sentinel/protocols/Protocol.java index a5baec4d23..727f390dfc 100644 --- a/src/main/java/emissary/core/sentinel/protocols/Protocol.java +++ b/src/main/java/emissary/core/sentinel/protocols/Protocol.java @@ -23,15 +23,22 @@ import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; +import static emissary.core.sentinel.Sentinel.Tracker.getPlaceSimpleName; + +/** + * This protocol buckets places that are running in mobile agents and then looks at max and min time in place and the + * number of agents that are potentially "stuck." After places are bucketed, the place stats are run against the + * configured rules to determine if all conditions are met. Once all rule conditions are met, then the configured action + * will be triggered, i.e. log/notify. + */ public class Protocol { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - protected static final String DEFAULT_RULE = "DEFAULT"; protected Configurator config; protected boolean enabled = false; + protected final Map rules = new ConcurrentHashMap<>(); protected Action action; - protected final Map rules = new ConcurrentHashMap<>(); // key: place, value: rule public Protocol(String conf) { configure(conf); @@ -45,18 +52,20 @@ public boolean isEnabled() { * Run the configured rules over the watched mobile-agents */ public void run(Map trackers) { - Map placeAgentCounts = new ConcurrentHashMap<>(); + + Map placeAgentStats = new ConcurrentHashMap<>(); for (Sentinel.Tracker tracker : trackers.values()) { String placeKey = getPlaceKey(tracker); if (StringUtils.isNotBlank(placeKey)) { - placeAgentCounts.put(placeKey, placeAgentCounts.getOrDefault(placeKey, 0) + 1); + placeAgentStats.put(placeKey, placeAgentStats.getOrDefault(placeKey, new PlaceAgentStats(placeKey)).update(tracker.getTimer())); } } - logger.debug("Checking agents {}", placeAgentCounts); - for (Map.Entry item : placeAgentCounts.entrySet()) { - Rule rule = rules.getOrDefault(item.getKey(), rules.get(DEFAULT_RULE)); - logger.trace("Found {} for {}", rule, item.getKey()); - check(trackers, item.getKey(), item.getValue()); + + if (!placeAgentStats.isEmpty()) { + logger.debug("Running rules on agents {}", placeAgentStats); + if (rules.values().stream().allMatch(rule -> rule.condition(placeAgentStats.values()))) { + action.trigger(trackers); + } } } @@ -70,16 +79,6 @@ public String getPlaceKey(Sentinel.Tracker tracker) { return getPlaceSimpleName(tracker.getPlaceName()); } - /** - * Get the simple name of a place, looks for the position in a string after the last '/' - * - * @param place the place name - * @return the simple place name - */ - public static String getPlaceSimpleName(String place) { - return StringUtils.substringAfterLast(place, "/"); - } - /** * Get the Configurator * @@ -90,7 +89,7 @@ protected void configure(String conf) { this.config = ConfigUtil.getConfigInfo(conf); init(); } catch (IOException e) { - logger.warn("Cannot read " + conf + ", skipping"); + logger.warn("Cannot read " + conf + ", skipping!!"); } } @@ -100,14 +99,18 @@ protected void configure(String conf) { protected void init() { this.enabled = config.findBooleanEntry("ENABLED", false); if (enabled) { + + String action = config.findStringEntry("ACTION", Notify.class.getName()); + this.action = (Action) Factory.create(action); + logger.trace("Loading rules..."); for (String ruleId : config.findEntries("RULE_ID")) { try { - validate(ruleId); Map map = config.findStringMatchMap(ruleId + "_"); String rule = map.getOrDefault("RULE", AllMaxTime.class.getName()); - Rule ruleImpl = (Rule) Factory.create(rule, ruleId, map.get("TIME_LIMIT_MINUTES"), map.get("THRESHOLD")); - logger.debug("Sentinel loaded {}", ruleImpl); + Rule ruleImpl = + (Rule) Factory.create(rule, validate(map.get("PLACE_MATCHER")), map.get("TIME_LIMIT_MINUTES"), map.get("THRESHOLD")); + logger.debug("Sentinel loaded rule {}", ruleImpl); this.rules.put(ruleId, ruleImpl); } catch (Exception e) { logger.warn("Unable to configure Sentinel for {}: {}", ruleId, e.getMessage()); @@ -117,11 +120,7 @@ protected void init() { // if no rules then disable protocol if (this.rules.isEmpty()) { this.enabled = false; - return; } - - String action = config.findStringEntry("ACTION", Notify.class.getName()); - this.action = (Action) Factory.create(action); } } @@ -132,35 +131,57 @@ protected void init() { * @throws NamespaceException if the directory place does not exist * @throws IllegalStateException if the place cannot be found */ - protected void validate(String place) throws NamespaceException { + protected String validate(String place) throws NamespaceException { // validate that the place exists - if (!DEFAULT_RULE.equalsIgnoreCase(place)) { - DirectoryPlace directoryPlace = Namespace.lookup(DirectoryPlace.class).iterator().next(); - if (directoryPlace.getEntries().stream() - .noneMatch(entry -> place.equalsIgnoreCase(KeyManipulator.getServiceClassname(entry.getFullKey())))) { - throw new IllegalStateException("Place not found in the directory"); - } - } - } - - /** - * Check the configured rules against the Sentinel tracking objects - * - * @param trackers the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents - * @param count number of mobile agents stuck on the place - */ - protected void check(Map trackers, String placeSimpleName, Integer count) { - if (rules.values().stream().allMatch(rule -> rule.condition(trackers, placeSimpleName, count))) { - action.trigger(trackers, placeSimpleName, count); + DirectoryPlace directoryPlace = Namespace.lookup(DirectoryPlace.class).iterator().next(); + if (directoryPlace.getEntries().stream() + .noneMatch(entry -> KeyManipulator.getServiceClassname(entry.getFullKey()).matches(place))) { + throw new IllegalStateException("Place not found in the directory"); } + return place; } @Override public String toString() { - return new StringJoiner(", ", Protocol.class.getSimpleName() + "[", "]") + return new StringJoiner(", ", "[", "]") .add("rules=" + rules) .add("action=" + action) .toString(); } + + public static class PlaceAgentStats { + + private final String place; + private int count; + private long maxTimeInPlace = -1; + private long minTimeInPlace = -1; + + public PlaceAgentStats(String place) { + this.place = place; + } + + public String getPlace() { + return place; + } + + public int getCount() { + return count; + } + + public long getMaxTimeInPlace() { + return maxTimeInPlace; + } + + public long getMinTimeInPlace() { + return minTimeInPlace; + } + + public PlaceAgentStats update(long timer) { + this.count++; + this.minTimeInPlace = this.minTimeInPlace < 0 ? timer : Math.min(this.minTimeInPlace, timer); + this.maxTimeInPlace = Math.max(this.maxTimeInPlace, timer); + return this; + } + } + } diff --git a/src/main/java/emissary/core/sentinel/protocols/actions/Action.java b/src/main/java/emissary/core/sentinel/protocols/actions/Action.java index ceb94fb47f..eed8af13b3 100644 --- a/src/main/java/emissary/core/sentinel/protocols/actions/Action.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Action.java @@ -8,7 +8,7 @@ import java.lang.invoke.MethodHandles; import java.util.Map; -public abstract class Action { +public class Action { protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -16,13 +16,14 @@ public abstract class Action { * Take action when rule conditions are met * * @param trackers the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents - * @param count number of mobile agents stuck on the place */ - public abstract void trigger(Map trackers, String placeSimpleName, Integer count); + public void trigger(Map trackers) { + + } @Override public String toString() { - return getClass().getName(); + return getClass().getSimpleName(); } + } diff --git a/src/main/java/emissary/core/sentinel/protocols/actions/Exit.java b/src/main/java/emissary/core/sentinel/protocols/actions/Exit.java index 3c8897cd22..01c5c52c6b 100644 --- a/src/main/java/emissary/core/sentinel/protocols/actions/Exit.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Exit.java @@ -4,19 +4,14 @@ import java.util.Map; +/** + * Try to terminate the JVM + */ public class Exit extends Action { - /** - * Try to terminate the JVM - * - * @param trackers the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents - * @param count number of mobile agents stuck on the place - */ @Override - public void trigger(Map trackers, String placeSimpleName, Integer count) { - logger.error("Sentinel detected {} unrecoverable agent(s) running [{}], exiting now!!", count, placeSimpleName); - logger.debug("{}", trackers); + public void trigger(Map trackers) { + logger.error("Sentinel detected unrecoverable agents {}, exiting now!!", trackers.values()); System.exit(1); } } diff --git a/src/main/java/emissary/core/sentinel/protocols/actions/Kill.java b/src/main/java/emissary/core/sentinel/protocols/actions/Kill.java index a51e9dc4f7..145796e88c 100644 --- a/src/main/java/emissary/core/sentinel/protocols/actions/Kill.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Kill.java @@ -5,19 +5,14 @@ import java.util.Map; +/** + * Force a shutdown of the system + */ public class Kill extends Action { - /** - * Force a shutdown of the system - * - * @param trackers the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents - * @param count number of mobile agents stuck on the place - */ @Override - public void trigger(Map trackers, String placeSimpleName, Integer count) { - logger.error("Sentinel detected {} unrecoverable agent(s) running [{}], initiating forceful shutdown...", count, placeSimpleName); - logger.debug("{}", trackers); + public void trigger(Map trackers) { + logger.error("Sentinel detected unrecoverable agents {}, initiating forceful shutdown...", trackers.values()); EmissaryServer.stopServerForce(); } } diff --git a/src/main/java/emissary/core/sentinel/protocols/actions/Notify.java b/src/main/java/emissary/core/sentinel/protocols/actions/Notify.java index 3b968330dc..d4a2c3683d 100644 --- a/src/main/java/emissary/core/sentinel/protocols/actions/Notify.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Notify.java @@ -4,18 +4,13 @@ import java.util.Map; +/** + * Log the problem agents/threads + */ public class Notify extends Action { - /** - * Log the problem agents/threads - * - * @param trackers the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents - * @param count number of mobile agents stuck on the place - */ @Override - public void trigger(Map trackers, String placeSimpleName, Integer count) { - logger.warn("Sentinel detected {} locked agent(s) running [{}]", count, placeSimpleName); - logger.debug("{}", trackers); + public void trigger(Map trackers) { + logger.warn("Sentinel detected locked agents {}", trackers.values()); } } diff --git a/src/main/java/emissary/core/sentinel/protocols/actions/Recover.java b/src/main/java/emissary/core/sentinel/protocols/actions/Recover.java index d44475caf4..0b652baf8d 100644 --- a/src/main/java/emissary/core/sentinel/protocols/actions/Recover.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Recover.java @@ -8,21 +8,18 @@ import java.util.Map; import java.util.stream.Collectors; +/** + * Attempts to recover the mobile agents by interrupting the thread + */ public class Recover extends Action { - /** - * Attempts to recover the mobile agents by interrupting the thread - * - * @param tracker the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents - * @param counter number of mobile agents stuck on the place - */ @Override - public void trigger(Map tracker, String placeSimpleName, Integer counter) { - logger.warn("Sentinel detected {} locked agent(s) running [{}], attempting recovery...", counter, placeSimpleName); + public void trigger(Map tracker) { + logger.warn("Sentinel detected locked agents, attempting recovery..."); List agentNames = tracker.values().stream() - .filter(t -> t.getPlaceSimpleName().equalsIgnoreCase(placeSimpleName)) + // .filter(t -> t.getPlaceSimpleName().equalsIgnoreCase(placeSimpleName)) .map(Sentinel.Tracker::getAgentName) + .sorted() .collect(Collectors.toList()); for (String agentName : agentNames) { @@ -35,4 +32,5 @@ public void trigger(Map tracker, String placeSimpleNam } } } + } diff --git a/src/main/java/emissary/core/sentinel/protocols/actions/Stop.java b/src/main/java/emissary/core/sentinel/protocols/actions/Stop.java index 0c6c3d7caf..41f5ef0b8b 100644 --- a/src/main/java/emissary/core/sentinel/protocols/actions/Stop.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Stop.java @@ -5,19 +5,14 @@ import java.util.Map; +/** + * Attempt a graceful shutdown of the system + */ public class Stop extends Action { - /** - * Attempt a graceful shutdown of the system - * - * @param trackers the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents - * @param count number of mobile agents stuck on the place - */ @Override - public void trigger(Map trackers, String placeSimpleName, Integer count) { - logger.error("Sentinel detected {} unrecoverable agent(s) running [{}], initiating graceful shutdown...", count, placeSimpleName); - logger.debug("{}", trackers); + public void trigger(Map trackers) { + logger.error("Sentinel detected unrecoverable agents {}, initiating graceful shutdown...", trackers.values()); EmissaryServer.stopServer(); } } diff --git a/src/main/java/emissary/core/sentinel/protocols/rules/AllMaxTime.java b/src/main/java/emissary/core/sentinel/protocols/rules/AllMaxTime.java index 2a020d8b55..fdc6da1092 100644 --- a/src/main/java/emissary/core/sentinel/protocols/rules/AllMaxTime.java +++ b/src/main/java/emissary/core/sentinel/protocols/rules/AllMaxTime.java @@ -1,13 +1,11 @@ package emissary.core.sentinel.protocols.rules; -import emissary.core.sentinel.Sentinel; +import emissary.core.sentinel.protocols.Protocol; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; -import java.util.Map; /** * Looks at the place that has been running for the least amount of time. @@ -27,14 +25,12 @@ public AllMaxTime(String place, String timeLimit, String threshold) { /** * Check to see if ALL places in mobile agents are over the configured time limit. * - * @param trackers the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents + * @param placeAgentStats the stats of a place that is currently processing * @return true if any places in mobile agents are over the configured time limit, false otherwise */ - protected boolean overTimeLimit(Map trackers, String placeSimpleName) { - return trackers.values().stream() - .filter(t -> StringUtils.equalsIgnoreCase(t.getPlaceSimpleName(), placeSimpleName)) - .allMatch(tracker -> tracker.getTimer() >= this.timeLimit); + protected boolean overTimeLimit(Protocol.PlaceAgentStats placeAgentStats) { + logger.debug("Testing timeLimit for place={}, minTime={}, timeLimit={}", place, placeAgentStats.getMinTimeInPlace(), timeLimit); + return placeAgentStats.getMinTimeInPlace() >= this.timeLimit; } } diff --git a/src/main/java/emissary/core/sentinel/protocols/rules/AnyMaxTime.java b/src/main/java/emissary/core/sentinel/protocols/rules/AnyMaxTime.java index 499fe45bd5..b2236593e1 100644 --- a/src/main/java/emissary/core/sentinel/protocols/rules/AnyMaxTime.java +++ b/src/main/java/emissary/core/sentinel/protocols/rules/AnyMaxTime.java @@ -1,13 +1,11 @@ package emissary.core.sentinel.protocols.rules; -import emissary.core.sentinel.Sentinel; +import emissary.core.sentinel.protocols.Protocol; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; -import java.util.Map; /** * Looks at the place that has been running for the most amount of time. @@ -27,14 +25,12 @@ public AnyMaxTime(String place, String timeLimit, String threshold) { /** * Check to see if ANY places in mobile agents are over the configured time limit * - * @param trackers the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents + * @param placeAgentStats the stats of a place that is currently processing * @return true if any places in mobile agents are over the configured time limit, false otherwise */ - protected boolean overTimeLimit(Map trackers, String placeSimpleName) { - return trackers.values().stream() - .filter(t -> StringUtils.equalsIgnoreCase(t.getPlaceSimpleName(), placeSimpleName)) - .anyMatch(tracker -> tracker.getTimer() >= this.timeLimit); + protected boolean overTimeLimit(Protocol.PlaceAgentStats placeAgentStats) { + logger.debug("Testing timeLimit for place={}, maxTime={}, timeLimit={}", place, placeAgentStats.getMaxTimeInPlace(), timeLimit); + return placeAgentStats.getMaxTimeInPlace() >= this.timeLimit; } } diff --git a/src/main/java/emissary/core/sentinel/protocols/rules/Rule.java b/src/main/java/emissary/core/sentinel/protocols/rules/Rule.java index 6162592743..fba6094156 100644 --- a/src/main/java/emissary/core/sentinel/protocols/rules/Rule.java +++ b/src/main/java/emissary/core/sentinel/protocols/rules/Rule.java @@ -1,7 +1,7 @@ package emissary.core.sentinel.protocols.rules; import emissary.core.NamespaceException; -import emissary.core.sentinel.Sentinel; +import emissary.core.sentinel.protocols.Protocol; import emissary.pool.AgentPool; import org.apache.commons.lang3.StringUtils; @@ -9,14 +9,16 @@ import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; -import java.util.Map; +import java.util.Collection; import java.util.StringJoiner; +import java.util.regex.Pattern; public abstract class Rule { protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - protected final String place; + // the place name to test the condition + protected final Pattern place; // how long to wait before alerting on stuck agents protected final long timeLimit; @@ -26,7 +28,7 @@ public abstract class Rule { public Rule(String place, long timeLimit, double threshold) { logger.trace("Creating rule for place={}, timeLimit={}, threshold={}", place, timeLimit, threshold); - this.place = place; + this.place = Pattern.compile(place); this.timeLimit = timeLimit; this.threshold = threshold; } @@ -39,26 +41,33 @@ public Rule(String place, String timeLimit, String threshold) { /** * Check the rule conditions * - * @param trackers the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents - * @param count number of mobile agents stuck on the place + * @param placeAgentStats collection of the stats of a place that is currently processing * @return true if conditions are met, false otherwise */ - public boolean condition(Map trackers, String placeSimpleName, Integer count) { - return overThreshold(count) && overTimeLimit(trackers, placeSimpleName); + public boolean condition(Collection placeAgentStats) { + return placeAgentStats.stream().filter(p -> place.matcher(p.getPlace()).matches()).anyMatch(p -> overThreshold(p) && overTimeLimit(p)); } /** * Check to see if the number of places in mobile agents are over the configured threshold - * - * @param count number of mobile agents stuck on the place + * + * @param placeAgentStats the stats of a place that is currently processing * @return true if the number of mobile agents stuck on the place is over the threshold, false otherwise */ - protected boolean overThreshold(Integer count) { + protected boolean overThreshold(Protocol.PlaceAgentStats placeAgentStats) { + int poolSize = getAgentCount(); + logger.debug("Testing threshold for place={}, counter={}, poolSize={}, threshold={}", place, placeAgentStats.getCount(), poolSize, threshold); + return (double) placeAgentStats.getCount() / poolSize >= this.threshold; + } + + /** + * Get the total number of agents, idle and active. Override this method to + * + * @return the total number of agents + */ + protected int getAgentCount() { try { - int poolSize = AgentPool.lookup().getCurrentPoolSize(); - logger.trace("Testing threshold for place={}, counter={}, poolSize={}, threshold={}", place, count, poolSize, threshold); - return (double) count / poolSize >= this.threshold; + return AgentPool.lookup().getCurrentPoolSize(); } catch (NamespaceException ne) { throw new IllegalStateException(ne); } @@ -67,11 +76,10 @@ protected boolean overThreshold(Integer count) { /** * Check to see if the places in mobile agents are over the configured time limit * - * @param trackers the listing of agents, places, and filenames that's currently processing - * @param placeSimpleName the place name currently processing on one or more mobile agents + * @param placeAgentStats the stats of a place that is currently processing * @return true if the places in mobile agents are over the configured time limit, false otherwise */ - protected abstract boolean overTimeLimit(Map trackers, String placeSimpleName); + protected abstract boolean overTimeLimit(Protocol.PlaceAgentStats placeAgentStats); @Override public String toString() { @@ -81,4 +89,5 @@ public String toString() { .add("threshold=" + threshold) .toString(); } + } diff --git a/src/main/resources/emissary/core/sentinel/Sentinel.cfg b/src/main/resources/emissary/core/sentinel/Sentinel.cfg index 7ac8ef6878..d52d0fdf3f 100644 --- a/src/main/resources/emissary/core/sentinel/Sentinel.cfg +++ b/src/main/resources/emissary/core/sentinel/Sentinel.cfg @@ -1,4 +1,6 @@ ENABLED = false -POLLING_INTERVAL_MINUTES = 5 -PROTOCOL_DEFAULT = emissary.core.sentinel.protocols.DEFAULT.cfg +POLLING_INTERVAL_MINUTES = 10 + +PROTOCOL = emissary.core.sentinel.protocols.ProtocolSample.cfg +#PROTOCOL = emissary.core.sentinel.protocols.AnotherProtocol.cfg diff --git a/src/main/resources/emissary/core/sentinel/protocols/DEFAULT.cfg b/src/main/resources/emissary/core/sentinel/protocols/DEFAULT.cfg deleted file mode 100644 index a24b6bf9cf..0000000000 --- a/src/main/resources/emissary/core/sentinel/protocols/DEFAULT.cfg +++ /dev/null @@ -1,14 +0,0 @@ -ENABLED = false - -ACTION = "emissary.core.sentinel.protocols.actions.Notify" - -RULE_ID = "DEFAULT" -DEFAULT_TIME_LIMIT_MINUTES = "60" -DEFAULT_THRESHOLD = "1.0" - -# Sample rule -# RULE_ID = "DelayPlace" -# DelayPlace_RULE = "emissary.core.sentinel.protocols.rules.AnyMaxTime" -# DelayPlace_TIME_LIMIT_MINUTES = "1" -# DelayPlace_THRESHOLD = "0.75" - diff --git a/src/main/resources/emissary/core/sentinel/protocols/ProtocolSample.cfg b/src/main/resources/emissary/core/sentinel/protocols/ProtocolSample.cfg new file mode 100644 index 0000000000..f8fe4b1864 --- /dev/null +++ b/src/main/resources/emissary/core/sentinel/protocols/ProtocolSample.cfg @@ -0,0 +1,23 @@ +ENABLED = false + +# Default action is Notify +# ACTION = "emissary.core.sentinel.protocols.actions.Notify" + +# Sample rules + +# RULE_ID = "LONG_RUNNING" +# LONG_RUNNING_PLACE_MATCHER = ".*" +# LONG_RUNNING_TIME_LIMIT_MINUTES = "60" +# LONG_RUNNING_THRESHOLD = "1.0" + +# RULE_ID = "LONG_RUNNING2" +# LONG_RUNNING2_PLACE_MATCHER = ".*" +# DELAY_RULE = "emissary.core.sentinel.protocols.rules.AnyMaxTime" +# LONG_RUNNING2_TIME_LIMIT_MINUTES = "120" +# LONG_RUNNING2_THRESHOLD = "0.5" + +# RULE_ID = "DELAY" +# DELAY_PLACE_MATCHER = "DelayPlace" +# DELAY_RULE = "emissary.core.sentinel.protocols.rules.AllMaxTime" +# DELAY_TIME_LIMIT_MINUTES = "5" +# DELAY_THRESHOLD = "0.75"