Skip to content

Commit

Permalink
added enable/disable and the ability to provide custom rules
Browse files Browse the repository at this point in the history
  • Loading branch information
dev-mlb committed Nov 16, 2023
1 parent 29129b5 commit 9e11756
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 134 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package emissary.core;
package emissary.core.sentinel;

import emissary.config.ConfigUtil;
import emissary.config.Configurator;
import emissary.pool.AgentPool;
import emissary.core.Factory;
import emissary.core.IMobileAgent;
import emissary.core.Namespace;
import emissary.core.NamespaceException;
import emissary.core.sentinel.rules.Notify;
import emissary.core.sentinel.rules.Rule;
import emissary.pool.MobileAgentFactory;
import emissary.server.EmissaryServer;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -20,9 +23,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static emissary.core.MobileAgent.NO_AGENT_ID;
import static emissary.core.Sentinel.Action.NOTIFY;

/**
* Track mobile agents and log any suspicious behavior
*/
Expand Down Expand Up @@ -51,16 +51,22 @@ public class Sentinel implements Runnable {
// Loop control
protected boolean timeToQuit = false;

protected boolean enabled = false;

/**
* Create a Sentinel - set it running and bind into the {@link Namespace}
*/
public Sentinel() {
configure();
final Thread thread = new Thread(this, DEFAULT_NAMESPACE_NAME);
thread.setPriority(Thread.NORM_PRIORITY);
thread.setDaemon(true);
Namespace.bind(DEFAULT_NAMESPACE_NAME, this);
thread.start();
if (this.enabled) {
final Thread thread = new Thread(this, DEFAULT_NAMESPACE_NAME);
thread.setPriority(Thread.NORM_PRIORITY);
thread.setDaemon(true);
Namespace.bind(DEFAULT_NAMESPACE_NAME, this);
thread.start();
} else {
logger.info("Sentinel is disabled");
}
}

/**
Expand Down Expand Up @@ -117,25 +123,34 @@ protected void configure() {
} catch (IOException e) {
logger.warn("Cannot read Sentinel.cfg, taking default values");
}

if (!this.rules.containsKey(DEFAULT_RULE)) {
this.rules.put(DEFAULT_RULE, new Rule(DEFAULT_RULE, 60L, 1.0, NOTIFY));
}
}

/**
* Initialize rule set
*/
protected void init() {
for (String rule : config.findEntries("RULE")) {
try {
Map<String, String> map = config.findStringMatchMap(rule + "_");
this.rules.put(rule, new Rule(rule, map.get("TIME_LIMIT"), map.get("THRESHOLD"), map.get("ACTION")));
} catch (Exception e) {
logger.warn("Unable to configure Sentinel for: {}", rule);
this.enabled = config.findBooleanEntry("ENABLED", false);
if (this.enabled) {
this.pollingInterval = config.findIntEntry("POLLING_INTERVAL", 5);

logger.trace("Loading rules...");
for (String ruleId : config.findEntries("RULE_ID")) {
try {
Map<String, String> map = config.findStringMatchMap(ruleId + "_");
String rule = map.getOrDefault("RULE", Notify.class.getName());
Rule ruleImpl = (Rule) Factory.create(rule, ruleId, map.get("TIME_LIMIT"), map.get("THRESHOLD"));
this.rules.put(ruleId, ruleImpl);
} catch (Exception e) {
logger.warn("Unable to configure Sentinel for: {}", ruleId);
}
}

// if no rules, create a default one
if (!this.rules.containsKey(DEFAULT_RULE)) {
logger.warn("Default rule not found, creating one...");
this.rules.put(DEFAULT_RULE, new Notify(DEFAULT_RULE, 60L, 1.0));
}
}
this.pollingInterval = config.findIntEntry("POLLING_INTERVAL", 5);
}

/**
Expand All @@ -160,6 +175,7 @@ protected void watch() throws NamespaceException {
* @throws NamespaceException if there is a problem looking up resources in the {@link Namespace}
*/
protected void watch(String agentKey) throws NamespaceException {
logger.trace("Searching for agent [{}]", agentKey);
IMobileAgent mobileAgent = (IMobileAgent) Namespace.lookup(agentKey);
Tracker trackedAgent = tracker.computeIfAbsent(mobileAgent.getName(), Tracker::new);
if (mobileAgent.isInUse()) {
Expand All @@ -172,10 +188,11 @@ protected void watch(String agentKey) throws NamespaceException {
trackedAgent.incrementTimer(pollingInterval);
String placeSimpleName = getPlaceSimpleName(mobileAgent.getLastPlaceProcessed());
counter.put(placeSimpleName, counter.getOrDefault(placeSimpleName, 0) + 1);
logger.debug("Agent acquired {}", trackedAgent);
} else {
trackedAgent.clear();
logger.debug("Agent not in use [{}]", agentKey);
}
logger.debug("{}", trackedAgent);
}

/**
Expand All @@ -184,19 +201,19 @@ protected void watch(String agentKey) throws NamespaceException {
* @throws NamespaceException if there is a problem looking up resources in the {@link Namespace}
*/
protected void check() throws NamespaceException {
logger.debug("MobileAgents in {}", counter);
logger.debug("Checking agents {}", counter);
for (Map.Entry<String, Integer> item : counter.entrySet()) {
Rule rule = rules.getOrDefault(item.getKey(), rules.get(DEFAULT_RULE));
logger.debug("Found {} for {}", rule, item.getKey());
rule.run(item.getKey(), item.getValue());
logger.trace("Found {} for {}", rule, item.getKey());
rule.run(tracker, item.getKey(), item.getValue());
}
}

protected static String getPlaceSimpleName(String lastPlaceProcessed) {
return StringUtils.substringAfterLast(lastPlaceProcessed, "/");
}

static class Tracker {
public static class Tracker {
private final String agentName;
private String agentId;
private String shortName;
Expand All @@ -216,7 +233,7 @@ public String getAgentId() {
}

public void setAgentId(String agentId) {
if (StringUtils.contains(agentId, NO_AGENT_ID)) {
if (StringUtils.contains(agentId, "No_AgentID_Set")) {
this.agentId = "";
this.shortName = "";
} else {
Expand Down Expand Up @@ -277,95 +294,4 @@ public String toString() {
.toString();
}
}

enum Action {
NOTIFY, RECOVER, STOP, KILL, EXIT
}

class Rule {

private final String place;

// how long to wait before alerting on stuck agents
private final long timeLimit;

// percentage of mobile agents that are stuck on the same place before sounding the alarm
private final double threshold;

// what to do in the case of rule hitting limits
private final Action action;

public Rule(String place, long timeLimit, double threshold, Action action) {
this.place = place;
this.timeLimit = timeLimit;
this.threshold = threshold;
this.action = action;
}

public Rule(String place, String timeLimit, String threshold, String action) {
this(place, StringUtils.isBlank(timeLimit) ? 60L : Long.parseLong(timeLimit),
StringUtils.isBlank(threshold) ? 1.0 : Double.parseDouble(threshold),
StringUtils.isBlank(threshold) ? NOTIFY : Action.valueOf(action.toUpperCase()));
}

public String getPlace() {
return place;
}

public long getTimeLimit() {
return timeLimit;
}

public double getThreshold() {
return threshold;
}

public Action getAction() {
return action;
}

public void run(String placeSimpleName, Integer counter) throws NamespaceException {
if (overThreshold(counter) && overTimeLimit(placeSimpleName)) {
if (action == Action.STOP) {
logger.error("Sentinel detected unrecoverable agent(s) running [{}], initiating graceful shut down...", placeSimpleName);
EmissaryServer.stopServer();
} else if (action == Action.KILL) {
logger.error("Sentinel detected unrecoverable agent(s) running [{}], initiating forceful shutdown...", placeSimpleName);
EmissaryServer.stopServerForce();
} else if (action == Action.EXIT) {
logger.error("Sentinel detected unrecoverable agent(s) running [{}], exiting now!!", placeSimpleName);
System.exit(1);
} else if (action == Action.RECOVER) {
logger.warn("Sentinel attempting recovery mode...");
throw new UnsupportedOperationException("Recovery unavailable");
} else {
logger.warn("Sentinel detected locked agent(s) running [{}]", placeSimpleName);
}
}
}

private boolean overThreshold(Integer counter) throws NamespaceException {
int poolSize = AgentPool.lookup().getCurrentPoolSize();
logger.debug("Testing threshold for place={}, counter={}, poolSize={}, threshold={}", getPlace(), counter, poolSize,
getThreshold());
return (double) counter / poolSize >= getThreshold();
}

private boolean overTimeLimit(String placeSimpleName) {
long maxTimeInPlace = tracker.values().stream()
.filter(t -> StringUtils.equalsIgnoreCase(t.getPlaceSimpleName(), placeSimpleName))
.map(Tracker::getTimer)
.max(Comparator.naturalOrder()).orElse(0L);
logger.debug("Testing time limit for place={}, timeLimit={}, maxTimeInPlace={}", getPlace(), maxTimeInPlace, getTimeLimit());
return maxTimeInPlace >= getTimeLimit();
}

@Override
public String toString() {
return new StringJoiner(", ", "Rule:" + place + "[", "]")
.add("timeLimit=" + timeLimit)
.add("threshold=" + threshold)
.toString();
}
}
}
22 changes: 22 additions & 0 deletions src/main/java/emissary/core/sentinel/rules/Exit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package emissary.core.sentinel.rules;

import emissary.core.sentinel.Sentinel;

import java.util.Map;

public class Exit extends Rule {

public Exit(String place, long timeLimit, double threshold) {
super(place, timeLimit, threshold);
}

public Exit(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
}

@Override
public void action(Map<String, Sentinel.Tracker> tracker, String placeSimpleName, Integer counter) {
logger.error("Sentinel detected unrecoverable agent(s) running [{}], exiting now!!", placeSimpleName);
System.exit(1);
}
}
22 changes: 22 additions & 0 deletions src/main/java/emissary/core/sentinel/rules/Kill.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package emissary.core.sentinel.rules;

import emissary.core.sentinel.Sentinel;
import emissary.server.EmissaryServer;

import java.util.Map;

public class Kill extends Rule {
public Kill(String place, long timeLimit, double threshold) {
super(place, timeLimit, threshold);
}

public Kill(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
}

@Override
public void action(Map<String, Sentinel.Tracker> tracker, String placeSimpleName, Integer counter) {
logger.error("Sentinel detected unrecoverable agent(s) running [{}], initiating forceful shutdown...", placeSimpleName);
EmissaryServer.stopServerForce();
}
}
20 changes: 20 additions & 0 deletions src/main/java/emissary/core/sentinel/rules/Notify.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package emissary.core.sentinel.rules;

import emissary.core.sentinel.Sentinel;

import java.util.Map;

public class Notify extends Rule {
public Notify(String place, long timeLimit, double threshold) {
super(place, timeLimit, threshold);
}

public Notify(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
}

@Override
public void action(Map<String, Sentinel.Tracker> tracker, String placeSimpleName, Integer counter) {
logger.warn("Sentinel detected locked agent(s) running [{}]", placeSimpleName);
}
}
21 changes: 21 additions & 0 deletions src/main/java/emissary/core/sentinel/rules/Recover.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package emissary.core.sentinel.rules;

import emissary.core.sentinel.Sentinel;

import java.util.Map;

public class Recover extends Rule {
public Recover(String place, long timeLimit, double threshold) {
super(place, timeLimit, threshold);
}

public Recover(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
}

@Override
public void action(Map<String, Sentinel.Tracker> tracker, String placeSimpleName, Integer counter) {
logger.warn("Sentinel attempting recovery mode...");
throw new UnsupportedOperationException("Recovery unavailable");
}
}
Loading

0 comments on commit 9e11756

Please sign in to comment.