Merge pull request #514 from 0RAJA/feat_rule_audit_realtime
feat: rule audit - generate risks from events in real time --story=120928201
0RAJA authored Jan 9, 2025
2 parents aa1d112 + 6694fd1 commit 307fc53
Showing 12 changed files with 223 additions and 41 deletions.
4 changes: 4 additions & 0 deletions src/backend/app_desc.yaml
@@ -23,6 +23,10 @@ modules:
command: python manage.py celery beat -l info
plan: 4C1G5R
replicas: 1
gen-risk:
command: python manage.py gen_risk
plan: 4C1G5R
replicas: 2
scripts:
pre_release_hook: sh -c "python manage.py migrate --no-input && python manage.py createcachetable && python manage.py init_system && python manage.py init_fields && python manage.py sync_apigw"
svc_discovery:
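The new gen-risk module runs `python manage.py gen_risk` as a long-lived background process with two replicas. The command itself is not shown in this diff; a minimal skeleton of such a Django management command might look like the following (module path and helper names are illustrative assumptions):

# Hypothetical sketch of the gen_risk management command (not part of this diff).
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    help = "Consume audit events from Kafka and generate risks in real time"

    def handle(self, *args, **options):
        # Assumed entry point; the real wiring lives in the risk service,
        # built on the KafkaRecordConsumer base class added in core/kafka.py.
        from services.web.risk.handlers import start_event_consumers  # hypothetical helper

        start_event_consumers()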
10 changes: 9 additions & 1 deletion src/backend/config/default.py
@@ -15,7 +15,7 @@
We undertake not to change the open source license (MIT license) applicable
to the current version of the project delivered to anyone in the future.
"""

import json
import sys

from bk_audit.constants.utils import LOGGER_NAME
@@ -367,6 +367,14 @@
DEFAULT_QUEUE_STORAGE_EXPIRES = int(os.getenv("BKAPP_DEFAULT_QUEUE_STORAGE_EXPIRES", 1))
# HDFS storage duration (days); -1 means unlimited
DEFAULT_HDFS_STORAGE_EXPIRES = int(os.getenv("BKAPP_DEFAULT_HDFS_STORAGE_EXPIRES", -1))
# Audit Kafka configuration
KAFKA_CONFIG = json.loads(os.getenv("BKAPP_KAFKA_CONFIG", "{}"))
# Event Kafka poll timeout (ms)
EVENT_KAFKA_TIMEOUT_MS = int(os.getenv("BKAPP_EVENT_KAFKA_TIMEOUT_MS", 1000))
# Max records per event Kafka poll
EVENT_KAFKA_MAX_RECORDS = int(os.getenv("BKAPP_EVENT_KAFKA_MAX_RECORDS", 10))
# Sleep interval (seconds) between event Kafka polls
EVENT_KAFKA_SLEEP_TIME = float(os.getenv("BKAPP_EVENT_KAFKA_SLEEP_TIME", 0.5))

"""
Framework code below; do not modify
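KAFKA_CONFIG is read as JSON from the environment; the other three values tune the poll loop. A minimal sketch of how these settings might be handed to a kafka-python consumer (the environment value, topic and group id are assumptions, not defaults shipped by the project):

# Illustrative environment, not a project default:
#   BKAPP_KAFKA_CONFIG='{"bootstrap_servers": ["kafka-1:9092"], "security_protocol": "PLAINTEXT"}'
from django.conf import settings
from kafka import KafkaConsumer

# Assumption: KAFKA_CONFIG holds KafkaConsumer keyword arguments.
consumer = KafkaConsumer(
    "some_audit_event_topic",       # illustrative topic name
    group_id="bkaudit-gen-risk",    # illustrative group id
    enable_auto_commit=False,       # offsets are committed by KafkaRecordConsumer
    **settings.KAFKA_CONFIG,
)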
33 changes: 33 additions & 0 deletions src/backend/core/connection.py
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making
蓝鲸智云 - 审计中心 (BlueKing - Audit Center) available.
Copyright (C) 2023 THL A29 Limited,
a Tencent company. All rights reserved.
Licensed under the MIT License (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the License for the
specific language governing permissions and limitations under the License.
We undertake not to change the open source license (MIT license) applicable
to the current version of the project delivered to anyone in the future.
"""

from django.db import connections


def get_pymysql_connection(db_name):
connection = connections[db_name]
pymysql_conn = connection.connection
return pymysql_conn


def ping_db(reconnect=True):
for db_name in connections.databases.keys():
conn = get_pymysql_connection(db_name)
if not conn:
continue
conn.ping(reconnect=reconnect)
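ping_db walks every configured database alias, skips aliases whose DB-API connection has not been opened yet, and pings the rest; with PyMySQL, ping(reconnect=True) re-opens a dropped connection. A one-line usage sketch (the call site is illustrative):

from core.connection import ping_db

# Call before handling a batch after a long idle period so stale ORM
# connections are re-established instead of failing with "MySQL server has gone away".
ping_db(reconnect=True)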
61 changes: 61 additions & 0 deletions src/backend/core/kafka.py
@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making
蓝鲸智云 - 审计中心 (BlueKing - Audit Center) available.
Copyright (C) 2023 THL A29 Limited,
a Tencent company. All rights reserved.
Licensed under the MIT License (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the License for the
specific language governing permissions and limitations under the License.
We undertake not to change the open source license (MIT license) applicable
to the current version of the project delivered to anyone in the future.
"""

import abc
import time

from blueapps.utils.logger import logger
from kafka import KafkaConsumer

from core.connection import ping_db


class KafkaRecordConsumer:
"""kafka 消息消费者基类"""

def __init__(self, consumer: KafkaConsumer, timeout_ms: int, max_records: int, sleep_time: float, sleep_wait=True):
self.consumer = consumer
self.timeout_ms = timeout_ms
self.max_records = max_records
self.sleep_time = sleep_time
self.sleep_wait = sleep_wait

def process(self):
while True:
data = self.consumer.poll(timeout_ms=self.timeout_ms, max_records=self.max_records)
if not data:
if not self.sleep_wait:
return
time.sleep(self.sleep_time)
continue
# Re-ping the DB connections so that idle connections are not left dropped after the consumer has had nothing to consume for a long time
ping_db()
for records in data.values():
self.process_records(records)
self.consumer.commit()

def process_records(self, records: list):
for record in records:
try:
self.process_record(record)
except Exception as e: # pylint: disable=broad-except
logger.exception(f"[{self.__class__.__name__}] process_record error: {e}; record={record}")

@abc.abstractmethod
def process_record(self, record):
raise NotImplementedError()
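KafkaRecordConsumer loops forever: it polls, optionally sleeps when nothing arrives, pings the DB before a non-empty batch, hands each record to process_record, and commits offsets once the batch is done, so a failing record is logged but does not block the rest. A minimal concrete consumer, assuming the settings added in config/default.py (the class, topic and wiring are illustrative; only RiskHandler.generate_risk appears elsewhere in this diff):

import json

from django.conf import settings
from kafka import KafkaConsumer

from core.kafka import KafkaRecordConsumer


class EventRecordConsumer(KafkaRecordConsumer):
    """Illustrative subclass: decode each record and hand it to the risk handler."""

    def process_record(self, record):
        event = json.loads(record.value)
        # Assumed wiring; see services/web/risk/handlers/risk.py for generate_risk.
        from services.web.risk.handlers.risk import RiskHandler

        RiskHandler().generate_risk(event)


def run(topic: str) -> None:
    consumer = KafkaConsumer(
        topic,
        group_id="bkaudit-gen-risk",  # illustrative group id
        enable_auto_commit=False,
        **settings.KAFKA_CONFIG,
    )
    EventRecordConsumer(
        consumer,
        timeout_ms=settings.EVENT_KAFKA_TIMEOUT_MS,
        max_records=settings.EVENT_KAFKA_MAX_RECORDS,
        sleep_time=settings.EVENT_KAFKA_SLEEP_TIME,
    ).process()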
12 changes: 11 additions & 1 deletion src/backend/pdm.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/backend/pyproject.toml
@@ -82,6 +82,7 @@ dependencies = [
"bk-notice-sdk==1.1.1",
"pypika==0.48.9",
"pydantic==2.10.4",
"kafka-python==2.0.2",
]
requires-python = "==3.10.*"
readme = "README.md"
1 change: 1 addition & 0 deletions src/backend/requirements.txt
@@ -31,6 +31,7 @@ elasticsearch==7.17.9
gunicorn==22.0.0
ipython==8.10.0
JSON-log-formatter==0.4.0
kafka-python==2.0.2
mistune==2.0.3
numpy==1.24.2
openpyxl==3.1.2
3 changes: 3 additions & 0 deletions src/backend/services/web/analyze/constants.py
@@ -40,6 +40,9 @@
BKBASE_STRATEGY_ID_FIELD = "strategy_id"
DEFAULT_QUEUE_STORAGE_CLUSTER_KEY = "default_queue_storage_cluster"
DEFAULT_HDFS_STORAGE_CLUSTER_KEY = "default_hdfs_storage_cluster"
AUDIT_EVENT_TABLE_PREFIX = "audit_event"
AUDIT_EVENT_TABLE_FORMAT = f"{AUDIT_EVENT_TABLE_PREFIX}_%s_%s"
AUDIT_EVENT_QUEUE_TOPIC_PATTERN = f"^queue_(?P<bk_biz_id>.*)_{AUDIT_EVENT_TABLE_PREFIX}_(?P<namespace>.*)_(?P<time_ns>.*)$"


class ControlTypeChoices(TextChoices):
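The new constants name the audit-event result tables and the queue topics derived from them; AUDIT_EVENT_QUEUE_TOPIC_PATTERN captures the business id, namespace and nanosecond timestamp back out of a topic name. A minimal matching sketch (the topic string is illustrative):

import re

from services.web.analyze.constants import AUDIT_EVENT_QUEUE_TOPIC_PATTERN

match = re.match(AUDIT_EVENT_QUEUE_TOPIC_PATTERN, "queue_2_audit_event_default_1736400000000000000")
if match:
    # -> bk_biz_id='2', namespace='default', time_ns='1736400000000000000'
    print(match.group("bk_biz_id"), match.group("namespace"), match.group("time_ns"))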
41 changes: 23 additions & 18 deletions src/backend/services/web/analyze/controls/rule_audit.py
@@ -33,6 +33,7 @@
from apps.notice.handlers import ErrorMsgHandler
from core.lock import lock
from services.web.analyze.constants import (
AUDIT_EVENT_TABLE_FORMAT,
BKBASE_DEFAULT_BASELINE_LOCATION,
BKBASE_DEFAULT_COUNT_FREQ,
BKBASE_DEFAULT_OFFSET,
@@ -79,7 +80,6 @@ class RuleAuditController(BaseControl):
def __init__(self, strategy_id: int):
super().__init__(strategy_id)
self.rt_node_map = {} # {rt_id: node_id}
self.sql_node_table_name = ""
self.x_interval = 300
self.y_interval = 30
self.x = 0
@@ -114,6 +114,17 @@ def update_or_create(self, status: str):
def flow_name(self) -> str:
return f"{self.strategy.strategy_id}-{str(time.time_ns())}"

@cached_property
def raw_table_name(self) -> str:
"""
Raw table name
"""

return self.strategy.backend_data.get("raw_table_name") or AUDIT_EVENT_TABLE_FORMAT % (
self.strategy.namespace,
str(time.time_ns()),
)

def _create_flow(self) -> None:
resp = api.bk_base.create_flow(project_id=settings.BKBASE_PROJECT_ID, flow_name=self.flow_name)
self.strategy.backend_data["flow_id"] = resp["flow_id"]
@@ -185,7 +196,7 @@ def _build_data_source_node_config(self, rt_id: str) -> dict:
"name": rt_id,
}

def _build_sql_node_config(self, bk_biz_id: int, raw_table_name: str) -> dict:
def _build_sql_node_config(self, bk_biz_id: int) -> dict:
"""
build sql node
"""
@@ -199,14 +210,13 @@ def _build_sql_node_config(self, bk_biz_id: int, raw_table_name: str) -> dict:

# init sql node
sql_node_type = FlowSQLNodeType.get_sql_node_type(data_source["source_type"])
self.sql_node_table_name = f"{sql_node_type}_{raw_table_name}"
sql_node_params = {
"node_type": sql_node_type,
"bk_biz_id": bk_biz_id,
"from_result_table_ids": self.rt_ids,
"name": self.sql_node_table_name,
"table_name": self.sql_node_table_name,
"output_name": self.sql_node_table_name,
"name": f"{sql_node_type}_{self.raw_table_name}",
"table_name": self.raw_table_name,
"output_name": self.raw_table_name,
}
# add realtime node
if sql_node_type == FlowSQLNodeType.REALTIME:
@@ -316,15 +326,13 @@ def create_or_update_data_source_nodes(self, need_create: bool, flow_id: int) ->
)
return data_source_node_ids

def create_or_update_sql_node(
self, need_create: bool, flow_id: int, data_source_node_ids: List[int], raw_table_name: str
) -> int:
def create_or_update_sql_node(self, need_create: bool, flow_id: int, data_source_node_ids: List[int]) -> int:
"""
Create/update the SQL node
"""

bk_biz_id = int(self.rt_ids[0].split("_", 1)[0])
sql_node_config = self._build_sql_node_config(bk_biz_id, raw_table_name)
sql_node_config = self._build_sql_node_config(bk_biz_id)
self.x += self.x_interval
self.y = self.y_interval
sql_node = {
@@ -349,9 +357,7 @@ def create_or_update_sql_node(
)
return sql_node_id

def create_or_update_storage_nodes(
self, need_create: bool, flow_id: int, sql_node_id: int, raw_table_name: str
) -> List[int]:
def create_or_update_storage_nodes(self, need_create: bool, flow_id: int, sql_node_id: int) -> List[int]:
"""
Create/update the storage nodes
"""
@@ -363,7 +369,7 @@ def create_or_update_storage_nodes(
storage_nodes = [ESStorageNode, QueueStorageNode, HDFSStorageNode]
for idx, storage_node in enumerate(storage_nodes):
node_config = storage_node(namespace=self.strategy.namespace).build_node_config(
bk_biz_id=bk_biz_id, raw_table_name=raw_table_name, sql_node_table_name=self.sql_node_table_name
bk_biz_id=bk_biz_id, raw_table_name=self.raw_table_name
)
if not node_config:
continue
@@ -392,17 +398,16 @@ def create_or_update_storage_nodes(
@transaction.atomic()
def _update_or_create_bkbase_flow(self) -> bool:
# check create flow
raw_table_name = self.strategy.backend_data.get("raw_table_name") or str(time.time_ns())
self.strategy.backend_data["raw_table_name"] = raw_table_name
need_create = not self.strategy.backend_data.get("flow_id")
if need_create:
self._create_flow()
flow_id = self.strategy.backend_data["flow_id"]
data_source_node_ids = self.create_or_update_data_source_nodes(need_create, flow_id)
# build the SQL node
sql_node_id = self.create_or_update_sql_node(need_create, flow_id, data_source_node_ids, raw_table_name)
sql_node_id = self.create_or_update_sql_node(need_create, flow_id, data_source_node_ids)
# build the storage nodes
self.create_or_update_storage_nodes(need_create, flow_id, sql_node_id, raw_table_name)
self.create_or_update_storage_nodes(need_create, flow_id, sql_node_id)
self.strategy.backend_data["raw_table_name"] = self.raw_table_name
self.strategy.save(update_fields=["backend_data"])
return need_create

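raw_table_name is now a cached property: it reuses the name already stored in backend_data and only mints a fresh audit_event_<namespace>_<time_ns> name for new flows, so the SQL node and every storage node share one table name. A tiny sketch of how the fallback expands (values illustrative):

AUDIT_EVENT_TABLE_FORMAT = "audit_event_%s_%s"  # as defined in services/web/analyze/constants.py

# e.g. namespace "default" and time.time_ns() captured at flow creation
print(AUDIT_EVENT_TABLE_FORMAT % ("default", "1736400000000000000"))
# -> audit_event_default_1736400000000000000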
6 changes: 3 additions & 3 deletions src/backend/services/web/analyze/storage_node.py
@@ -54,7 +54,7 @@ def cluster(self) -> Union[str, int]:
def build_rt_id(self, bk_biz_id: int, table_name: str) -> str:
return f"{bk_biz_id}_{table_name}"

def build_node_config(self, bk_biz_id: int, raw_table_name: str, sql_node_table_name: str) -> dict:
def build_node_config(self, bk_biz_id: int, raw_table_name: str) -> dict:
if not self.cluster:
return {}
result_table_id = self.build_rt_id(bk_biz_id, raw_table_name)
@@ -93,10 +93,10 @@ def cluster(self) -> Union[str, int]:
bkbase_cluster_id = cluster_info["cluster_config"].get("custom_option", {}).get("bkbase_cluster_id")
return bkbase_cluster_id

def build_node_config(self, bk_biz_id: int, raw_table_name: str, sql_node_table_name: str) -> dict:
def build_node_config(self, bk_biz_id: int, raw_table_name: str) -> dict:
table_id = EventHandler.get_table_id().replace(".", "_")
return {
**super().build_node_config(bk_biz_id, raw_table_name, sql_node_table_name),
**super().build_node_config(bk_biz_id, raw_table_name),
"indexed_fields": [],
"has_replica": False,
"has_unique_key": False,
39 changes: 21 additions & 18 deletions src/backend/services/web/risk/handlers/risk.py
@@ -56,31 +56,34 @@ class RiskHandler:
Deal with Risk
"""

def generate_risk(self, event: dict):
try:
is_create, risk = self.create_risk(event)
if is_create:
self.send_risk_notice(risk)

from services.web.risk.tasks import process_risk_ticket

process_risk_ticket(risk_id=risk.risk_id)
except Exception as err:  # NOCC:broad-except(all errors must be handled)
logger.exception("[CreateRiskFailed] Event: %s; Error: %s", json.dumps(event), err)
ErrorMsgHandler(
title=gettext("Create Risk Failed"),
content=gettext("Strategy ID: %s; Raw Event ID:\t%s")
% (
event.get("strategy_id"),
event.get("raw_event_id"),
),
).send()

def generate_risk_from_event(self, start_time: datetime.datetime, end_time: datetime.datetime) -> None:
"""
Generate risks from events
"""

events = self.load_events(start_time, end_time)
for event in events:
try:
is_create, risk = self.create_risk(event)
if is_create:
self.send_risk_notice(risk)

from services.web.risk.tasks import process_risk_ticket

process_risk_ticket(risk_id=risk.risk_id)
except Exception as err:  # NOCC:broad-except(all errors must be handled)
logger.exception("[CreateRiskFailed] Event: %s; Error: %s", json.dumps(event), err)
ErrorMsgHandler(
title=gettext("Create Risk Failed"),
content=gettext("Strategy ID: %s; Raw Event ID:\t%s")
% (
event.get("strategy_id"),
event.get("raw_event_id"),
),
).send()
self.generate_risk(event)

def load_events(self, start_time: datetime.datetime, end_time: datetime.datetime) -> List[dict]:
"""
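Extracting generate_risk(event) from the time-windowed generate_risk_from_event lets a single event, for example one pulled off the audit-event queue, be turned into a risk directly. A minimal call sketch (the payload is illustrative):

from services.web.risk.handlers.risk import RiskHandler

event = {"strategy_id": 1, "raw_event_id": "abc123"}  # illustrative payload
RiskHandler().generate_risk(event)  # creates the risk, sends the notice, kicks off the ticket flow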