Skip to content

Commit

Permalink
chore: 补充es索引查询优化的注释 --story=121725248
Browse files Browse the repository at this point in the history
  • Loading branch information
pegasusljn committed Jan 20, 2025
1 parent 0b41691 commit 4d6ab36
Showing 1 changed file with 31 additions and 2 deletions.
33 changes: 31 additions & 2 deletions bkmonitor/apm/utils/es_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def _scan(


class EsQueryProxy(QueryProxy):
"""
Es查询代理, 用于注入query条件对查询索引的优化操作
"""

def __call__(self, *args, **kwargs):
filters = kwargs.get("filter", [])
if filters:
Expand All @@ -106,6 +110,10 @@ def __call__(self, *args, **kwargs):


class EsSearch(Search):
"""
重写es的search类,增加es查询索引优化
"""

query = ProxyDescriptor("es_query")
post_filter = ProxyDescriptor("es_post_filter")

Expand All @@ -115,6 +123,8 @@ def __init__(self, **kwargs):
self._es_post_filter_proxy = EsQueryProxy(self, "post_filter")

def fix_index(self, indices: List[str]):
"""调整查询的目标索引列表"""

if indices:
self._index = indices

Expand Down Expand Up @@ -156,6 +166,8 @@ def limits(calls, period):


class QueryIndexOptimizer(object):
"""es查询索引优化类"""

def __init__(
self,
indices: list,
Expand All @@ -180,7 +192,8 @@ def index(self):
return [self._index] if self._index else None

def index_filter(self, indices, start_time: arrow.Arrow, end_time: arrow.Arrow, time_zone: str) -> List[str]:
# BkData索引集优化
"""根据时间query时间对索引进行过滤, 返回经过过滤后的索引列表"""

indices_list = set()
for indices_str in indices:
for _index in indices_str.split(","):
Expand All @@ -190,52 +203,67 @@ def index_filter(self, indices, start_time: arrow.Arrow, end_time: arrow.Arrow,
return final_index_list

def index_time_filters(self, date_start: arrow.Arrow, date_end: arrow.Arrow, time_zone: str):
"""获取时间过滤器列表"""

now = arrow.now(time_zone)
date_start = date_start.to(time_zone)
date_end = date_end.to(time_zone) if date_end else now
if date_end > now:
date_end = now

# 根据输入query的事件参数,生成能覆盖查询条件的"day"级别日期类表
date_day_list: List[Any] = list(
rrule(DAILY, interval=1, dtstart=date_start.floor("day").datetime, until=date_end.ceil("day").datetime)
)

# 根据输入query的事件参数,生成能覆盖查询条件的"month"级别日期类表
date_month_list: List[Any] = list(
rrule(
MONTHLY, interval=1, dtstart=date_start.floor("month").datetime, until=date_end.ceil("month").datetime
)
)

# 根据日期类表和查询条件,生成索引的时间过滤器列表
return self._generate_filter_list(date_day_list, date_month_list)

@classmethod
def _generate_filter_list(cls, date_day_list, date_month_list):
"""根据日期类表和查询条件,生成索引的时间过滤器列表"""

date_filter_template = r"^.*_bkapm_trace_.+_{}_\d+$"
month_filter_template = r"^.*_bkapm_trace_.+_{}.*_\d+$"

# 用于索引过滤的正则pattern列表
filter_mapping = {}
# 一天范围内的查询
if len(date_day_list) == 1:
for x in date_day_list:
pattern = date_filter_template.format(x.strftime("%Y%m%d"))
if pattern not in filter_mapping:
filter_mapping[pattern] = re.compile(pattern)
# 一个月范围内的多日查询
elif len(date_day_list) > 1 and len(date_month_list) == 1:
# 14天以上的查询
if len(date_day_list) > 14:
for x in date_month_list:
pattern = month_filter_template.format(x.strftime("%Y%m"))
if pattern not in filter_mapping:
filter_mapping[pattern] = re.compile(pattern)
# 2-14天的查询
else:
for x in date_day_list:
pattern = date_filter_template.format(x.strftime("%Y%m%d"))
if pattern not in filter_mapping:
filter_mapping[pattern] = re.compile(pattern)
# 跨月份的多日查询
elif len(date_day_list) > 1 and len(date_month_list) > 1:
# 6个月内的查询
if len(date_month_list) <= 6:
for x in date_month_list:
pattern = month_filter_template.format(x.strftime("%Y%m"))
if pattern not in filter_mapping:
filter_mapping[pattern] = re.compile(pattern)
# 6个月以上的查询
else:
for x in date_month_list[-6::1]:
pattern = month_filter_template.format(x.strftime("%Y%m"))
Expand All @@ -245,7 +273,8 @@ def _generate_filter_list(cls, date_day_list, date_month_list):

@classmethod
def check_index_date(cls, index, index_filters):
# 从索引名称中提取日期部分
# 检查索引是否匹配时间过滤器

for filter_pattern in index_filters:
if filter_pattern.match(index):
return True
Expand Down

0 comments on commit 4d6ab36

Please sign in to comment.