Add aggregated data to alert #30

Open · wants to merge 2 commits into fw_2.9.0_migration
2 changes: 1 addition & 1 deletion .python-version
@@ -1 +1 @@
3.11
3.9.16
12 changes: 12 additions & 0 deletions elastalert/alerts.py
@@ -88,6 +88,16 @@ def _add_top_counts(self):
self.text += '%s: %s\n' % (term, count)

self.text += '\n'
def _add_aggregated_data(self):
if 'aggregated_data' in self.match:
list_of_tuples = self.match.get('aggregated_data', [])
for _tuple in list_of_tuples:
print("handling tuple: %s" % (_tuple,))
for item in _tuple:
print("item: %s" % (item,))
self.text += str(item) + ','
self.text += "\n"
self.text += '\n'

def _add_match_items(self):
match_items = list(self.match.items())
@@ -124,6 +134,8 @@ def __str__(self):
self._ensure_new_line()
if self.rule.get('top_count_keys'):
self._add_top_counts()
if self.rule.get('aggregation_keys'):
self._add_aggregated_data()
if self.rule.get('alert_text_type') != 'exclude_fields':
self._add_match_items()
return self.text
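For illustration only (not part of this diff), here is a minimal standalone sketch of what the new _add_aggregated_data block appends to the alert body, assuming match['aggregated_data'] holds the (key..., doc_count) tuples produced by flatten_aggregated_data in elastalert.py. The field values and tuple contents are made up:

    # Standalone sketch of the rendering loop above; all values are hypothetical.
    match = {
        "aggregated_data": [
            ("10.0.0.1", "443", "7"),   # (src_ip, dest_port, doc_count)
            ("10.0.0.2", "80", "3"),
        ]
    }

    text = ""
    for _tuple in match.get("aggregated_data", []):
        for item in _tuple:
            text += str(item) + ","     # one comma-separated row per bucket
        text += "\n"
    text += "\n"

    print(text)
    # 10.0.0.1,443,7,
    # 10.0.0.2,80,3,

Note that each row keeps a trailing comma, exactly as the loop above writes it.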
92 changes: 91 additions & 1 deletion elastalert/elastalert.py
@@ -281,7 +281,9 @@ def get_aggregation_query(self, query, rule, query_key, terms_size, timestamp_field):
query_element = query['query']
if 'sort' in query_element:
query_element.pop('sort')
metric_agg_element = rule['aggregation_query_element']
metric_agg_element = {}
if 'aggregation_query_element' in rule:
metric_agg_element = rule['aggregation_query_element']

bucket_interval_period = rule.get('bucket_interval_period')
if bucket_interval_period is not None:
@@ -297,6 +299,8 @@ def get_aggregation_query(self, query, rule, query_key, terms_size, timestamp_field):
aggs_element['interval_aggs']['date_histogram']['offset'] = '+%ss' % (rule['bucket_offset_delta'])
else:
aggs_element = metric_agg_element

elastalert_logger.info("Aggregation keys: %s",query_key)

if query_key is not None:
for idx, key in reversed(list(enumerate(query_key.split(',')))):
@@ -369,6 +373,8 @@ def process_hits(rule, hits):
hit['_source'][rule['aggregation_key']] = ', '.join([str(value) for value in values])

processed_hits.append(hit['_source'])
print("len(processed_hits)")
print(len(processed_hits))

return processed_hits

@@ -605,14 +611,17 @@ def get_hits_aggregation(self, rule, starttime, endtime, index, query_key, term_size=None):
sort=False,
to_ts_func=rule['dt_to_ts'],
)
elastalert_logger.info("FORMED base query %s",base_query)
if term_size is None:
term_size = rule.get('terms_size', 50)
query = self.get_aggregation_query(base_query, rule, query_key, term_size, rule['timestamp_field'])
elastalert_logger.info("FORMED aggregated query %s",query)
request = get_msearch_query(query,rule)
try:
#using backwards compatibile msearch
res = self.thread_data.current_es.msearch(body=request)
res = res['responses'][0]
elastalert_logger.info("Aggregated response: %s",res)
except ElasticsearchException as e:
if len(str(e)) > 1024:
e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
@@ -694,6 +703,7 @@ def remove_duplicate_events(self, data, rule):
new_events = []
for event in data:
if event['_id'] in rule['processed_hits']:
print("Removing duplicate hit")
continue

# Remember the new data's IDs
@@ -1494,7 +1504,40 @@ def send_alert(self, matches, rule, alert_time=None, retried=False):
end = ts_to_dt(lookup_es_key(match, rule['timestamp_field'])) + datetime.timedelta(minutes=10)
keys = rule.get('top_count_keys')
counts = self.get_top_counts(rule, start, end, keys, qk=qk)
elastalert_logger.info("top_count_keys: %s",counts)
elastalert_logger.info("Match: %s",match)
match.update(counts)
elastalert_logger.info("Match: %s",match)
elif rule.get('aggregation_keys'):
elastalert_logger.info("GOT INTO aggregation_keys")
for match in matches:
if 'query_key' in rule:
qk = lookup_es_key(match, rule['query_key'])
else:
qk = None

if isinstance(rule['type'], FlatlineRule):
# flatline rule triggers when there have been no events from now()-timeframe to now(),
# so using now()-timeframe will return no results. for now we can just multiply the timeframe
# by 2, but this could probably be timeframe+run_every to prevent too large of a lookup?
timeframe = datetime.timedelta(seconds=2 * rule.get('timeframe').total_seconds())
else:
timeframe = rule.get('timeframe', datetime.timedelta(minutes=10))

start = ts_to_dt(lookup_es_key(match, rule['timestamp_field'])) - timeframe
end = ts_to_dt(lookup_es_key(match, rule['timestamp_field'])) + datetime.timedelta(minutes=10)
keys = rule.get('aggregation_keys')
aggregated_data = self.get_hits_aggregation(rule, start, end, self.get_index(rule, start, end), keys)
if not aggregated_data:
continue
aggregated_data = next(iter(aggregated_data.values()))['bucket_aggs']['buckets']
elastalert_logger.info("aggregated_data: %s", aggregated_data)
flatten_data = self.flatten_aggregated_data(aggregated_data, [])
elastalert_logger.info("Flattened aggregated data: %s", flatten_data)
match_aggregated_data = {"aggregated_data": flatten_data}
match.update(match_aggregated_data)
elastalert_logger.info("Match: %s", match)


if rule.get('generate_kibana_discover_url'):
kb_link = generate_kibana_discover_url(rule, matches[0])
@@ -1982,6 +2025,53 @@ def get_top_counts(self, rule, starttime, endtime, keys, number=None, qk=None):
for key in keys:
index = self.get_index(rule, starttime, endtime)

hits_terms = self.get_hits_terms(rule, starttime, endtime, index, key, qk, number)
elastalert_logger.info("hits_terms: %s",hits_terms)
if hits_terms is None or not hits_terms:
top_events_count = {}
else:
buckets = list(hits_terms.values())[0]

# get_hits_terms adds to num_hits, but we don't want to count these
self.thread_data.num_hits -= len(buckets)
terms = {}
for bucket in buckets:
terms[bucket['key']] = bucket['doc_count']
counts = list(terms.items())
counts.sort(key=lambda x: x[1], reverse=True)
top_events_count = dict(counts[:number])

# Save a dict with the top 5 events by key
all_counts['top_events_%s' % (key)] = top_events_count
elastalert_logger.info("returning all_count %s",all_counts)

return all_counts

def flatten_aggregated_data(self, buckets, records, key=None):
for bucket in buckets:
elastalert_logger.debug("handling bucket: %s", bucket)
if key is None:
nestedkey = (str(bucket['key']),)
else:
nestedkey = key + (str(bucket['key']),)
if 'bucket_aggs' in bucket:
records = self.flatten_aggregated_data(bucket['bucket_aggs']['buckets'], records, nestedkey)
else:
record = nestedkey + (str(bucket['doc_count']),)
elastalert_logger.debug("record: %s", record)
records.append(record)
return records


def get_aggregated_data(self, rule, starttime, endtime, keys, number=None, qk=None):
""" Counts the number of events for each unique value for each key field.
Returns a dictionary with top_events_<key> mapped to the top 5 counts for each key. """
all_counts = {}
if not number:
number = rule.get('aggregated_count_number', 10)
for key in keys:
index = self.get_index(rule, starttime, endtime)

hits_terms = self.get_hits_terms(rule, starttime, endtime, index, key, qk, number)
if hits_terms is None or not hits_terms:
top_events_count = {}
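As a rough illustration (not part of this diff) of how flatten_aggregated_data walks the nested term buckets that get_aggregation_query builds for comma-separated aggregation keys, a standalone sketch with made-up bucket values:

    # Standalone copy of the flattening logic for illustration; the bucket data is hypothetical.
    def flatten_aggregated_data(buckets, records, key=None):
        for bucket in buckets:
            nestedkey = (str(bucket['key']),) if key is None else key + (str(bucket['key']),)
            if 'bucket_aggs' in bucket:
                records = flatten_aggregated_data(bucket['bucket_aggs']['buckets'], records, nestedkey)
            else:
                records.append(nestedkey + (str(bucket['doc_count']),))
        return records

    buckets = [
        {"key": "10.0.0.1", "bucket_aggs": {"buckets": [
            {"key": 443, "doc_count": 7},
            {"key": 80, "doc_count": 2},
        ]}},
        {"key": "10.0.0.2", "bucket_aggs": {"buckets": [
            {"key": 22, "doc_count": 3},
        ]}},
    ]

    print(flatten_aggregated_data(buckets, []))
    # [('10.0.0.1', '443', '7'), ('10.0.0.1', '80', '2'), ('10.0.0.2', '22', '3')]

Each leaf bucket becomes one tuple of key values plus the doc_count, which is what the alert-side rendering in alerts.py consumes.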
19 changes: 18 additions & 1 deletion elastalert/loaders.py
@@ -68,6 +68,8 @@ def load_rule_schema():
schema_path = os.path.join(os.path.dirname(__file__), 'schema.yaml')
with open(schema_path) as schema_file:
schema_yml = yaml.load(schema_file, Loader=yaml.FullLoader)
print("SCHEMA YAML")
print(schema_yml)
return jsonschema.Draft7Validator(schema_yml)


@@ -166,7 +168,11 @@ def load(self, conf, args=None):
rule_files = self.get_names(conf, use_rule)
for rule_file in rule_files:
try:


rule = self.load_configuration(rule_file, conf, args)
print("RULE")
print(rule)
# A rule failed to load, don't try to process it
if not rule:
elastalert_logger.error('Invalid rule file skipped: %s' % rule_file)
@@ -399,7 +405,8 @@ def _dt_to_ts_with_format(dt):
# Make sure we have required options
if self.required_locals - frozenset(list(rule.keys())):
raise EAException('Missing required option(s): %s' % (', '.join(self.required_locals - frozenset(list(rule.keys())))))

print("RULE : ")
print(rule)
if 'include' in rule and type(rule['include']) != list:
raise EAException('include option must be a list')

@@ -418,6 +425,16 @@
else:
del(rule['query_key'])

raw_aggregation_keys = rule.get('aggregation_keys')
if isinstance(raw_aggregation_keys, list):
if len(raw_aggregation_keys) > 1:
rule['compound_aggregation_keys'] = raw_aggregation_keys
rule['aggregation_keys'] = ','.join(raw_aggregation_keys)
elif len(raw_aggregation_keys) == 1:
rule['aggregation_keys'] = raw_aggregation_keys[0]
else:
del(rule['aggregation_keys'])

if isinstance(rule.get('aggregation_key'), list):
rule['compound_aggregation_key'] = rule['aggregation_key']
rule['aggregation_key'] = ','.join(rule['aggregation_key'])
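For illustration (not part of this diff), the new aggregation_keys normalization mirrors the existing query_key handling above. A standalone sketch of how the three list shapes are treated; the rule fragments and field names are hypothetical:

    # Standalone sketch of the normalization logic above; rule fragments are made up.
    def normalize_aggregation_keys(rule):
        raw = rule.get('aggregation_keys')
        if isinstance(raw, list):
            if len(raw) > 1:
                rule['compound_aggregation_keys'] = raw
                rule['aggregation_keys'] = ','.join(raw)
            elif len(raw) == 1:
                rule['aggregation_keys'] = raw[0]
            else:
                del rule['aggregation_keys']
        return rule

    print(normalize_aggregation_keys({'aggregation_keys': ['src_ip', 'dest_port']}))
    # {'aggregation_keys': 'src_ip,dest_port', 'compound_aggregation_keys': ['src_ip', 'dest_port']}
    print(normalize_aggregation_keys({'aggregation_keys': ['src_ip']}))
    # {'aggregation_keys': 'src_ip'}
    print(normalize_aggregation_keys({'aggregation_keys': []}))
    # {}

The comma-joined string is what get_aggregation_query later splits to build the nested terms aggregations.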