diff --git a/elastalert/config.py b/elastalert/config.py index d1c736306..0fcfc4934 100644 --- a/elastalert/config.py +++ b/elastalert/config.py @@ -58,6 +58,7 @@ 'cardinality': ruletypes.CardinalityRule, 'metric_aggregation': ruletypes.MetricAggregationRule, 'percentage_match': ruletypes.PercentageMatchRule, + 'error_rate': ruletypes.ErrorRateRule } # Used to map names of alerts to their classes diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py index f9047d31d..667fbfd38 100755 --- a/elastalert/elastalert.py +++ b/elastalert/elastalert.py @@ -10,11 +10,14 @@ import time import timeit import traceback +import requests from email.mime.text import MIMEText from smtplib import SMTP from smtplib import SMTPException from socket import error + + import dateutil.tz import kibana import yaml @@ -28,9 +31,11 @@ from elasticsearch.exceptions import TransportError from enhancements import DropMatchException from ruletypes import FlatlineRule +from ruletypes import ErrorRateRule from util import add_raw_postfix from util import cronite_datetime_to_timestamp from util import dt_to_ts +from util import dt_to_ts_with_format from util import dt_to_unix from util import EAException from util import elastalert_logger @@ -156,6 +161,8 @@ def __init__(self, args): self.writeback_es = elasticsearch_client(self.conf) self._es_version = None + self.query_endpoint = self.conf['query_endpoint'] + remove = [] for rule in self.rules: if not self.init_rule(rule): @@ -534,43 +541,77 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=Non return {endtime: buckets} def get_hits_aggregation(self, rule, starttime, endtime, index, query_key, term_size=None): - rule_filter = copy.copy(rule['filter']) - base_query = self.get_query( - rule_filter, - starttime, - endtime, - timestamp_field=rule['timestamp_field'], - sort=False, - to_ts_func=rule['dt_to_ts'], - five=rule['five'] - ) - if term_size is None: - term_size = rule.get('terms_size', 50) - query = self.get_aggregation_query(base_query, rule, query_key, term_size, rule['timestamp_field']) + agg_key = '{}({})'.format(rule['metric_agg_type'],rule['metric_agg_key']) + query = self.get_query_string(rule) + aggregation = {"function": rule['metric_agg_type'].upper(), "field": rule['metric_agg_key']} + data, count = self.get_ch_data(rule, starttime, endtime, agg_key, query, aggregation) + + if data is None: + return {} + payload = {rule['metric_agg_key']+"_"+rule['metric_agg_type']: {'value': data}} + + self.num_hits += count + return {endtime: payload} + + def get_error_rate(self, rule, starttime, endtime): + elastalert_logger.info("query start time and endtime %s at %s" % (starttime, endtime)) + agg_key = '{}({})'.format(rule['total_agg_type'],rule['total_agg_key']) + query = self.get_query_string(rule) + aggregation = {"function": rule['total_agg_type'].upper(), "field": rule['total_agg_key']} + + total_data, total_count = self.get_ch_data(rule, starttime, endtime, agg_key, query, aggregation) + + elastalert_logger.info("total data is %s" % (total_data)) + if total_data is None: + return {} + + agg_key = "count()" + if(query): + query = '{} AND {}'.format(query,rule['error_condition']) + else: + query = rule['error_condition'] + + aggregation = {"function": "COUNT", "field": "1"} + + error_data, error_count = self.get_ch_data(rule, starttime, endtime, agg_key, query, aggregation) + + elastalert_logger.info("error data is %s" % (error_data)) + if error_data is None: + return {} + + payload = {'error_count': error_data, 'total_count': total_data, 'start_time': starttime, 'end_time': endtime} + self.num_hits += int(total_count) + + return {endtime: payload} + + def get_query_string(self, rule): + if rule['filter'] and ('query_string' in rule['filter'][0]) and ('query' in rule['filter'][0]['query_string']): + return rule['filter'][0]['query_string']['query'] + return "" + + def get_ch_data(self, rule, starttime, endtime, agg_key, freshquery,aggregation): + elastalert_logger.info("query start timestamp and end timestamp %s at %s" % (dt_to_ts(starttime), dt_to_ts(endtime))) + data = { + "selects":[], + "start_time":dt_to_ts_with_format(starttime,"%Y-%m-%dT%H:%M:%S.%f")[:-3]+'Z', + "end_time":dt_to_ts_with_format(endtime,"%Y-%m-%dT%H:%M:%S.%f")[:-3]+'Z', + "freshquery": freshquery, + "group_bys":[], + "sort_orders":[{"sort_by": agg_key,"sort_direction":"desc"}], + "limit":"500", + "aggregations":[aggregation] + } try: - if not rule['five']: - res = self.current_es.search( - index=index, - doc_type=rule.get('doc_type'), - body=query, - search_type='count', - ignore_unavailable=True - ) - else: - res = self.current_es.search(index=index, doc_type=rule.get('doc_type'), body=query, size=0, ignore_unavailable=True) - except ElasticsearchException as e: + elastalert_logger.info("request data is %s" % json.dumps(data)) + res = requests.post(self.query_endpoint, json=data) + res.raise_for_status() + except requests.exceptions.RequestException as e: if len(str(e)) > 1024: e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024) self.handle_error('Error running query: %s' % (e), {'rule': rule['name']}) - return None - if 'aggregations' not in res: - return {} - if not rule['five']: - payload = res['aggregations']['filtered'] - else: - payload = res['aggregations'] - self.num_hits += res['hits']['total'] - return {endtime: payload} + return None,0 + res = json.loads(res.content) + return res['data'][0][agg_key], res['rows'] def remove_duplicate_events(self, data, rule): new_events = [] @@ -616,6 +657,8 @@ def run_query(self, rule, start=None, end=None, scroll=False): data = self.get_hits_count(rule, start, end, index) elif rule.get('use_terms_query'): data = self.get_hits_terms(rule, start, end, index, rule['query_key']) + elif isinstance(rule_inst, ErrorRateRule): + data = self.get_error_rate(rule, start, end) elif rule.get('aggregation_query_element'): data = self.get_hits_aggregation(rule, start, end, index, rule.get('query_key', None)) else: @@ -633,6 +676,8 @@ def run_query(self, rule, start=None, end=None, scroll=False): rule_inst.add_count_data(data) elif rule.get('use_terms_query'): rule_inst.add_terms_data(data) + elif isinstance(rule_inst, ErrorRateRule): + rule_inst.calculate_err_rate(data) elif rule.get('aggregation_query_element'): rule_inst.add_aggregation_data(data) else: diff --git a/elastalert/ruletypes.py b/elastalert/ruletypes.py index d3511a582..f7c609ae4 100644 --- a/elastalert/ruletypes.py +++ b/elastalert/ruletypes.py @@ -1019,6 +1019,35 @@ def unwrap_term_buckets(self, timestamp, term_buckets): def check_matches(self, timestamp, query_key, aggregation_data): raise NotImplementedError() +class ErrorRateRule(BaseAggregationRule): + """ A rule that determines error rate with sampling rate""" + required_options = frozenset(['sampling', 'threshold', 'unique_column']) + def __init__(self, *args): + super(ErrorRateRule, self).__init__(*args) + self.ts_field = self.rules.get('timestamp_field', '@timestamp') + self.rules['total_agg_key'] = self.rules['unique_column'] + # hardcoding uniq aggregation for total count + self.rules['total_agg_type'] = "uniq" + + self.rules['aggregation_query_element'] = self.generate_aggregation_query() + + def get_match_str(self, match): + message = 'Threshold violation, error rate is %s' % (match['error_rate']) + return message + + def generate_aggregation_query(self): + return {"function": self.rules['total_agg_type'].upper(), "field": self.rules['total_agg_key']} + + def calculate_err_rate(self,payload): + for timestamp, payload_data in payload.iteritems(): + if int(payload_data['total_count']) > 0: + rate = float(payload_data['error_count'])/float(payload_data['total_count']) + rate = float(rate)/float(self.rules['sampling']) + rate = rate*100 + if 'threshold' in self.rules and rate > self.rules['threshold']: + match = {self.rules['timestamp_field']: timestamp, 'error_rate': rate, 'from': payload_data['start_time'], 'to': payload_data['end_time']} + self.add_match(match) + class MetricAggregationRule(BaseAggregationRule): """ A rule that matches when there is a low number of events given a timeframe. """ diff --git a/elastalert/schema.yaml b/elastalert/schema.yaml index c3d4509e9..6555d274f 100644 --- a/elastalert/schema.yaml +++ b/elastalert/schema.yaml @@ -129,6 +129,14 @@ oneOf: # custom rules include a period in the rule type type: {pattern: "[.]"} + - title: Error Rate + required: [sampling, threshold] + properties: + sampling: {type: integer} + threshold: {type: number} + error_condition: {type: string} + unique_column: {type: string} + properties: # Common Settings