Skip to content

Commit

Permalink
protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
dev-mlb committed Nov 30, 2023
1 parent eba0aa6 commit 99c3990
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 178 deletions.
65 changes: 42 additions & 23 deletions src/main/java/emissary/core/sentinel/Sentinel.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Track mobile agents and log any suspicious behavior
* Track mobile agents and take action on suspicious behavior
*/
public class Sentinel implements Runnable {

Expand All @@ -34,8 +36,10 @@ public class Sentinel implements Runnable {
// key: agent name, value: how long Sentinel has observed the mobile agent
protected final Map<String, Tracker> trackers = new ConcurrentHashMap<>();

protected final Map<String, Protocol> protocols = new ConcurrentHashMap<>();
// protocols contain an action to perform when the set of rule conditions are met
protected final Set<Protocol> protocols = new LinkedHashSet<>();

// the default configuration Sentinel.cfg
protected Configurator config;

// how many minutes to sleep before checking the mobile agents
Expand All @@ -44,6 +48,7 @@ public class Sentinel implements Runnable {
// Loop control
protected boolean timeToQuit = false;

// turn on/off sentinel
protected boolean enabled = false;

/**
Expand Down Expand Up @@ -84,8 +89,7 @@ public void quit() {
*/
@Override
public void run() {
logger.info("Sentinel is starting");

logger.info("Sentinel is watching");
while (!this.timeToQuit) {
// Delay this loop
try {
Expand All @@ -98,12 +102,12 @@ public void run() {
}
}
Namespace.unbind(DEFAULT_NAMESPACE_NAME);
logger.info("Sentinel stopped.");
logger.info("Sentinel stopped");
}

@Override
public String toString() {
return "Watching agents with " + protocols.values();
return "Watching agents with " + protocols;
}

/**
Expand All @@ -127,22 +131,22 @@ protected void init() {
this.pollingInterval = config.findIntEntry("POLLING_INTERVAL_MINUTES", 5);

logger.trace("Sentinel protocols initializing...");
for (Map.Entry<String, String> proto : config.findStringMatchMap("PROTOCOL_", true, true).entrySet()) {
for (String config : config.findEntries("PROTOCOL")) {
try {
String protocolId = proto.getKey();
String config = proto.getValue();
Protocol protocol = new Protocol(config);
logger.info("Sentinel initiated {}", protocol);
if (protocol.isEnabled()) {
this.protocols.put(protocolId, protocol);
logger.info("Sentinel protocol initialized {}", protocol);
this.protocols.add(protocol);
} else {
logger.debug("Sentinel protocol disabled {}", protocol);
}
} catch (Exception e) {
logger.warn("Unable to configure Sentinel Protocol {}: {}", proto, e.getMessage());
logger.warn("Unable to configure Sentinel Protocol[{}]: {}", config, e.getMessage());
}
}
if (this.protocols.isEmpty()) {
this.enabled = false;
logger.warn("Sentinel initializing failed: no protocols found");
logger.warn("Sentinel initialization failed: no protocols found");
}
}
}
Expand All @@ -160,7 +164,7 @@ protected void watch() throws NamespaceException {
for (String agentKey : agentKeys) {
watch(agentKey);
}
protocols.values().forEach(protocol -> protocol.run(trackers));
protocols.forEach(protocol -> protocol.run(trackers));
}

/**
Expand All @@ -174,17 +178,17 @@ protected void watch(String agentKey) throws NamespaceException {
IMobileAgent mobileAgent = (IMobileAgent) Namespace.lookup(agentKey);
Tracker trackedAgent = trackers.computeIfAbsent(mobileAgent.getName(), Tracker::new);
if (mobileAgent.isInUse()) {
if (!Objects.equals(mobileAgent.getLastPlaceProcessed(), trackedAgent.getPlaceName())
&& !Objects.equals(mobileAgent.agentID(), trackedAgent.getAgentId())) {
if (!Objects.equals(mobileAgent.agentID(), trackedAgent.getAgentId())
|| !Objects.equals(mobileAgent.getLastPlaceProcessed(), trackedAgent.getPlaceName())) {
trackedAgent.setAgentId(mobileAgent.agentID());
trackedAgent.setPlaceName(mobileAgent.getLastPlaceProcessed());
trackedAgent.resetTimer();
}
trackedAgent.incrementTimer(pollingInterval);
logger.debug("Agent acquired {}", trackedAgent);
logger.trace("Agent acquired {}", trackedAgent);
} else {
trackedAgent.clear();
logger.debug("Agent not in use [{}]", agentKey);
logger.trace("Agent not in use [{}]", agentKey);
}
}

Expand Down Expand Up @@ -214,7 +218,7 @@ public void setAgentId(String agentId) {
} else {
this.agentId = agentId;
if (StringUtils.contains(agentId, "Agent-")) {
this.shortName = StringUtils.substringAfter(StringUtils.substringAfter(agentId, "Agent-"), "-");
this.shortName = getShortName(agentId);
}
}
}
Expand All @@ -223,6 +227,10 @@ public String getShortName() {
return shortName;
}

public static String getShortName(String agentId) {
return StringUtils.substringAfter(StringUtils.substringAfter(agentId, "Agent-"), "-");
}

public String getPlaceName() {
return placeName;
}
Expand All @@ -232,7 +240,19 @@ public void setPlaceName(String placeName) {
}

public String getPlaceSimpleName() {
return Protocol.getPlaceSimpleName(this.placeName);
return getPlaceSimpleName(this.placeName);
}

public String getPlaceAndShortName() {
return getPlaceAndShortName(this.placeName, this.shortName);
}

public static String getPlaceSimpleName(String place) {
return StringUtils.defaultString(StringUtils.substringAfterLast(place, "/"), "");
}

public static String getPlaceAndShortName(String place, String shortName) {
return getPlaceSimpleName(place) + "/" + shortName;
}

public long getTimer() {
Expand Down Expand Up @@ -260,11 +280,10 @@ public void clear() {

@Override
public String toString() {
return new StringJoiner(", ", Tracker.class.getSimpleName() + "[", "]")
return new StringJoiner(", ", "[", "]")
.add("agentName='" + agentName + "'")
.add("agentId='" + agentId + "'")
.add("shortName='" + shortName + "'")
.add("placeName='" + placeName + "'")
.add("shortName='" + shortName + "'")
.add("timer=" + timer + " minute(s)")
.toString();
}
Expand Down
117 changes: 69 additions & 48 deletions src/main/java/emissary/core/sentinel/protocols/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,22 @@
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;

import static emissary.core.sentinel.Sentinel.Tracker.getPlaceSimpleName;

/**
* This protocol buckets places that are running in mobile agents and then looks at max and min time in place and the
* number of agents that are potentially "stuck." After places are bucketed, the place stats are run against the
* configured rules to determine if all conditions are met. Once all rule conditions are met, then the configured action
* will be triggered, i.e. log/notify.
*/
public class Protocol {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

protected static final String DEFAULT_RULE = "DEFAULT";
protected Configurator config;
protected boolean enabled = false;
protected final Map<String, Rule> rules = new ConcurrentHashMap<>();
protected Action action;
protected final Map<String, Rule> rules = new ConcurrentHashMap<>(); // key: place, value: rule

public Protocol(String conf) {
configure(conf);
Expand All @@ -45,18 +52,20 @@ public boolean isEnabled() {
* Run the configured rules over the watched mobile-agents
*/
public void run(Map<String, Sentinel.Tracker> trackers) {
Map<String, Integer> placeAgentCounts = new ConcurrentHashMap<>();

Map<String, PlaceAgentStats> placeAgentStats = new ConcurrentHashMap<>();
for (Sentinel.Tracker tracker : trackers.values()) {
String placeKey = getPlaceKey(tracker);
if (StringUtils.isNotBlank(placeKey)) {
placeAgentCounts.put(placeKey, placeAgentCounts.getOrDefault(placeKey, 0) + 1);
placeAgentStats.put(placeKey, placeAgentStats.getOrDefault(placeKey, new PlaceAgentStats(placeKey)).update(tracker.getTimer()));
}
}
logger.debug("Checking agents {}", placeAgentCounts);
for (Map.Entry<String, Integer> item : placeAgentCounts.entrySet()) {
Rule rule = rules.getOrDefault(item.getKey(), rules.get(DEFAULT_RULE));
logger.trace("Found {} for {}", rule, item.getKey());
check(trackers, item.getKey(), item.getValue());

if (!placeAgentStats.isEmpty()) {
logger.debug("Running rules on agents {}", placeAgentStats);
if (rules.values().stream().allMatch(rule -> rule.condition(placeAgentStats.values()))) {
action.trigger(trackers);
}
}
}

Expand All @@ -70,16 +79,6 @@ public String getPlaceKey(Sentinel.Tracker tracker) {
return getPlaceSimpleName(tracker.getPlaceName());
}

/**
* Get the simple name of a place, looks for the position in a string after the last '/'
*
* @param place the place name
* @return the simple place name
*/
public static String getPlaceSimpleName(String place) {
return StringUtils.substringAfterLast(place, "/");
}

/**
* Get the Configurator
*
Expand All @@ -90,7 +89,7 @@ protected void configure(String conf) {
this.config = ConfigUtil.getConfigInfo(conf);
init();
} catch (IOException e) {
logger.warn("Cannot read " + conf + ", skipping");
logger.warn("Cannot read " + conf + ", skipping!!");
}
}

Expand All @@ -100,14 +99,18 @@ protected void configure(String conf) {
protected void init() {
this.enabled = config.findBooleanEntry("ENABLED", false);
if (enabled) {

String action = config.findStringEntry("ACTION", Notify.class.getName());
this.action = (Action) Factory.create(action);

logger.trace("Loading rules...");
for (String ruleId : config.findEntries("RULE_ID")) {
try {
validate(ruleId);
Map<String, String> map = config.findStringMatchMap(ruleId + "_");
String rule = map.getOrDefault("RULE", AllMaxTime.class.getName());
Rule ruleImpl = (Rule) Factory.create(rule, ruleId, map.get("TIME_LIMIT_MINUTES"), map.get("THRESHOLD"));
logger.debug("Sentinel loaded {}", ruleImpl);
Rule ruleImpl =
(Rule) Factory.create(rule, validate(map.get("PLACE_MATCHER")), map.get("TIME_LIMIT_MINUTES"), map.get("THRESHOLD"));
logger.debug("Sentinel loaded rule {}", ruleImpl);
this.rules.put(ruleId, ruleImpl);
} catch (Exception e) {
logger.warn("Unable to configure Sentinel for {}: {}", ruleId, e.getMessage());
Expand All @@ -117,11 +120,7 @@ protected void init() {
// if no rules then disable protocol
if (this.rules.isEmpty()) {
this.enabled = false;
return;
}

String action = config.findStringEntry("ACTION", Notify.class.getName());
this.action = (Action) Factory.create(action);
}
}

Expand All @@ -132,35 +131,57 @@ protected void init() {
* @throws NamespaceException if the directory place does not exist
* @throws IllegalStateException if the place cannot be found
*/
protected void validate(String place) throws NamespaceException {
protected String validate(String place) throws NamespaceException {
// validate that the place exists
if (!DEFAULT_RULE.equalsIgnoreCase(place)) {
DirectoryPlace directoryPlace = Namespace.lookup(DirectoryPlace.class).iterator().next();
if (directoryPlace.getEntries().stream()
.noneMatch(entry -> place.equalsIgnoreCase(KeyManipulator.getServiceClassname(entry.getFullKey())))) {
throw new IllegalStateException("Place not found in the directory");
}
}
}

/**
* Check the configured rules against the Sentinel tracking objects
*
* @param trackers the listing of agents, places, and filenames that's currently processing
* @param placeSimpleName the place name currently processing on one or more mobile agents
* @param count number of mobile agents stuck on the place
*/
protected void check(Map<String, Sentinel.Tracker> trackers, String placeSimpleName, Integer count) {
if (rules.values().stream().allMatch(rule -> rule.condition(trackers, placeSimpleName, count))) {
action.trigger(trackers, placeSimpleName, count);
DirectoryPlace directoryPlace = Namespace.lookup(DirectoryPlace.class).iterator().next();
if (directoryPlace.getEntries().stream()
.noneMatch(entry -> KeyManipulator.getServiceClassname(entry.getFullKey()).matches(place))) {
throw new IllegalStateException("Place not found in the directory");
}
return place;
}

@Override
public String toString() {
return new StringJoiner(", ", Protocol.class.getSimpleName() + "[", "]")
return new StringJoiner(", ", "[", "]")
.add("rules=" + rules)
.add("action=" + action)
.toString();
}

public static class PlaceAgentStats {

private final String place;
private int count;
private long maxTimeInPlace = -1;
private long minTimeInPlace = -1;

public PlaceAgentStats(String place) {
this.place = place;
}

public String getPlace() {
return place;
}

public int getCount() {
return count;
}

public long getMaxTimeInPlace() {
return maxTimeInPlace;
}

public long getMinTimeInPlace() {
return minTimeInPlace;
}

public PlaceAgentStats update(long timer) {
this.count++;
this.minTimeInPlace = this.minTimeInPlace < 0 ? timer : Math.min(this.minTimeInPlace, timer);
this.maxTimeInPlace = Math.max(this.maxTimeInPlace, timer);
return this;
}
}

}
Loading

0 comments on commit 99c3990

Please sign in to comment.