Skip to content

Commit

Permalink
fix: incremental data source sync must preserve original relations
Browse files Browse the repository at this point in the history
  • Loading branch information
narasux committed May 8, 2024
1 parent 4eab07c commit 7298400
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 45 deletions.
9 changes: 7 additions & 2 deletions src/bk-user/bkuser/apps/sync/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,13 @@ def _initial_plugin(self, ctx: DataSourceSyncTaskContext, plugin_init_extra_kwar

def _sync_departments(self, ctx: DataSourceSyncTaskContext):
"""同步部门信息"""
departments = self.plugin.fetch_departments()
DataSourceDepartmentSyncer(ctx, self.data_source, departments).sync()
DataSourceDepartmentSyncer(
ctx=ctx,
data_source=self.data_source,
raw_departments=self.plugin.fetch_departments(),
overwrite=bool(self.task.extras.get("overwrite", False)),
incremental=bool(self.task.extras.get("incremental", False)),
).sync()

def _sync_users(self, ctx: DataSourceSyncTaskContext):
"""同步用户信息"""
Expand Down
39 changes: 31 additions & 8 deletions src/bk-user/bkuser/apps/sync/syncers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ def __init__(
ctx: DataSourceSyncTaskContext,
data_source: DataSource,
raw_departments: List[RawDataSourceDepartment],
overwrite: bool,
incremental: bool,
):
self.ctx = ctx
self.data_source = data_source
self.raw_departments = raw_departments
self.overwrite = overwrite
self.incremental = incremental

def sync(self):
self.ctx.logger.info(f"receive {len(self.raw_departments)} departments from data source plugin") # noqa: G004
Expand All @@ -67,8 +71,8 @@ def _sync_departments(self):
raw_dept_codes = {dept.code for dept in self.raw_departments}

waiting_create_dept_codes = raw_dept_codes - dept_codes
waiting_delete_dept_codes = dept_codes - raw_dept_codes
waiting_update_dept_codes = dept_codes & raw_dept_codes
waiting_delete_dept_codes = dept_codes - raw_dept_codes if not self.incremental else set()
waiting_update_dept_codes = dept_codes & raw_dept_codes if self.overwrite else set()

if waiting_delete_dept_codes:
self._delete_departments(waiting_delete_dept_codes)
Expand Down Expand Up @@ -135,11 +139,24 @@ def _sync_department_relations(self):
dept_code_map = {dept.code: dept for dept in DataSourceDepartment.objects.filter(data_source=self.data_source)}
# {dept_code: parent_dept_code}
dept_parent_code_map = {dept.code: dept.parent for dept in self.raw_departments}

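# In incremental sync mode, load the existing department relations, merge them with the incoming ones, then delete and rebuild all relations.
# Q: Why not, in incremental mode, diff the data and insert the new nodes directly into the existing MPTT forest?
# A: MPTT tree structures are complex and in-place changes are risky; this is a background task with modest performance requirements, so the simpler delete-and-rebuild approach is used for now.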
if self.incremental:
for relation in DataSourceDepartmentRelation.objects.filter(data_source=self.data_source):
# 如果某个部门有新的父部门,则跳过
if relation.department.code in dept_parent_code_map:
continue

dept_parent_code_map[relation.department.code] = (
relation.parent.department.code if relation.parent else None
)

# {dept_code: data_source_dept_relation}
dept_code_rel_map: Dict[str, DataSourceDepartmentRelation] = {}

mptt_tree_ids: Set[int] = set()
# 目前采用全部删除,再重建的方式
mptt_tree_ids = set()
with DataSourceDepartmentRelation.objects.disable_mptt_updates():
DataSourceDepartmentRelation.objects.filter(data_source=self.data_source).delete()
parent_relations = list(dept_parent_code_map.items())
Expand Down Expand Up @@ -384,8 +401,11 @@ def _sync_user_leader_relations(self):
f"create {len(waiting_create_user_leader_relations)} user-leader relations" # noqa: G004
)

# 集合做差,再转换成 relation ID,得到需要删除的 relation ID 列表
if waiting_delete_user_leader_id_tuples := exists_user_leader_id_tuples - user_leader_id_tuples:
# 集合做差,再转换成 relation ID,得到需要删除的 relation ID 列表(注意:增量更新时,不应该删除关系)
waiting_delete_user_leader_id_tuples = (
exists_user_leader_id_tuples - user_leader_id_tuples if not self.incremental else set()
)
if waiting_delete_user_leader_id_tuples:
waiting_delete_user_leader_relation_ids = [
exists_user_leader_relations_map[t] for t in waiting_delete_user_leader_id_tuples
]
Expand Down Expand Up @@ -436,8 +456,11 @@ def _sync_user_department_relations(self):
f"create {len(waiting_create_user_dept_relations)} user-department relations" # noqa: G004
)

# 集合做差,再转换成 relation ID,得到需要删除的 relation ID 列表
if waiting_delete_user_dept_id_tuples := exists_user_dept_id_tuples - user_dept_id_tuples:
# 集合做差,再转换成 relation ID,得到需要删除的 relation ID 列表(注意:增量更新时,不应该删除关系)
waiting_delete_user_dept_id_tuples = (
exists_user_dept_id_tuples - user_dept_id_tuples if not self.incremental else set()
)
if waiting_delete_user_dept_id_tuples:
waiting_delete_user_dept_relation_ids = [
exists_user_dept_relations_map[t] for t in waiting_delete_user_dept_id_tuples
]
Expand Down
6 changes: 3 additions & 3 deletions src/bk-user/bkuser/plugins/local/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
SheetColumnsNotMatch,
UserSheetNotExists,
)
from bkuser.plugins.local.utils import gen_code
from bkuser.plugins.local.utils import gen_dept_code
from bkuser.plugins.models import RawDataSourceDepartment, RawDataSourceUser


Expand Down Expand Up @@ -209,7 +209,7 @@ def _parse_departments(self):
organizations.add(cur_org.strip())

# 组织路径:本数据源部门 Code 映射表
org_code_map = {org: gen_code(org) for org in organizations}
org_code_map = {org: gen_dept_code(org) for org in organizations}
for org in organizations:
parent_org, __, dept_name = org.rpartition("/")
self.departments.append(
Expand All @@ -235,7 +235,7 @@ def _parse_users(self):
if organizations := properties.pop("organizations"):
for org in organizations.split(","):
if org := org.strip():
departments.append(gen_code(org))
departments.append(gen_dept_code(org))

if leader_names := properties.pop("leaders"):
for ld in leader_names.split(","):
Expand Down
2 changes: 1 addition & 1 deletion src/bk-user/bkuser/plugins/local/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from hashlib import sha256


def gen_code(org: str) -> str:
def gen_dept_code(org: str) -> str:
# 本地数据源数据没有提供部门 code 的方式,
# 因此使用 sha256 计算以避免冲突,也便于后续插入 DB 时进行比较
# 注意:本地数据源用户 code 就是 username,不需要额外计算 code
Expand Down
15 changes: 13 additions & 2 deletions src/bk-user/tests/apps/sync/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,17 @@ def raw_users() -> List[RawDataSourceUser]:
]


@pytest.fixture()
def random_raw_department() -> RawDataSourceDepartment:
    """Build a raw department whose code is a freshly generated random string.

    The department is named "dept_random", points at the parent code
    "company", and carries one extra field (region="random").
    """
    dept_fields = {
        "code": generate_random_string(),
        "name": "dept_random",
        "parent": "company",
        "extras": {"region": "random"},
    }
    return RawDataSourceDepartment(**dept_fields)


@pytest.fixture()
def random_raw_user() -> RawDataSourceUser:
"""生成随机用户"""
Expand All @@ -250,6 +261,6 @@ def random_raw_user() -> RawDataSourceUser:
"gender": "other",
"region": "britain",
},
leaders=[],
departments=[],
leaders=["zhangsan"],
departments=["company"],
)
64 changes: 52 additions & 12 deletions src/bk-user/tests/apps/sync/test_syncers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@

class TestDataSourceDepartmentSyncer:
def test_initial(self, data_source_sync_task_ctx, bare_local_data_source, raw_departments):
DataSourceDepartmentSyncer(data_source_sync_task_ctx, bare_local_data_source, raw_departments).sync()
DataSourceDepartmentSyncer(
data_source_sync_task_ctx, bare_local_data_source, raw_departments, overwrite=True, incremental=False
).sync()

# 验证部门信息
departments = DataSourceDepartment.objects.filter(data_source=bare_local_data_source)
Expand All @@ -54,7 +56,9 @@ def test_update(self, data_source_sync_task_ctx, full_local_data_source):
RawDataSourceDepartment(code="dept_c", name="部门C", parent="company", extras={"region": "SH"}),
RawDataSourceDepartment(code="center_ca", name="中心CA", parent="dept_c", extras={"region": "CS"}),
]
DataSourceDepartmentSyncer(data_source_sync_task_ctx, full_local_data_source, raw_departments).sync()
DataSourceDepartmentSyncer(
data_source_sync_task_ctx, full_local_data_source, raw_departments, overwrite=True, incremental=False
).sync()

# 验证部门信息
departments = DataSourceDepartment.objects.filter(data_source=full_local_data_source)
Expand All @@ -69,9 +73,35 @@ def test_update(self, data_source_sync_task_ctx, full_local_data_source):
data_source=full_local_data_source
) == self._gen_parent_relations_from_raw_departments(raw_departments)

def test_update_with_incremental(self, data_source_sync_task_ctx, full_local_data_source, random_raw_department):
    """Incremental department sync must keep existing departments and relations.

    Only ``random_raw_department`` is passed to the syncer with
    ``incremental=True``. Afterwards every previously stored department code
    must still be present alongside the new one, and the number of department
    relations must have grown by exactly one.
    """
    # Snapshot the state before syncing so the assertions below are relative to it.
    dept_relation_cnt_before_sync = DataSourceDepartmentRelation.objects.filter(
        data_source=full_local_data_source
    ).count()
    excepted_dept_codes = set(
        DataSourceDepartment.objects.filter(data_source=full_local_data_source).values_list("code", flat=True)
    )
    # The only expected change to the set of codes is the newly synced department.
    excepted_dept_codes.add(random_raw_department.code)

    DataSourceDepartmentSyncer(
        data_source_sync_task_ctx,
        full_local_data_source,
        [random_raw_department],
        overwrite=True,
        incremental=True,
    ).sync()

    depts = DataSourceDepartment.objects.filter(data_source=full_local_data_source)
    assert set(depts.values_list("code", flat=True)) == excepted_dept_codes
    # The random department has exactly one parent department, so exactly one relation should be added.
    # NOTE(review): presumably relies on a "company" department already existing in full_local_data_source - confirm.
    assert DataSourceDepartmentRelation.objects.filter(data_source=full_local_data_source).count() == (
        dept_relation_cnt_before_sync + 1
    )

def test_destroy(self, data_source_sync_task_ctx, full_local_data_source):
raw_departments: List[RawDataSourceDepartment] = []
DataSourceDepartmentSyncer(data_source_sync_task_ctx, full_local_data_source, raw_departments).sync()
DataSourceDepartmentSyncer(
data_source_sync_task_ctx, full_local_data_source, raw_departments, overwrite=True, incremental=False
).sync()

# 同步了空的数据,导致该数据源的所有部门,部门关系信息都被删除
assert not DataSourceDepartment.objects.filter(data_source=full_local_data_source).exists()
Expand Down Expand Up @@ -99,13 +129,11 @@ def test_initial(
raw_users,
):
# 先同步部门数据,再同步用户数据
DataSourceDepartmentSyncer(data_source_sync_task_ctx, bare_local_data_source, raw_departments).sync()
DataSourceDepartmentSyncer(
data_source_sync_task_ctx, bare_local_data_source, raw_departments, overwrite=True, incremental=False
).sync()
DataSourceUserSyncer(
data_source_sync_task_ctx,
bare_local_data_source,
raw_users,
overwrite=True,
incremental=False,
data_source_sync_task_ctx, bare_local_data_source, raw_users, overwrite=True, incremental=False
).sync()

# 验证用户信息
Expand Down Expand Up @@ -239,12 +267,17 @@ def test_update_without_overwrite(
assert zhangsan.extras == {}

def test_update_with_incremental(self, data_source_sync_task_ctx, full_local_data_source, random_raw_user):
dept_user_relation_cnt_before_sync = DataSourceDepartmentUserRelation.objects.filter(
data_source=full_local_data_source
).count()
user_leader_relation_cnt_before_sync = DataSourceUserLeaderRelation.objects.filter(
data_source=full_local_data_source
).count()
user_codes = set(
DataSourceUser.objects.filter(
data_source=full_local_data_source,
).values_list("code", flat=True)
DataSourceUser.objects.filter(data_source=full_local_data_source).values_list("code", flat=True)
)
user_codes.add(random_raw_user.code)

DataSourceUserSyncer(
data_source_sync_task_ctx,
full_local_data_source,
Expand All @@ -255,6 +288,13 @@ def test_update_with_incremental(self, data_source_sync_task_ctx, full_local_dat

users = DataSourceUser.objects.filter(data_source=full_local_data_source)
assert set(users.values_list("code", flat=True)) == user_codes
# 随机用户属于一个部门 & 拥有一个 leader,因此两种关系的数量应该都是 + 1
assert DataSourceDepartmentUserRelation.objects.filter(data_source=full_local_data_source).count() == (
dept_user_relation_cnt_before_sync + 1
)
assert DataSourceUserLeaderRelation.objects.filter(data_source=full_local_data_source).count() == (
user_leader_relation_cnt_before_sync + 1
)

def test_update_with_invalid_leader(self, data_source_sync_task_ctx, full_local_data_source, random_raw_user):
"""全量同步模式,要求用户的 leader 必须也在数据中,否则会有警告"""
Expand Down
10 changes: 10 additions & 0 deletions src/bk-user/tests/fixtures/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
10 changes: 10 additions & 0 deletions src/bk-user/tests/plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
10 changes: 10 additions & 0 deletions src/bk-user/tests/plugins/general/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
28 changes: 14 additions & 14 deletions src/bk-user/tests/plugins/local/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
UserSheetNotExists,
)
from bkuser.plugins.local.parser import LocalDataSourceDataParser
from bkuser.plugins.local.utils import gen_code
from bkuser.plugins.local.utils import gen_dept_code
from bkuser.plugins.models import RawDataSourceDepartment, RawDataSourceUser


Expand Down Expand Up @@ -116,18 +116,18 @@ def test_get_departments(self, logger, user_wk):
parser = LocalDataSourceDataParser(logger, user_wk)
parser.parse()

company_code = gen_code("公司")
dept_a_code = gen_code("公司/部门A")
dept_b_code = gen_code("公司/部门B")
dept_c_code = gen_code("公司/部门C")
center_aa_code = gen_code("公司/部门A/中心AA")
center_ab_code = gen_code("公司/部门A/中心AB")
group_aaa_code = gen_code("公司/部门A/中心AA/小组AAA")
group_aba_code = gen_code("公司/部门A/中心AB/小组ABA")
center_ba_code = gen_code("公司/部门B/中心BA")
group_baa_code = gen_code("公司/部门B/中心BA/小组BAA")
center_ca_code = gen_code("公司/部门C/中心CA")
group_caa_code = gen_code("公司/部门C/中心CA/小组CAA")
company_code = gen_dept_code("公司")
dept_a_code = gen_dept_code("公司/部门A")
dept_b_code = gen_dept_code("公司/部门B")
dept_c_code = gen_dept_code("公司/部门C")
center_aa_code = gen_dept_code("公司/部门A/中心AA")
center_ab_code = gen_dept_code("公司/部门A/中心AB")
group_aaa_code = gen_dept_code("公司/部门A/中心AA/小组AAA")
group_aba_code = gen_dept_code("公司/部门A/中心AB/小组ABA")
center_ba_code = gen_dept_code("公司/部门B/中心BA")
group_baa_code = gen_dept_code("公司/部门B/中心BA/小组BAA")
center_ca_code = gen_dept_code("公司/部门C/中心CA")
group_caa_code = gen_dept_code("公司/部门C/中心CA/小组CAA")

assert sorted(parser.get_departments(), key=lambda d: d.name) == [
RawDataSourceDepartment(code=center_aa_code, name="中心AA", parent=dept_a_code),
Expand All @@ -149,7 +149,7 @@ def test_get_users(self, logger, user_wk):
parser.parse()

def gen_depts(orgs: List[str]) -> List[str]:
return [gen_code(o) for o in orgs]
return [gen_dept_code(o) for o in orgs]

assert sorted(parser.get_users(), key=lambda u: u.properties["age"]) == [
RawDataSourceUser(
Expand Down
6 changes: 3 additions & 3 deletions src/bk-user/tests/plugins/local/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
specific language governing permissions and limitations under the License.
"""
import pytest
from bkuser.plugins.local.utils import gen_code
from bkuser.plugins.local.utils import gen_dept_code


@pytest.mark.parametrize(
Expand All @@ -22,7 +22,7 @@
("公司/部门A/中心AA/小组AAA ", "e75be6462a8ff8b9b843b3c2e419db455b4477023f98941508bc19cfa3982ec0"),
],
)
def test_gen_code(raw, excepted):
def test_gen_dept_code(raw, excepted):
# 重要:如果该单元测试挂了,说明修改了本地数据源部门的 Code 的生成规则
# 该行为会导致新同步的数据,无法与 DB 中的数据匹配上,将会触发数据重建!!!
assert gen_code(raw) == excepted
assert gen_dept_code(raw) == excepted

0 comments on commit 7298400

Please sign in to comment.