From 4d6ab3615f6028d233776642afae0b8132cac1d0 Mon Sep 17 00:00:00 2001 From: pegasusljn Date: Mon, 20 Jan 2025 11:51:22 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E8=A1=A5=E5=85=85es=E7=B4=A2=E5=BC=95?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E4=BC=98=E5=8C=96=E7=9A=84=E6=B3=A8=E9=87=8A?= =?UTF-8?q?=20--story=3D121725248?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bkmonitor/apm/utils/es_search.py | 33 ++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/bkmonitor/apm/utils/es_search.py b/bkmonitor/apm/utils/es_search.py index 53612d510..cd75ef3cd 100644 --- a/bkmonitor/apm/utils/es_search.py +++ b/bkmonitor/apm/utils/es_search.py @@ -86,6 +86,10 @@ def _scan( class EsQueryProxy(QueryProxy): + """ + Es查询代理, 用于注入query条件对查询索引的优化操作 + """ + def __call__(self, *args, **kwargs): filters = kwargs.get("filter", []) if filters: @@ -106,6 +110,10 @@ def __call__(self, *args, **kwargs): class EsSearch(Search): + """ + 重写es的search类,增加es查询索引优化 + """ + query = ProxyDescriptor("es_query") post_filter = ProxyDescriptor("es_post_filter") @@ -115,6 +123,8 @@ def __init__(self, **kwargs): self._es_post_filter_proxy = EsQueryProxy(self, "post_filter") def fix_index(self, indices: List[str]): + """调整查询的目标索引列表""" + if indices: self._index = indices @@ -156,6 +166,8 @@ def limits(calls, period): class QueryIndexOptimizer(object): + """es查询索引优化类""" + def __init__( self, indices: list, @@ -180,7 +192,8 @@ def index(self): return [self._index] if self._index else None def index_filter(self, indices, start_time: arrow.Arrow, end_time: arrow.Arrow, time_zone: str) -> List[str]: - # BkData索引集优化 + """根据时间query时间对索引进行过滤, 返回经过过滤后的索引列表""" + indices_list = set() for indices_str in indices: for _index in indices_str.split(","): @@ -190,52 +203,67 @@ def index_filter(self, indices, start_time: arrow.Arrow, end_time: arrow.Arrow, return final_index_list def index_time_filters(self, date_start: arrow.Arrow, date_end: arrow.Arrow, time_zone: str): + """获取时间过滤器列表""" + now = arrow.now(time_zone) date_start = date_start.to(time_zone) date_end = date_end.to(time_zone) if date_end else now if date_end > now: date_end = now + # 根据输入query的事件参数,生成能覆盖查询条件的"day"级别日期类表 date_day_list: List[Any] = list( rrule(DAILY, interval=1, dtstart=date_start.floor("day").datetime, until=date_end.ceil("day").datetime) ) + # 根据输入query的事件参数,生成能覆盖查询条件的"month"级别日期类表 date_month_list: List[Any] = list( rrule( MONTHLY, interval=1, dtstart=date_start.floor("month").datetime, until=date_end.ceil("month").datetime ) ) + # 根据日期类表和查询条件,生成索引的时间过滤器列表 return self._generate_filter_list(date_day_list, date_month_list) @classmethod def _generate_filter_list(cls, date_day_list, date_month_list): + """根据日期类表和查询条件,生成索引的时间过滤器列表""" + date_filter_template = r"^.*_bkapm_trace_.+_{}_\d+$" month_filter_template = r"^.*_bkapm_trace_.+_{}.*_\d+$" + # 用于索引过滤的正则pattern列表 filter_mapping = {} + # 一天范围内的查询 if len(date_day_list) == 1: for x in date_day_list: pattern = date_filter_template.format(x.strftime("%Y%m%d")) if pattern not in filter_mapping: filter_mapping[pattern] = re.compile(pattern) + # 一个月范围内的多日查询 elif len(date_day_list) > 1 and len(date_month_list) == 1: + # 14天以上的查询 if len(date_day_list) > 14: for x in date_month_list: pattern = month_filter_template.format(x.strftime("%Y%m")) if pattern not in filter_mapping: filter_mapping[pattern] = re.compile(pattern) + # 2-14天的查询 else: for x in date_day_list: pattern = date_filter_template.format(x.strftime("%Y%m%d")) if pattern not in filter_mapping: filter_mapping[pattern] = re.compile(pattern) + # 跨月份的多日查询 elif len(date_day_list) > 1 and len(date_month_list) > 1: + # 6个月内的查询 if len(date_month_list) <= 6: for x in date_month_list: pattern = month_filter_template.format(x.strftime("%Y%m")) if pattern not in filter_mapping: filter_mapping[pattern] = re.compile(pattern) + # 6个月以上的查询 else: for x in date_month_list[-6::1]: pattern = month_filter_template.format(x.strftime("%Y%m")) @@ -245,7 +273,8 @@ def _generate_filter_list(cls, date_day_list, date_month_list): @classmethod def check_index_date(cls, index, index_filters): - # 从索引名称中提取日期部分 + # 检查索引是否匹配时间过滤器 + for filter_pattern in index_filters: if filter_pattern.match(index): return True