Skip to content

Commit

Permalink
Merge branch 'master' into bkmonitor-release
Browse files Browse the repository at this point in the history
  • Loading branch information
unique0lai committed Jan 15, 2025
2 parents 43202b8 + 941083e commit 8988622
Show file tree
Hide file tree
Showing 32 changed files with 993 additions and 597 deletions.
18 changes: 15 additions & 3 deletions bklog/apps/log_databus/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,7 @@ class EtlConfig(object):
class MetadataTypeEnum(ChoicesEnum):
PATH = "path"

_choices_labels = (
(PATH, _("路径元数据")),
)
_choices_labels = ((PATH, _("路径元数据")),)


class EtlConfigChoices(ChoicesEnum):
Expand Down Expand Up @@ -658,3 +656,17 @@ class KafkaInitialOffsetEnum(ChoicesEnum):
(OLDEST, _("最旧")),
(NEWEST, _("最新")),
)


class CollectorBatchOperationType(ChoicesEnum):
    # Operation types accepted by the collector bulk-operation API
    # (see CollectorBatchHandler.batch_operation for the dispatch).
    STOP = "stop"  # disable the collectors
    START = "start"  # enable the collectors
    MODIFY_STORAGE = "modify_storage"  # update ES storage settings (retention/replicas/shards/hot days)
    QUERY_STORAGE = "query_storage"  # query per-collector index storage usage

    _choices_labels = (
        (STOP, _("停用")),
        (START, _("启用")),
        (MODIFY_STORAGE, _("修改存储配置")),
        (QUERY_STORAGE, _("查询存储")),
    )
183 changes: 183 additions & 0 deletions bklog/apps/log_databus/handlers/collector_batch_operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making BK-LOG 蓝鲸日志平台 available.
Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
BK-LOG 蓝鲸日志平台 is licensed under the MIT License.
License for BK-LOG 蓝鲸日志平台:
--------------------------------------------------------------------
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial
portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from apps.log_databus.constants import CollectorBatchOperationType
from apps.log_databus.exceptions import CollectorConfigNotExistException
from apps.log_databus.handlers.collector import CollectorHandler
from apps.log_databus.handlers.etl import EtlHandler
from apps.log_databus.handlers.storage import StorageHandler
from apps.log_databus.models import CollectorConfig


class CollectorBatchHandler(object):
    """
    Apply one batch operation (stop / start / modify storage / query storage)
    to a set of collector configs, collecting a per-collector SUCCESS/FAILED
    result instead of aborting the whole batch on the first error.
    """

    def __init__(self, collector_ids, operation):
        """
        :param collector_ids: list of collector_config_id values to operate on
        :param operation: a CollectorBatchOperationType value (string)
        :raises CollectorConfigNotExistException: when none of the ids exist
        """
        self.collector_ids = collector_ids
        self.operation = operation
        self.collectors = CollectorConfig.objects.filter(collector_config_id__in=self.collector_ids)
        if not self.collectors:
            raise CollectorConfigNotExistException()

    def batch_operation(self, params):
        """
        根据operation执行不同的操作逻辑

        Dispatch to the concrete handler according to ``self.operation``.

        :param params: validated request payload; ``operation_params`` is
            optional (the serializer declares it ``required=False``), so it is
            read with ``.get`` to avoid a KeyError on a valid request.
        :return: list of per-collector results, or a dict for query_storage
        """
        if self.operation == CollectorBatchOperationType.MODIFY_STORAGE.value:
            # -1 is the sentinel for "keep the collector's current setting";
            # modify_storage only applies values above its per-field threshold.
            operation_params = params.get("operation_params", {})
            retention = operation_params.get("retention", -1)
            storage_replies = operation_params.get("storage_replies", -1)
            es_shards = operation_params.get("es_shards", -1)
            allocation_min_days = operation_params.get("allocation_min_days", -1)
            result = self.modify_storage(retention, storage_replies, es_shards, allocation_min_days)
        elif self.operation == CollectorBatchOperationType.QUERY_STORAGE.value:
            result = self.query_storage()
        else:
            # STOP and START share one implementation.
            result = self.stop_or_start()
        return result

    def stop_or_start(self):
        """
        停用或启动采集项

        Stop or start every collector in the batch; a failure on one collector
        is reported in its result entry and does not abort the others.
        """
        results = []
        for collector in self.collectors:
            collector_info = {
                "id": collector.collector_config_id,
                "name": collector.collector_config_name,
                "status": "SUCCESS",
                "description": f"{self.operation} operation executed successfully",
            }
            try:
                handler = CollectorHandler(collector_config_id=collector.collector_config_id)
                if self.operation == CollectorBatchOperationType.STOP.value:
                    handler.stop()
                else:
                    handler.start()
            except Exception as e:  # report per-collector failure instead of aborting the batch
                collector_info.update(
                    {
                        "status": "FAILED",
                        "description": f"Failed to execute {self.operation} operation, reason: {e}",
                    }
                )
            results.append(collector_info)
        return results

    def modify_storage(self, retention, storage_replies, es_shards, allocation_min_days):
        """
        修改存储配置
        :param retention: 过期时间 (days; <= 0 keeps the current value)
        :param storage_replies: 副本数 (< 0 keeps the current value)
        :param es_shards: 分片数 (<= 0 keeps the current value)
        :param allocation_min_days: 热数据天数 (only applied when the cluster
            has hot/warm enabled; otherwise forced to 0)
        """
        results = []
        for collector in self.collectors:
            collector_info = {
                "id": collector.collector_config_id,
                "name": collector.collector_config_name,
                "status": "SUCCESS",
                "description": f"{self.operation} operation executed successfully",
            }
            # The whole per-collector flow is wrapped in try/except so that a
            # failure in retrieve()/get_clean_stash()/StorageHandler for one
            # collector cannot abort the remaining collectors (consistent with
            # stop_or_start and query_storage).
            try:
                handler = CollectorHandler(collector.collector_config_id)
                collect_config = handler.retrieve()
                clean_stash = handler.get_clean_stash()

                # Prefer the un-applied clean stash (draft ETL config) when one
                # exists; otherwise fall back to the live config.
                etl_params = clean_stash["etl_params"] if clean_stash else collect_config["etl_params"]
                etl_fields = (
                    clean_stash["etl_fields"]
                    if clean_stash
                    else [field for field in collect_config["fields"] if not field["is_built_in"]]
                )
                storage_cluster_id = collect_config["storage_cluster_id"]
                if storage_cluster_id > 0:
                    cluster_info = StorageHandler(storage_cluster_id).get_cluster_info_by_id()
                    hot_warm_enabled = (
                        cluster_info["cluster_config"]
                        .get("custom_option", {})
                        .get("hot_warm_config", {})
                        .get("is_enabled")
                    )
                else:
                    hot_warm_enabled = True

                if not collect_config["is_active"]:
                    collector_info.update(
                        {
                            "status": "FAILED",
                            "description": f"Failed to execute {self.operation} operation, "
                            f"reason: The collector is not active",
                        }
                    )
                else:
                    # NOTE: distinct name (the original shadowed etl_params here) —
                    # these are the kwargs for EtlHandler.update_or_create.
                    update_params = {
                        "table_id": collector.collector_config_name_en,
                        "storage_cluster_id": storage_cluster_id,
                        "retention": retention if retention > 0 else collect_config["retention"],
                        "allocation_min_days": allocation_min_days if hot_warm_enabled else 0,
                        "storage_replies": storage_replies
                        if storage_replies >= 0
                        else collect_config["storage_replies"],
                        "es_shards": es_shards if es_shards > 0 else collect_config["storage_shards_nums"],
                        "etl_params": etl_params,
                        "etl_config": collect_config["etl_config"],
                        "fields": etl_fields,
                    }
                    etl_handler = EtlHandler.get_instance(collector.collector_config_id)
                    etl_handler.update_or_create(**update_params)
            except Exception as e:  # report per-collector failure instead of aborting the batch
                collector_info.update(
                    {
                        "status": "FAILED",
                        "description": f"Failed to execute {self.operation} operation, reason: {e}",
                    }
                )
            results.append(collector_info)

        return results

    def query_storage(self):
        """
        查询存储

        Sum the ES index store sizes per collector and across the batch.
        :return: {"storage_data": [per-collector entries], "total_store_size": int}
        """
        storage_data = []
        total_store_size = 0
        for collector in self.collectors:
            collector_info = {
                "id": collector.collector_config_id,
                "name": collector.collector_config_name,
                "bk_biz_id": collector.bk_biz_id,
                "status": "SUCCESS",
                "store_size": 0,
                "description": f"{self.operation} success",
            }
            try:
                indices_info = CollectorHandler(collector.collector_config_id).indices_info()
                # "store.size" comes back as a numeric string per index row.
                total = sum(int(idx["store.size"]) for idx in indices_info)
                total_store_size += total
                collector_info.update({"store_size": total})
            except Exception as e:  # report per-collector failure instead of aborting the batch
                collector_info.update(
                    {
                        "status": "FAILED",
                        "description": f"{self.operation} failed, reason: {e}",
                    }
                )

            storage_data.append(collector_info)

        return {
            "storage_data": storage_data,
            "total_store_size": total_store_size,
        }
15 changes: 9 additions & 6 deletions bklog/apps/log_databus/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
CLUSTER_NAME_EN_REGEX,
COLLECTOR_CONFIG_NAME_EN_REGEX,
ArchiveInstanceType,
CollectorBatchOperationType,
ContainerCollectorType,
Environment,
EsSourceType,
Expand Down Expand Up @@ -454,9 +455,7 @@ class CollectorUpdateSerializer(serializers.Serializer):
"""

collector_config_name = serializers.CharField(label=_("采集名称"), max_length=50)
collector_config_name_en = serializers.RegexField(
label=_("采集英文名称"), regex=COLLECTOR_CONFIG_NAME_EN_REGEX
)
collector_config_name_en = serializers.RegexField(label=_("采集英文名称"), regex=COLLECTOR_CONFIG_NAME_EN_REGEX)
collector_scenario_id = serializers.ChoiceField(
label=_("日志类型"), choices=CollectorScenarioEnum.get_choices(), required=False
)
Expand All @@ -476,9 +475,7 @@ class CollectorUpdateSerializer(serializers.Serializer):
class UpdateContainerCollectorSerializer(serializers.Serializer):
bk_biz_id = serializers.IntegerField(label=_("业务ID"))
collector_config_name = serializers.CharField(label=_("采集名称"), max_length=50)
collector_config_name_en = serializers.RegexField(
label=_("采集英文名称"), regex=COLLECTOR_CONFIG_NAME_EN_REGEX
)
collector_config_name_en = serializers.RegexField(label=_("采集英文名称"), regex=COLLECTOR_CONFIG_NAME_EN_REGEX)
description = serializers.CharField(
label=_("备注说明"), max_length=100, required=False, allow_null=True, allow_blank=True
)
Expand Down Expand Up @@ -1634,3 +1631,9 @@ class CheckCollectorSerializer(serializers.Serializer):

class GetCollectorCheckResultSerializer(serializers.Serializer):
check_record_id = serializers.CharField(label=_("采集项检查唯一标识"))


class CollectorBatchOperationSerializer(serializers.Serializer):
    # Request body for the collector bulk-operation endpoint
    # (POST .../bulk_operation); consumed by CollectorBatchHandler.
    collector_config_ids = serializers.ListField(label=_("采集项ID列表"), allow_empty=False)
    operation_type = serializers.ChoiceField(label=_("操作类型"), choices=CollectorBatchOperationType.get_choices())
    # Optional extras (e.g. retention/storage_replies/es_shards/allocation_min_days
    # for modify_storage) — callers must read this with .get() since it may be absent.
    operation_params = serializers.DictField(label=_("额外的元数据"), required=False)
9 changes: 9 additions & 0 deletions bklog/apps/log_databus/views/collector_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
)
from apps.log_databus.constants import Environment, EtlConfig
from apps.log_databus.handlers.collector import CollectorHandler
from apps.log_databus.handlers.collector_batch_operation import CollectorBatchHandler
from apps.log_databus.handlers.etl import EtlHandler
from apps.log_databus.handlers.link import DataLinkHandler
from apps.log_databus.models import CollectorConfig
from apps.log_databus.serializers import (
BatchSubscriptionStatusSerializer,
BCSCollectorSerializer,
CleanStashSerializer,
CollectorBatchOperationSerializer,
CollectorCreateSerializer,
CollectorDataLinkListSerializer,
CollectorEtlSerializer,
Expand Down Expand Up @@ -2440,3 +2442,10 @@ def report_token(self, request, collector_config_id=None):
@list_route(methods=["GET"], url_path="report_host")
def report_host(self, request):
return Response(CollectorHandler().get_report_host())

@list_route(methods=["POST"], url_path="bulk_operation")
def collector_batch_operation(self, request):
params = self.params_valid(CollectorBatchOperationSerializer)
collector_config_ids = params["collector_config_ids"]
operation_type = params["operation_type"]
return Response(CollectorBatchHandler(collector_config_ids, operation_type).batch_operation(params))
3 changes: 2 additions & 1 deletion bklog/apps/log_search/handlers/search/aggs_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import typing
from collections import defaultdict

from django.conf import settings
from elasticsearch_dsl import A, Search

from apps.log_search.constants import TimeFieldTypeEnum, TimeFieldUnitEnum
Expand Down Expand Up @@ -172,7 +173,7 @@ def date_histogram(cls, index_set_id, query_data: dict):
interval = query_data.get("interval")

# 生成起止时间
time_zone = get_local_param("time_zone")
time_zone = get_local_param("time_zone", settings.TIME_ZONE)
start_time, end_time = generate_time_range(
query_data.get("time_range"), query_data.get("start_time"), query_data.get("end_time"), time_zone
)
Expand Down
2 changes: 1 addition & 1 deletion bklog/web/src/views/retrieve-v2/result-comp/kv-list.vue
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@

.log-item {
display: flex;
align-items: baseline;
align-items: start;
min-height: 24px;

.field-label {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,13 +364,13 @@ export default defineComponent({
};

const getSegmentRenderType = () => {
if (wordList.length < 100) {
return 'text';
}
// if (wordList.length < 100) {
// return 'text';
// }

if (wordList.length < 3000) {
return 'fabric';
}
// if (wordList.length < 3000) {
// return 'fabric';
// }

return 'text';
};
Expand Down
2 changes: 1 addition & 1 deletion bklog/web/src/views/retrieve/result-comp/kv-list.vue
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@
.kv-list-wrapper {
.log-item {
display: flex;
align-items: baseline;
align-items: start;

.field-label {
display: flex;
Expand Down
2 changes: 1 addition & 1 deletion bkmonitor/bkmonitor/define/global_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@
("DEFAULT_BKDATA_BIZ_ID", slz.IntegerField(label="接入计算平台使用的业务 ID", default=0)),
("IS_SUBSCRIPTION_ENABLED", slz.BooleanField(label="是否开启采集订阅巡检功能", default=True)),
# K8S新版灰度配置
("K8S_V2_BIZ_LIST", slz.ListField(label=_("支持ipv6的业务列表"), default=[])),
("K8S_V2_BIZ_LIST", slz.ListField(label=_("K8S新版灰度配置"), default=[])),
# 文档链接配置
("DOC_LINK_MAPPING", slz.DictField(label=_("文档链接配置"), default={})),
# 自定义事件休眠开关
Expand Down
8 changes: 4 additions & 4 deletions bkmonitor/packages/apm_web/trace/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ def perform_request(self, data):
try:
response = api.apm_api.query_span_list(params)
except BKAPIError as e:
raise ValueError(_lazy(f"Span列表请求失败: {e.data.get('message')}"))
raise CustomException(_lazy(f"Span列表请求失败: {e.data.get('message')}"))

self.burial_point(data["bk_biz_id"], data["app_name"])
QueryHandler.handle_span_list(response["data"])
Expand Down Expand Up @@ -705,7 +705,7 @@ def perform_request(self, data):
).es_dsl
response = api.apm_api.query_trace_list(params)
except BKAPIError as e:
raise ValueError(_lazy(f"Trace列表请求失败: {e.data.get('message')}"))
raise CustomException(_lazy(f"Trace列表请求失败: {e.data.get('message')}"))

self.burial_point(data["bk_biz_id"], data["app_name"])

Expand Down Expand Up @@ -1096,7 +1096,7 @@ def perform_request(self, validated_data):
try:
response = api.apm_api.query_span_statistics(params)
except BKAPIError as e:
raise ValueError(_lazy("获取接口统计失败: {message}").format(message=e.data.get("message")))
raise CustomException(_lazy("获取接口统计失败: {message}").format(message=e.data.get("message")))

return response

Expand Down Expand Up @@ -1126,7 +1126,7 @@ def perform_request(self, validated_data):
try:
response = api.apm_api.query_service_statistics(params)
except BKAPIError as e:
raise ValueError(_lazy("获取服务统计失败: {message}".format(message=e.data.get("message"))))
raise CustomException(_lazy("获取服务统计失败: {message}".format(message=e.data.get("message"))))

return response

Expand Down
Loading

0 comments on commit 8988622

Please sign in to comment.