From 3d40344b778a0767076a1c8e83245102635bd1b9 Mon Sep 17 00:00:00 2001 From: hanshuaikang <1758504262@qq.com> Date: Thu, 16 Nov 2023 21:03:13 +0800 Subject: [PATCH 1/2] =?UTF-8?q?minor:=20=E6=96=B0=E5=A2=9E=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E5=B7=B2=E6=9C=89recursive=5Freplace=5Fid=EF=BC=8C=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=BF=94=E5=9B=9Emap?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bamboo-pipeline/pipeline/parser/utils.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/runtime/bamboo-pipeline/pipeline/parser/utils.py b/runtime/bamboo-pipeline/pipeline/parser/utils.py index 9b1f7203..669dbe01 100644 --- a/runtime/bamboo-pipeline/pipeline/parser/utils.py +++ b/runtime/bamboo-pipeline/pipeline/parser/utils.py @@ -13,23 +13,35 @@ import logging -from pipeline.utils.uniqid import node_uniqid, line_uniqid from pipeline.core.constants import PE from pipeline.exceptions import NodeNotExistException +from pipeline.utils.uniqid import line_uniqid, node_uniqid logger = logging.getLogger("root") BRANCH_SELECT_GATEWAYS = {PE.ExclusiveGateway, PE.ConditionalParallelGateway} -def recursive_replace_id(pipeline_data): +def _recursive_replace_id_with_node_map(pipeline_data, subprocess_id=None): + """ + 替换pipeline_id 并返回 对应的 node_map 映射 + """ pipeline_data[PE.id] = node_uniqid() - replace_all_id(pipeline_data) + node_map = {} + replace_result_map = replace_all_id(pipeline_data) + pipeline_id = subprocess_id or pipeline_data[PE.id] + node_map[pipeline_id] = replace_result_map activities = pipeline_data[PE.activities] for act_id, act in list(activities.items()): if act[PE.type] == PE.SubProcess: - recursive_replace_id(act[PE.pipeline]) + replace_result_map = _recursive_replace_id_with_node_map(act[PE.pipeline], act_id) act[PE.pipeline][PE.id] = act_id + node_map[pipeline_id].setdefault("subprocess", {}).update(replace_result_map) + return node_map + + +def recursive_replace_id(pipeline_data): + return _recursive_replace_id_with_node_map(pipeline_data) def replace_all_id(pipeline_data): From 6e306616eab2b4ce523a298a1a97f25179f0c1d8 Mon Sep 17 00:00:00 2001 From: hanshuaikang <1758504262@qq.com> Date: Fri, 17 Nov 2023 12:01:34 +0800 Subject: [PATCH 2/2] =?UTF-8?q?test:=20=E8=A1=A5=E5=85=85replace=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pipeline/tests/parser/__init__.py | 1 + .../pipeline/tests/parser/test_replace.py | 104 ++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 runtime/bamboo-pipeline/pipeline/tests/parser/__init__.py create mode 100644 runtime/bamboo-pipeline/pipeline/tests/parser/test_replace.py diff --git a/runtime/bamboo-pipeline/pipeline/tests/parser/__init__.py b/runtime/bamboo-pipeline/pipeline/tests/parser/__init__.py new file mode 100644 index 00000000..40a96afc --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/tests/parser/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/runtime/bamboo-pipeline/pipeline/tests/parser/test_replace.py b/runtime/bamboo-pipeline/pipeline/tests/parser/test_replace.py new file mode 100644 index 00000000..b7830800 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/tests/parser/test_replace.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- + +from django.test import TestCase +from pipeline.parser.utils import recursive_replace_id, replace_all_id + +from bamboo_engine.builder import ( + ConvergeGateway, + Data, + EmptyEndEvent, + EmptyStartEvent, + ExclusiveGateway, + ParallelGateway, + ServiceActivity, + SubProcess, + Var, + build_tree, + builder, +) + + +class ReplaceTests(TestCase): + def test_replace_all_id(self): + start = EmptyStartEvent() + act = ServiceActivity(component_code="example_component") + end = EmptyEndEvent() + start.extend(act).extend(end) + pipeline = builder.build_tree(start) + node_map = replace_all_id(pipeline) + self.assertIsInstance(node_map, dict) + self.assertIn(pipeline["start_event"]["id"], node_map["start_event"][start.id]) + self.assertIn(pipeline["end_event"]["id"], node_map["end_event"][end.id]) + self.assertEqual(list(pipeline["activities"].keys())[0], node_map["activities"][act.id]) + + def test_replace_all_id_gateway(self): + start = EmptyStartEvent() + pg = ParallelGateway() + act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1") + act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2") + act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3") + cg = ConvergeGateway() + end = EmptyEndEvent() + + start.extend(pg).connect(act_1, act_2, act_3).to(pg).converge(cg).extend(end) + pipeline = build_tree(start) + node_map = replace_all_id(pipeline) + + self.assertIn(pg.id, node_map["gateways"].keys()) + self.assertIn(cg.id, node_map["gateways"].keys()) + + self.assertIn(node_map["gateways"][pg.id], pipeline["gateways"].keys()) + self.assertIn(node_map["gateways"][cg.id], pipeline["gateways"].keys()) + + def test_recursive_replace_id(self): + start = EmptyStartEvent() + pg = ParallelGateway() + act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1") + act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2") + act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3") + cg = ConvergeGateway() + end = EmptyEndEvent() + start.extend(pg).connect(act_1, act_2, act_3).to(pg).converge(cg).extend(end) + pipeline = build_tree(start) + node_map = recursive_replace_id(pipeline) + self.assertIn(pg.id, node_map[pipeline["id"]]["gateways"].keys()) + self.assertIn(cg.id, node_map[pipeline["id"]]["gateways"].keys()) + self.assertIn(node_map[pipeline["id"]]["gateways"][pg.id], pipeline["gateways"].keys()) + self.assertIn(node_map[pipeline["id"]]["gateways"][cg.id], pipeline["gateways"].keys()) + self.assertIn(act_1.id, node_map[pipeline["id"]]["activities"].keys()) + + def test_recursive_replace_id_with_subprocess(self): + def sub_process(data): + subproc_start = EmptyStartEvent() + subproc_act = ServiceActivity(component_code="pipe_example_component", name="sub_act") + subproc_end = EmptyEndEvent() + + subproc_start.extend(subproc_act).extend(subproc_end) + + subproc_act.component.inputs.sub_input = Var(type=Var.SPLICE, value="${sub_input}") + + return SubProcess(start=subproc_start, data=data) + + start = EmptyStartEvent() + act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1") + eg = ExclusiveGateway(conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0"}, name="act_2 or act_3") + + sub_pipeline_data_1 = Data(inputs={"${sub_input}": Var(type=Var.PLAIN, value=1)}) + subproc_1 = sub_process(sub_pipeline_data_1) + + sub_pipeline_data_2 = Data(inputs={"${sub_input}": Var(type=Var.PLAIN, value=2)}) + subproc_2 = sub_process(sub_pipeline_data_2) + end = EmptyEndEvent() + + start.extend(act_1).extend(eg).connect(subproc_1, subproc_2).converge(end) + + pipeline = build_tree(start) + node_map = recursive_replace_id(pipeline) + + self.assertEqual(len(node_map[pipeline["id"]]["subprocess"].keys()), 2) + self.assertIn( + node_map[pipeline["id"]]["activities"][subproc_1.id], node_map[pipeline["id"]]["subprocess"].keys() + ) + self.assertIn( + node_map[pipeline["id"]]["activities"][subproc_2.id], node_map[pipeline["id"]]["subprocess"].keys() + )