diff --git a/src/backend/services/web/analyze/controls/rule_audit.py b/src/backend/services/web/analyze/controls/rule_audit.py index f21cfc3b..2a9a9c71 100644 --- a/src/backend/services/web/analyze/controls/rule_audit.py +++ b/src/backend/services/web/analyze/controls/rule_audit.py @@ -402,6 +402,13 @@ def create_or_update_storage_nodes(self, need_create: bool, flow_id: int, sql_no ) return storage_node_ids + def stop_flow(self): + flow_status = self._describe_flow_status() + if flow_status not in [FlowNodeStatusChoices.RUNNING, FlowNodeStatusChoices.FAILED]: + return + params = self.build_update_flow_params(flow_id=self.strategy.backend_data["flow_id"]) + api.bk_base.stop_flow(**params) + @transaction.atomic() def _update_or_create_bkbase_flow(self) -> bool: # check create flow @@ -409,7 +416,9 @@ def _update_or_create_bkbase_flow(self) -> bool: if need_create: self._create_flow() else: - self.disabled(force=True) + # 更新 flow 前需要先停止 flow + self.stop_flow() + time.sleep(3) flow_id = self.strategy.backend_data["flow_id"] data_source_node_ids = self.create_or_update_data_source_nodes(need_create, flow_id) # 构建 sql 节点 @@ -446,16 +455,23 @@ def check_flow_status(self, strategy_id: int, success_status: str, failed_status return check_flow_status.delay(strategy_id, success_status, failed_status, other_status) - def _toggle_strategy(self, status: str, force: bool = False) -> None: - # update flow - params = { - "flow_id": self.strategy.backend_data.get("flow_id"), + def build_update_flow_params(self, flow_id: str) -> dict: + """ + 构建更新流参数 + """ + + return { + "flow_id": flow_id, "consuming_mode": BKBASE_FLOW_CONSUMING_MODE, "resource_sets": { "stream": settings.BKBASE_STREAM_RESOURCE_SET_ID, "batch": settings.BKBASE_BATCH_RESOURCE_SET_ID, }, } + + def _toggle_strategy(self, status: str, force: bool = False) -> None: + # update flow + params = self.build_update_flow_params(flow_id=self.strategy.backend_data.get("flow_id")) if not force and ( not params["flow_id"] or self.strategy.status