Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: 补充es索引查询优化的注释 --story=121725248 #4869

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions bkmonitor/apm/utils/es_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def _scan(


class EsQueryProxy(QueryProxy):
"""
Es查询代理, 用于注入query条件对查询索引的优化操作
"""

def __call__(self, *args, **kwargs):
filters = kwargs.get("filter", [])
if filters:
Expand All @@ -106,6 +110,10 @@ def __call__(self, *args, **kwargs):


class EsSearch(Search):
"""
重写es的search类,增加es查询索引优化
"""

query = ProxyDescriptor("es_query")
post_filter = ProxyDescriptor("es_post_filter")

Expand All @@ -115,6 +123,8 @@ def __init__(self, **kwargs):
self._es_post_filter_proxy = EsQueryProxy(self, "post_filter")

def fix_index(self, indices: List[str]):
"""调整查询的目标索引列表"""

if indices:
self._index = indices

Expand Down Expand Up @@ -156,6 +166,8 @@ def limits(calls, period):


class QueryIndexOptimizer(object):
"""es查询索引优化类"""

def __init__(
self,
indices: list,
Expand All @@ -180,7 +192,8 @@ def index(self):
return [self._index] if self._index else None

def index_filter(self, indices, start_time: arrow.Arrow, end_time: arrow.Arrow, time_zone: str) -> List[str]:
# BkData索引集优化
"""根据query的时间范围对索引进行过滤, 返回过滤后的索引列表"""

indices_list = set()
for indices_str in indices:
for _index in indices_str.split(","):
Expand All @@ -190,52 +203,67 @@ def index_filter(self, indices, start_time: arrow.Arrow, end_time: arrow.Arrow,
return final_index_list

def index_time_filters(self, date_start: arrow.Arrow, date_end: arrow.Arrow, time_zone: str):
    """Build the index time filters for a query time window.

    Both bounds are converted to ``time_zone``. A falsy ``date_end`` is
    replaced by the current time, and an end later than the current time
    is clamped to the current time.

    :param date_start: start of the query window
    :param date_end: end of the query window (may be falsy)
    :param time_zone: time zone name passed to ``arrow.now`` / ``Arrow.to``
    :return: the result of ``self._generate_filter_list`` for the day-level
        and month-level date lists derived from the window
    """
    current = arrow.now(time_zone)
    window_start = date_start.to(time_zone)

    # Resolve the effective end of the window; it never exceeds "now".
    if date_end:
        window_end = date_end.to(time_zone)
        if window_end > current:
            window_end = current
    else:
        window_end = current

    def _dates_spanning(freq, unit):
        # Recurrence from the floor of the start to the ceil of the end,
        # both taken at the given granularity ("day" or "month").
        first = window_start.floor(unit).datetime
        last = window_end.ceil(unit).datetime
        return list(rrule(freq, interval=1, dtstart=first, until=last))

    # Day-level dates first, then month-level dates, for the same window.
    days_in_window = _dates_spanning(DAILY, "day")
    months_in_window = _dates_spanning(MONTHLY, "month")

    # Turn the two date lists into the regex filters used to select indices.
    return self._generate_filter_list(days_in_window, months_in_window)

@classmethod
def _generate_filter_list(cls, date_day_list, date_month_list):
"""根据日期类表和查询条件,生成索引的时间过滤器列表"""

date_filter_template = r"^.*_bkapm_trace_.+_{}_\d+$"
month_filter_template = r"^.*_bkapm_trace_.+_{}.*_\d+$"

# 用于索引过滤的正则pattern列表
filter_mapping = {}
# 一天范围内的查询
if len(date_day_list) == 1:
for x in date_day_list:
pattern = date_filter_template.format(x.strftime("%Y%m%d"))
if pattern not in filter_mapping:
filter_mapping[pattern] = re.compile(pattern)
# 一个月范围内的多日查询
elif len(date_day_list) > 1 and len(date_month_list) == 1:
# 14天以上的查询
if len(date_day_list) > 14:
for x in date_month_list:
pattern = month_filter_template.format(x.strftime("%Y%m"))
if pattern not in filter_mapping:
filter_mapping[pattern] = re.compile(pattern)
# 2-14天的查询
else:
for x in date_day_list:
pattern = date_filter_template.format(x.strftime("%Y%m%d"))
if pattern not in filter_mapping:
filter_mapping[pattern] = re.compile(pattern)
# 跨月份的多日查询
elif len(date_day_list) > 1 and len(date_month_list) > 1:
# 6个月内的查询
if len(date_month_list) <= 6:
for x in date_month_list:
pattern = month_filter_template.format(x.strftime("%Y%m"))
if pattern not in filter_mapping:
filter_mapping[pattern] = re.compile(pattern)
# 6个月以上的查询
else:
for x in date_month_list[-6::1]:
pattern = month_filter_template.format(x.strftime("%Y%m"))
Expand All @@ -245,7 +273,8 @@ def _generate_filter_list(cls, date_day_list, date_month_list):

@classmethod
def check_index_date(cls, index, index_filters):
# 从索引名称中提取日期部分
# 检查索引是否匹配时间过滤器

for filter_pattern in index_filters:
if filter_pattern.match(index):
return True
Expand Down