Clickhouse changes #1
base: alertmanager_alerter
Changes from 5 commits
@@ -10,11 +10,14 @@
 import time
 import timeit
 import traceback
+import requests
 from email.mime.text import MIMEText
 from smtplib import SMTP
 from smtplib import SMTPException
 from socket import error

 import dateutil.tz
 import kibana
 import yaml
@@ -28,6 +31,7 @@ | |
from elasticsearch.exceptions import TransportError | ||
from enhancements import DropMatchException | ||
from ruletypes import FlatlineRule | ||
from ruletypes import ErrorRateRule | ||
from util import add_raw_postfix | ||
from util import cronite_datetime_to_timestamp | ||
from util import dt_to_ts | ||
|
@@ -156,6 +160,8 @@ def __init__(self, args):
         self.writeback_es = elasticsearch_client(self.conf)
         self._es_version = None

+        self.query_endpoint = self.conf['query_endpoint']
+
         remove = []
         for rule in self.rules:
             if not self.init_rule(rule):
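Note that `self.conf['query_endpoint']` raises a bare `KeyError` at startup when the key is missing from the global config. A more explicit variant would fail with a readable message (a sketch, not part of the diff; `EAException` is assumed to be ElastAlert's usual error type, imported from `util`):

```python
# Hypothetical: fail fast with a readable message instead of a bare KeyError.
self.query_endpoint = self.conf.get('query_endpoint')
if not self.query_endpoint:
    raise EAException('query_endpoint must be set in the config for ClickHouse-backed rules')
```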
@@ -534,43 +540,57 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=None):
         return {endtime: buckets}

     def get_hits_aggregation(self, rule, starttime, endtime, index, query_key, term_size=None):
-        rule_filter = copy.copy(rule['filter'])
-        base_query = self.get_query(
-            rule_filter,
-            starttime,
-            endtime,
-            timestamp_field=rule['timestamp_field'],
-            sort=False,
-            to_ts_func=rule['dt_to_ts'],
-            five=rule['five']
-        )
-        if term_size is None:
-            term_size = rule.get('terms_size', 50)
-        query = self.get_aggregation_query(base_query, rule, query_key, term_size, rule['timestamp_field'])
+        agg_key = rule['metric_agg_type'] + "(" + rule['metric_agg_key'] + ")"
Review comment: Is there a better way to do string interpolation and concatenation in Python? In Python 3, we can do it like this: […]
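The reviewer's example was not captured above; for illustration only, the concatenation could be written with `str.format`, which works in both Python 2 and 3, or with an f-string on Python 3.6+:

```python
# Two equivalent ways to build the aggregation key, e.g. "avg(duration)":
agg_key = '{}({})'.format(rule['metric_agg_type'], rule['metric_agg_key'])  # Python 2 and 3
agg_key = f"{rule['metric_agg_type']}({rule['metric_agg_key']})"            # Python 3.6+ only
```

Since this fork still targets Python 2 (see `iteritems` in `ErrorRateRule` below), the `str.format` form is the one that would actually run here.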
+        query = rule['filter'][0]['query_string']['query']
Review comment: What if there is no filter? Will the config validation itself throw that error, or will it get stuck here?
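To the reviewer's point: `rule['filter'][0]['query_string']['query']` raises an `IndexError` or `KeyError` if the rule has no filter, or a filter of another type, unless schema validation catches it first. A defensive sketch (hypothetical, not part of the diff):

```python
# Hypothetical guard: avoid an IndexError/KeyError when a rule has no
# filter, or a filter that is not a query_string filter.
filters = rule.get('filter') or []
if not filters or 'query_string' not in filters[0]:
    self.handle_error('Rule %s has no query_string filter' % rule['name'], {'rule': rule['name']})
    return {}
query = filters[0]['query_string']['query']
```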
+        data, count = self.get_ch_data(rule, starttime, endtime, agg_key, query)
+
+        if data is None:
+            return {}
+        payload = {rule['metric_agg_key'] + "_" + rule['metric_agg_type']: {'value': data}}
+
+        self.num_hits += count
+        return {endtime: payload}
+
+    def get_error_rate(self, rule, starttime, endtime):
+        agg_key = rule['metric_agg_type'] + "()"
+        query = rule['filter'][0]['query_string']['query']
+        total_data, total_count = self.get_ch_data(rule, starttime, endtime, agg_key, query)
+
+        if total_data is None:
+            return {}
+
+        query = query + " AND " + rule['error_condition']
+        error_data, error_count = self.get_ch_data(rule, starttime, endtime, agg_key, query)
+
+        if error_data is None:
+            return {}
+
+        payload = {'error_count': error_data, 'total_count': total_data}
+        self.num_hits += int(total_count)
Review comment: Why is num_hits required?
Reply: num_hits is mainly used when looping over very large datasets in batches using scroll in ES. It also appears in the alert. Updating this to follow the convention used across the code.
+
+        return {endtime: payload}
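To make the two-query flow concrete, here is how the error query is derived from the base query. The values below are hypothetical; the real ones come from the rule's configuration:

```python
# Hypothetical values, for illustration only:
query = 'service:checkout'               # rule['filter'][0]['query_string']['query']
error_condition = 'status_code >= 500'   # rule['error_condition']
error_query = query + " AND " + error_condition
# -> 'service:checkout AND status_code >= 500'
```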
+
+    def get_ch_data(self, rule, starttime, endtime, agg_key, freshquery):
+        data = {
+            "selects": [],
+            "start_time": dt_to_ts(starttime),
+            "end_time": dt_to_ts(endtime),
+            "freshquery": freshquery,
+            "group_bys": [],
+            "sort_orders": [{"sort_by": agg_key, "sort_direction": "desc"}],
+            "limit": "500",
+            "aggregations": [{"function": rule['metric_agg_type'].upper(), "field": rule['metric_agg_key']}]
+        }
         try:
-            if not rule['five']:
-                res = self.current_es.search(
-                    index=index,
-                    doc_type=rule.get('doc_type'),
-                    body=query,
-                    search_type='count',
-                    ignore_unavailable=True
-                )
-            else:
-                res = self.current_es.search(index=index, doc_type=rule.get('doc_type'), body=query, size=0, ignore_unavailable=True)
-        except ElasticsearchException as e:
+            res = requests.post(self.query_endpoint, json=data)
+            res.raise_for_status()
+        except requests.exceptions.RequestException as e:
             if len(str(e)) > 1024:
                 e = str(e)[:1024] + '... (%d characters removed)' % (len(str(e)) - 1024)
             self.handle_error('Error running query: %s' % (e), {'rule': rule['name']})
-            return None
-        if 'aggregations' not in res:
-            return {}
-        if not rule['five']:
-            payload = res['aggregations']['filtered']
-        else:
-            payload = res['aggregations']
-        self.num_hits += res['hits']['total']
-        return {endtime: payload}
+            return None, 0
+        res = json.loads(res.content)
+        return res['data'][0][agg_key], res['rows']
Review comment: What would be in these two values?
Reply: The aggregation value, and the number of rows that satisfied the filter criteria.
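Based on the reply above, the query endpoint's JSON response presumably has at least the following shape. This is inferred from how the code indexes into it, not from a documented API contract:

```python
# Inferred (hypothetical) response shape, given agg_key = "count(1)":
res = {
    "data": [{"count(1)": 42.0}],  # res['data'][0][agg_key] -> the aggregation value
    "rows": 1337,                  # res['rows'] -> rows matching the filter criteria
}
```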

     def remove_duplicate_events(self, data, rule):
         new_events = []
@@ -616,6 +636,8 @@ def run_query(self, rule, start=None, end=None, scroll=False):
             data = self.get_hits_count(rule, start, end, index)
         elif rule.get('use_terms_query'):
             data = self.get_hits_terms(rule, start, end, index, rule['query_key'])
+        elif isinstance(rule_inst, ErrorRateRule):
+            data = self.get_error_rate(rule, start, end)
         elif rule.get('aggregation_query_element'):
             data = self.get_hits_aggregation(rule, start, end, index, rule.get('query_key', None))
         else:
@@ -633,6 +655,8 @@ def run_query(self, rule, start=None, end=None, scroll=False):
             rule_inst.add_count_data(data)
         elif rule.get('use_terms_query'):
             rule_inst.add_terms_data(data)
+        elif isinstance(rule_inst, ErrorRateRule):
+            rule_inst.calculate_err_rate(data)
         elif rule.get('aggregation_query_element'):
             rule_inst.add_aggregation_data(data)
         else:
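Tying the two hunks together: for an `ErrorRateRule`, the `data` produced by `get_error_rate` and consumed by `calculate_err_rate` is a dict keyed by the query window's end time. A sketch with made-up numbers:

```python
# Made-up numbers; the shape matches get_error_rate's return value.
data = {
    endtime: {
        'error_count': 12,    # rows matching filter AND error_condition
        'total_count': 4800,  # rows matching the filter alone
    }
}
rule_inst.calculate_err_rate(data)
```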
@@ -1019,6 +1019,30 @@ def unwrap_term_buckets(self, timestamp, term_buckets):
     def check_matches(self, timestamp, query_key, aggregation_data):
         raise NotImplementedError()

+
+class ErrorRateRule(BaseAggregationRule):
+    """ A rule that determines error rate with sampling rate """
+    required_options = frozenset(['sampling', 'threshold'])
+
+    def __init__(self, *args):
+        super(ErrorRateRule, self).__init__(*args)
+        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
+        self.rules['metric_agg_key'] = "1"
Review comment: Is this a default? Shall we add a comment here noting that it is the default?
+        self.rules['metric_agg_type'] = "count"
Review comment: Don't we need a cardinality query when finding total traces, and just a count query for error traces? So metric_agg_type should be cardinality, but it is going out as count by default.
+        self.metric_key = self.rules['metric_agg_key'] + '_' + self.rules['metric_agg_type']
+
+    def get_match_str(self, match):
+        message = 'Threshold violation, error rate is %s' % (match['error_rate'])
+        return message
+
+    def calculate_err_rate(self, payload):
+        for timestamp, payload_data in payload.iteritems():
+            if int(payload_data['total_count']) > 0:
+                rate = float(payload_data['error_count']) / float(payload_data['total_count'])
+                rate = float(rate * 100) / float(self.rules['sampling'])
+                rate = rate * 100
+                if 'threshold' in self.rules and rate > self.rules['threshold']:
+                    match = {self.rules['timestamp_field']: timestamp, 'error_rate': rate}
+                    self.add_match(match)
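A worked example of the arithmetic above, with made-up numbers and assuming `sampling = 100` means 1-in-100 sampling: with `error_count = 12` and `total_count = 4800`,

```python
rate = 12.0 / 4800.0        # 0.0025, raw error fraction
rate = rate * 100 / 100.0   # 0.0025, scaled by 100 / sampling
rate = rate * 100           # 0.25, the final error_rate compared against threshold
```

so a `threshold` of 0.2 would fire a match here.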
+
+
 class MetricAggregationRule(BaseAggregationRule):
     """ A rule that matches when there is a low number of events given a timeframe. """
Review comment: You are changing a common method that is already in use, so I think it will affect the existing alerts — can you check? Also, is this required just for the response_time alert?
Reply: In the traces use case it will be used only for the response-time average alert. As we already discussed, we will host ElastAlert for traces separately from logs, so this should not be an issue.
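For completeness, a minimal `ErrorRateRule` configuration might look like the sketch below, written as the `self.rules` dict the class reads from. Only `sampling` and `threshold` are confirmed required by `required_options`; every other key and value here is a hypothetical illustration based on the diff:

```python
rules = {
    'name': 'checkout-error-rate',                                # hypothetical
    'timestamp_field': '@timestamp',                              # class default
    'filter': [{'query_string': {'query': 'service:checkout'}}],  # hypothetical
    'error_condition': 'status_code >= 500',                      # appended via " AND "
    'sampling': 100,   # required by required_options
    'threshold': 5,    # required by required_options; match when rate > threshold
}
```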