diff --git a/src/bk-user/bkuser/apps/sync/runners.py b/src/bk-user/bkuser/apps/sync/runners.py index c56140fc5..eccd9c290 100644 --- a/src/bk-user/bkuser/apps/sync/runners.py +++ b/src/bk-user/bkuser/apps/sync/runners.py @@ -72,8 +72,13 @@ def _initial_plugin(self, ctx: DataSourceSyncTaskContext, plugin_init_extra_kwar def _sync_departments(self, ctx: DataSourceSyncTaskContext): """同步部门信息""" - departments = self.plugin.fetch_departments() - DataSourceDepartmentSyncer(ctx, self.data_source, departments).sync() + DataSourceDepartmentSyncer( + ctx=ctx, + data_source=self.data_source, + raw_departments=self.plugin.fetch_departments(), + overwrite=bool(self.task.extras.get("overwrite", False)), + incremental=bool(self.task.extras.get("incremental", False)), + ).sync() def _sync_users(self, ctx: DataSourceSyncTaskContext): """同步用户信息""" diff --git a/src/bk-user/bkuser/apps/sync/syncers.py b/src/bk-user/bkuser/apps/sync/syncers.py index 2fd2a5868..15dceb11d 100644 --- a/src/bk-user/bkuser/apps/sync/syncers.py +++ b/src/bk-user/bkuser/apps/sync/syncers.py @@ -43,10 +43,14 @@ def __init__( ctx: DataSourceSyncTaskContext, data_source: DataSource, raw_departments: List[RawDataSourceDepartment], + overwrite: bool, + incremental: bool, ): self.ctx = ctx self.data_source = data_source self.raw_departments = raw_departments + self.overwrite = overwrite + self.incremental = incremental def sync(self): self.ctx.logger.info(f"receive {len(self.raw_departments)} departments from data source plugin") # noqa: G004 @@ -67,8 +71,8 @@ def _sync_departments(self): raw_dept_codes = {dept.code for dept in self.raw_departments} waiting_create_dept_codes = raw_dept_codes - dept_codes - waiting_delete_dept_codes = dept_codes - raw_dept_codes - waiting_update_dept_codes = dept_codes & raw_dept_codes + waiting_delete_dept_codes = dept_codes - raw_dept_codes if not self.incremental else set() + waiting_update_dept_codes = dept_codes & raw_dept_codes if self.overwrite else set() if waiting_delete_dept_codes: self._delete_departments(waiting_delete_dept_codes) @@ -135,11 +139,24 @@ def _sync_department_relations(self): dept_code_map = {dept.code: dept for dept in DataSourceDepartment.objects.filter(data_source=self.data_source)} # {dept_code: parent_dept_code} dept_parent_code_map = {dept.code: dept.parent for dept in self.raw_departments} + + # 如果是增量同步模式,则需要将存量的部门关系捞出来,和新的合并下,再删除重建 + # Q: 为什么不是增量模式时候,一通对比之后,直接往现有的 MPTT 森林里面塞节点? + # A: MPTT 树结构复杂,变更操作可能存在风险,且后台任务对性能要求不高,先用简单的删除重建方案 + if self.incremental: + for relation in DataSourceDepartmentRelation.objects.filter(data_source=self.data_source): + # 如果某个部门有新的父部门,则跳过 + if relation.department.code in dept_parent_code_map: + continue + + dept_parent_code_map[relation.department.code] = ( + relation.parent.department.code if relation.parent else None + ) + # {dept_code: data_source_dept_relation} dept_code_rel_map: Dict[str, DataSourceDepartmentRelation] = {} - + mptt_tree_ids: Set[int] = set() # 目前采用全部删除,再重建的方式 - mptt_tree_ids = set() with DataSourceDepartmentRelation.objects.disable_mptt_updates(): DataSourceDepartmentRelation.objects.filter(data_source=self.data_source).delete() parent_relations = list(dept_parent_code_map.items()) @@ -384,8 +401,11 @@ def _sync_user_leader_relations(self): f"create {len(waiting_create_user_leader_relations)} user-leader relations" # noqa: G004 ) - # 集合做差,再转换成 relation ID,得到需要删除的 relation ID 列表 - if waiting_delete_user_leader_id_tuples := exists_user_leader_id_tuples - user_leader_id_tuples: + # 集合做差,再转换成 relation ID,得到需要删除的 relation ID 列表(注意:增量更新时,不应该删除关系) + waiting_delete_user_leader_id_tuples = ( + exists_user_leader_id_tuples - user_leader_id_tuples if not self.incremental else set() + ) + if waiting_delete_user_leader_id_tuples: waiting_delete_user_leader_relation_ids = [ exists_user_leader_relations_map[t] for t in waiting_delete_user_leader_id_tuples ] @@ -436,8 +456,11 @@ def _sync_user_department_relations(self): f"create {len(waiting_create_user_dept_relations)} user-department relations" # noqa: G004 ) - # 集合做差,再转换成 relation ID,得到需要删除的 relation ID 列表 - if waiting_delete_user_dept_id_tuples := exists_user_dept_id_tuples - user_dept_id_tuples: + # 集合做差,再转换成 relation ID,得到需要删除的 relation ID 列表(注意:增量更新时,不应该删除关系) + waiting_delete_user_dept_id_tuples = ( + exists_user_dept_id_tuples - user_dept_id_tuples if not self.incremental else set() + ) + if waiting_delete_user_dept_id_tuples: waiting_delete_user_dept_relation_ids = [ exists_user_dept_relations_map[t] for t in waiting_delete_user_dept_id_tuples ] diff --git a/src/bk-user/bkuser/plugins/local/parser.py b/src/bk-user/bkuser/plugins/local/parser.py index 7072db3e8..efb8acdbd 100644 --- a/src/bk-user/bkuser/plugins/local/parser.py +++ b/src/bk-user/bkuser/plugins/local/parser.py @@ -29,7 +29,7 @@ SheetColumnsNotMatch, UserSheetNotExists, ) -from bkuser.plugins.local.utils import gen_code +from bkuser.plugins.local.utils import gen_dept_code from bkuser.plugins.models import RawDataSourceDepartment, RawDataSourceUser @@ -209,7 +209,7 @@ def _parse_departments(self): organizations.add(cur_org.strip()) # 组织路径:本数据源部门 Code 映射表 - org_code_map = {org: gen_code(org) for org in organizations} + org_code_map = {org: gen_dept_code(org) for org in organizations} for org in organizations: parent_org, __, dept_name = org.rpartition("/") self.departments.append( @@ -235,7 +235,7 @@ def _parse_users(self): if organizations := properties.pop("organizations"): for org in organizations.split(","): if org := org.strip(): - departments.append(gen_code(org)) + departments.append(gen_dept_code(org)) if leader_names := properties.pop("leaders"): for ld in leader_names.split(","): diff --git a/src/bk-user/bkuser/plugins/local/utils.py b/src/bk-user/bkuser/plugins/local/utils.py index 82d78055f..636a3e1bf 100644 --- a/src/bk-user/bkuser/plugins/local/utils.py +++ b/src/bk-user/bkuser/plugins/local/utils.py @@ -11,7 +11,7 @@ from hashlib import sha256 -def gen_code(org: str) -> str: +def gen_dept_code(org: str) -> str: # 本地数据源数据没有提供部门 code 的方式, # 因此使用 sha256 计算以避免冲突,也便于后续插入 DB 时进行比较 # 注意:本地数据源用户 code 就是 username,不需要额外计算 code diff --git a/src/bk-user/tests/apps/sync/conftest.py b/src/bk-user/tests/apps/sync/conftest.py index c40aadf29..9ad4f2aea 100644 --- a/src/bk-user/tests/apps/sync/conftest.py +++ b/src/bk-user/tests/apps/sync/conftest.py @@ -235,6 +235,17 @@ def raw_users() -> List[RawDataSourceUser]: ] +@pytest.fixture() +def random_raw_department() -> RawDataSourceDepartment: + """生成随机部门""" + return RawDataSourceDepartment( + code=generate_random_string(), + name="dept_random", + parent="company", + extras={"region": "random"}, + ) + + @pytest.fixture() def random_raw_user() -> RawDataSourceUser: """生成随机用户""" @@ -250,6 +261,6 @@ def random_raw_user() -> RawDataSourceUser: "gender": "other", "region": "britain", }, - leaders=[], - departments=[], + leaders=["zhangsan"], + departments=["company"], ) diff --git a/src/bk-user/tests/apps/sync/test_syncers.py b/src/bk-user/tests/apps/sync/test_syncers.py index 5d3bd4c88..5a0cc449d 100644 --- a/src/bk-user/tests/apps/sync/test_syncers.py +++ b/src/bk-user/tests/apps/sync/test_syncers.py @@ -35,7 +35,9 @@ class TestDataSourceDepartmentSyncer: def test_initial(self, data_source_sync_task_ctx, bare_local_data_source, raw_departments): - DataSourceDepartmentSyncer(data_source_sync_task_ctx, bare_local_data_source, raw_departments).sync() + DataSourceDepartmentSyncer( + data_source_sync_task_ctx, bare_local_data_source, raw_departments, overwrite=True, incremental=False + ).sync() # 验证部门信息 departments = DataSourceDepartment.objects.filter(data_source=bare_local_data_source) @@ -54,7 +56,9 @@ def test_update(self, data_source_sync_task_ctx, full_local_data_source): RawDataSourceDepartment(code="dept_c", name="部门C", parent="company", extras={"region": "SH"}), RawDataSourceDepartment(code="center_ca", name="中心CA", parent="dept_c", extras={"region": "CS"}), ] - DataSourceDepartmentSyncer(data_source_sync_task_ctx, full_local_data_source, raw_departments).sync() + DataSourceDepartmentSyncer( + data_source_sync_task_ctx, full_local_data_source, raw_departments, overwrite=True, incremental=False + ).sync() # 验证部门信息 departments = DataSourceDepartment.objects.filter(data_source=full_local_data_source) @@ -69,9 +73,35 @@ def test_update(self, data_source_sync_task_ctx, full_local_data_source): data_source=full_local_data_source ) == self._gen_parent_relations_from_raw_departments(raw_departments) + def test_update_with_incremental(self, data_source_sync_task_ctx, full_local_data_source, random_raw_department): + dept_relation_cnt_before_sync = DataSourceDepartmentRelation.objects.filter( + data_source=full_local_data_source + ).count() + excepted_dept_codes = set( + DataSourceDepartment.objects.filter(data_source=full_local_data_source).values_list("code", flat=True) + ) + excepted_dept_codes.add(random_raw_department.code) + + DataSourceDepartmentSyncer( + data_source_sync_task_ctx, + full_local_data_source, + [random_raw_department], + overwrite=True, + incremental=True, + ).sync() + + depts = DataSourceDepartment.objects.filter(data_source=full_local_data_source) + assert set(depts.values_list("code", flat=True)) == excepted_dept_codes + # 随机部门只有一个父部门,所以应该会多一个关系 + assert DataSourceDepartmentRelation.objects.filter(data_source=full_local_data_source).count() == ( + dept_relation_cnt_before_sync + 1 + ) + def test_destroy(self, data_source_sync_task_ctx, full_local_data_source): raw_departments: List[RawDataSourceDepartment] = [] - DataSourceDepartmentSyncer(data_source_sync_task_ctx, full_local_data_source, raw_departments).sync() + DataSourceDepartmentSyncer( + data_source_sync_task_ctx, full_local_data_source, raw_departments, overwrite=True, incremental=False + ).sync() # 同步了空的数据,导致该数据源的所有部门,部门关系信息都被删除 assert not DataSourceDepartment.objects.filter(data_source=full_local_data_source).exists() @@ -99,13 +129,11 @@ def test_initial( raw_users, ): # 先同步部门数据,再同步用户数据 - DataSourceDepartmentSyncer(data_source_sync_task_ctx, bare_local_data_source, raw_departments).sync() + DataSourceDepartmentSyncer( + data_source_sync_task_ctx, bare_local_data_source, raw_departments, overwrite=True, incremental=False + ).sync() DataSourceUserSyncer( - data_source_sync_task_ctx, - bare_local_data_source, - raw_users, - overwrite=True, - incremental=False, + data_source_sync_task_ctx, bare_local_data_source, raw_users, overwrite=True, incremental=False ).sync() # 验证用户信息 @@ -239,12 +267,17 @@ def test_update_without_overwrite( assert zhangsan.extras == {} def test_update_with_incremental(self, data_source_sync_task_ctx, full_local_data_source, random_raw_user): + dept_user_relation_cnt_before_sync = DataSourceDepartmentUserRelation.objects.filter( + data_source=full_local_data_source + ).count() + user_leader_relation_cnt_before_sync = DataSourceUserLeaderRelation.objects.filter( + data_source=full_local_data_source + ).count() user_codes = set( - DataSourceUser.objects.filter( - data_source=full_local_data_source, - ).values_list("code", flat=True) + DataSourceUser.objects.filter(data_source=full_local_data_source).values_list("code", flat=True) ) user_codes.add(random_raw_user.code) + DataSourceUserSyncer( data_source_sync_task_ctx, full_local_data_source, @@ -255,6 +288,13 @@ def test_update_with_incremental(self, data_source_sync_task_ctx, full_local_dat users = DataSourceUser.objects.filter(data_source=full_local_data_source) assert set(users.values_list("code", flat=True)) == user_codes + # 随机用户属于一个部门 & 拥有一个 leader,因此两种关系的数量应该都是 + 1 + assert DataSourceDepartmentUserRelation.objects.filter(data_source=full_local_data_source).count() == ( + dept_user_relation_cnt_before_sync + 1 + ) + assert DataSourceUserLeaderRelation.objects.filter(data_source=full_local_data_source).count() == ( + user_leader_relation_cnt_before_sync + 1 + ) def test_update_with_invalid_leader(self, data_source_sync_task_ctx, full_local_data_source, random_raw_user): """全量同步模式,要求用户的 leader 必须也在数据中,否则会有警告""" diff --git a/src/bk-user/tests/fixtures/__init__.py b/src/bk-user/tests/fixtures/__init__.py new file mode 100644 index 000000000..1060b7bf4 --- /dev/null +++ b/src/bk-user/tests/fixtures/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/src/bk-user/tests/plugins/__init__.py b/src/bk-user/tests/plugins/__init__.py new file mode 100644 index 000000000..1060b7bf4 --- /dev/null +++ b/src/bk-user/tests/plugins/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/src/bk-user/tests/plugins/general/__init__.py b/src/bk-user/tests/plugins/general/__init__.py new file mode 100644 index 000000000..1060b7bf4 --- /dev/null +++ b/src/bk-user/tests/plugins/general/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/src/bk-user/tests/plugins/local/test_parser.py b/src/bk-user/tests/plugins/local/test_parser.py index e1fdb4ff0..f29345e19 100644 --- a/src/bk-user/tests/plugins/local/test_parser.py +++ b/src/bk-user/tests/plugins/local/test_parser.py @@ -23,7 +23,7 @@ UserSheetNotExists, ) from bkuser.plugins.local.parser import LocalDataSourceDataParser -from bkuser.plugins.local.utils import gen_code +from bkuser.plugins.local.utils import gen_dept_code from bkuser.plugins.models import RawDataSourceDepartment, RawDataSourceUser @@ -116,18 +116,18 @@ def test_get_departments(self, logger, user_wk): parser = LocalDataSourceDataParser(logger, user_wk) parser.parse() - company_code = gen_code("公司") - dept_a_code = gen_code("公司/部门A") - dept_b_code = gen_code("公司/部门B") - dept_c_code = gen_code("公司/部门C") - center_aa_code = gen_code("公司/部门A/中心AA") - center_ab_code = gen_code("公司/部门A/中心AB") - group_aaa_code = gen_code("公司/部门A/中心AA/小组AAA") - group_aba_code = gen_code("公司/部门A/中心AB/小组ABA") - center_ba_code = gen_code("公司/部门B/中心BA") - group_baa_code = gen_code("公司/部门B/中心BA/小组BAA") - center_ca_code = gen_code("公司/部门C/中心CA") - group_caa_code = gen_code("公司/部门C/中心CA/小组CAA") + company_code = gen_dept_code("公司") + dept_a_code = gen_dept_code("公司/部门A") + dept_b_code = gen_dept_code("公司/部门B") + dept_c_code = gen_dept_code("公司/部门C") + center_aa_code = gen_dept_code("公司/部门A/中心AA") + center_ab_code = gen_dept_code("公司/部门A/中心AB") + group_aaa_code = gen_dept_code("公司/部门A/中心AA/小组AAA") + group_aba_code = gen_dept_code("公司/部门A/中心AB/小组ABA") + center_ba_code = gen_dept_code("公司/部门B/中心BA") + group_baa_code = gen_dept_code("公司/部门B/中心BA/小组BAA") + center_ca_code = gen_dept_code("公司/部门C/中心CA") + group_caa_code = gen_dept_code("公司/部门C/中心CA/小组CAA") assert sorted(parser.get_departments(), key=lambda d: d.name) == [ RawDataSourceDepartment(code=center_aa_code, name="中心AA", parent=dept_a_code), @@ -149,7 +149,7 @@ def test_get_users(self, logger, user_wk): parser.parse() def gen_depts(orgs: List[str]) -> List[str]: - return [gen_code(o) for o in orgs] + return [gen_dept_code(o) for o in orgs] assert sorted(parser.get_users(), key=lambda u: u.properties["age"]) == [ RawDataSourceUser( diff --git a/src/bk-user/tests/plugins/local/test_utils.py b/src/bk-user/tests/plugins/local/test_utils.py index e45d42fff..530ecc518 100644 --- a/src/bk-user/tests/plugins/local/test_utils.py +++ b/src/bk-user/tests/plugins/local/test_utils.py @@ -9,7 +9,7 @@ specific language governing permissions and limitations under the License. """ import pytest -from bkuser.plugins.local.utils import gen_code +from bkuser.plugins.local.utils import gen_dept_code @pytest.mark.parametrize( @@ -22,7 +22,7 @@ ("公司/部门A/中心AA/小组AAA ", "e75be6462a8ff8b9b843b3c2e419db455b4477023f98941508bc19cfa3982ec0"), ], ) -def test_gen_code(raw, excepted): +def test_gen_dept_code(raw, excepted): # 重要:如果该单元测试挂了,说明修改了本地数据源部门的 Code 的生成规则 # 该行为会导致新同步的数据,无法与 DB 中的数据匹配上,将会触发数据重建!!! - assert gen_code(raw) == excepted + assert gen_dept_code(raw) == excepted