Skip to content

Commit

Permalink
[app] fix generic type in python3.8
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric authored and sharang committed May 29, 2024
1 parent 7e69ea1 commit 7204698
Showing 1 changed file with 48 additions and 47 deletions.
95 changes: 48 additions & 47 deletions app/app/application/l7_flow_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import uuid
import pandas as pd
from log import logger
from typing import List, Dict, Set

from ast import Tuple
from pandas import DataFrame
Expand Down Expand Up @@ -1122,7 +1123,7 @@ def __init__(self):
# 标识 span_id 方便匹配 app-span
self.span_id = None
# 分组聚合所有 tcp_seq 相同的 flow
self.spans: list[SpanNode] = []
self.spans: List[SpanNode] = []
self.id = uuid.uuid1().hex

def __eq__(self, other: 'NetworkSpanSet') -> bool:
Expand Down Expand Up @@ -1336,13 +1337,13 @@ def __init__(self, group_key: str):
# 当以 sys_span 构建 process_span_set 时,group_key=auto_instance+index(index 标记同进程 s-p 出现的次数)
self.group_key = group_key
# 所有 spans
self.spans: list[SpanNode] = []
self.spans: List[SpanNode] = []
# 用于存放 `app_span` 的所有 root
self.app_span_roots: list[SpanNode] = None
self.app_span_roots: List[SpanNode] = None
# 用于存放 `app_span` 的所有 leaf
self.app_span_leafs: list[SpanNode] = None
self.leaf_syscall_trace_id_request: set[int] = set[int]()
self.leaf_syscall_trace_id_response: set[int] = set[int]()
self.app_span_leafs: List[SpanNode] = None
self.leaf_syscall_trace_id_request: Set[int] = Set[int]()
self.leaf_syscall_trace_id_response: Set[int] = Set[int]()
# 用于显示调用拓扑使用
self.subnet_id = None
self.subnet = None
Expand Down Expand Up @@ -1456,19 +1457,19 @@ def append_sys_span(self, sys_span: SysSpanNode):
self.leaf_syscall_trace_id_response.add(
sys_span.get_syscall_trace_id_response())

def get_roots(self) -> list[SpanNode]:
def get_roots(self) -> List[SpanNode]:
return [span for span in self.spans if span.parent is None]

def get_leafs(self) -> list[SpanNode]:
has_child: set[int] = set()
def get_leafs(self) -> List[SpanNode]:
has_child: Set[int] = set()
for span in self.spans:
if span.parent:
has_child.add(span.parent)
leafs = [span for span in self.spans if span not in has_child]
return leafs

def _build_app_span_tree(self):
span_id_to_index: dict[str, int] = {}
span_id_to_index: Dict[str, int] = {}
for i in range(len(self.spans)):
if self.spans[i].signal_source != L7_FLOW_SIGNAL_SOURCE_OTEL:
continue
Expand All @@ -1485,13 +1486,13 @@ def _build_app_span_tree(self):
self.spans[parent_span_index],
"app_span mounted due to parent_span_id")

# return: list[ProcessSpanSet]
# return: List[ProcessSpanSet]
def split_to_multiple_process_span_set(self) -> list:
# 先构建树、app-span 内部的父子关系,确认 app-span 的结构
self._build_app_span_tree()

# 实际上 parent_id 是 flow_index,先构建一个 flow_index 到 span_index 的映射
flow_index_to_span_index: dict[int, int] = {}
flow_index_to_span_index: Dict[int, int] = {}
max_flow_index = 0
for i in range(len(self.spans)):
flow_index = self.spans[i].get_flow_index()
Expand All @@ -1510,7 +1511,7 @@ def split_to_multiple_process_span_set(self) -> list:
disjoint_set.get(i)

# root_parent_span_id => ProcessSpanSet
split_result: dict[str, ProcessSpanSet] = {}
split_result: Dict[str, ProcessSpanSet] = {}
for i in range(len(self.spans)):
root_span_index = disjoint_set.get(i)
root_parent_span_id = self.spans[
Expand Down Expand Up @@ -1886,10 +1887,10 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
for flow in flows:
flow_index_to_id0[flow['_index']] = flow['_id'][0]

network_spans: list[NetworkSpanNode] = []
app_spans: list[AppSpanNode] = []
server_sys_spans: list[SysSpanNode] = []
client_sys_spans: list[SysSpanNode] = []
network_spans: List[NetworkSpanNode] = []
app_spans: List[AppSpanNode] = []
server_sys_spans: List[SysSpanNode] = []
client_sys_spans: List[SysSpanNode] = []
# 对 flow 分类,而后分别做排序,方便做层级处理
# 对 network_flows: net-span 的排序按固定的顺序(TAP_SIDE_RANKS),然后根据 span_id 挂 app-span,根据 tcp_seq 挂 sys-span
# 对 app_flows: app-span 按固定的规则设置层级(span_id/parent_span_id),按 span_id 挂 sys-span 以及挂到 sys-span 构建的 <service> 上
Expand Down Expand Up @@ -1921,8 +1922,8 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
# 构建 Process Span Set
# 对 app_span 按 auto_instance_id/auto_instance 进行分组
# auto_instance => []
process_span_map: dict[str, list[ProcessSpanSet]] = defaultdict(
list[ProcessSpanSet])
process_span_map: Dict[str, List[ProcessSpanSet]] = defaultdict(
List[ProcessSpanSet])
process_span_map = _union_app_spans(process_span_map, app_spans)
process_span_map = _union_sys_spans(process_span_map, server_sys_spans,
client_sys_spans)
Expand Down Expand Up @@ -1973,8 +1974,8 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,


def _union_app_spans(
process_span_map: dict[str, list[ProcessSpanSet]],
app_spans: list[AppSpanNode]) -> dict[str, list[ProcessSpanSet]]:
process_span_map: Dict[str, List[ProcessSpanSet]],
app_spans: List[AppSpanNode]) -> Dict[str, List[ProcessSpanSet]]:
for span in app_spans:
auto_instance = _get_auto_instance(span)
if auto_instance not in process_span_map:
Expand All @@ -1986,7 +1987,7 @@ def _union_app_spans(
# 如果这些 root 有同一个 parent_span_id: 说明只是还没关联 s-p 作为 parent,不需处理,后续逻辑会关联
# 如果这些 root 有不同的 parent_span_id: 说明这个服务被穿越了多次,要拆分为多个 ProcessSpanSet
for key, process_span_set_list in process_span_map.items():
split_process_span_set: list[ProcessSpanSet] = []
split_process_span_set: List[ProcessSpanSet] = []
for sp_span_pss in process_span_set_list:
split_result = sp_span_pss.split_to_multiple_process_span_set()
split_process_span_set.extend(split_result)
Expand All @@ -1995,14 +1996,14 @@ def _union_app_spans(


def _union_sys_spans(
process_span_map: dict[str, list[ProcessSpanSet]],
server_sys_spans: list[SysSpanNode],
client_sys_spans: list[SysSpanNode]
) -> dict[str, list[ProcessSpanSet]]:
process_span_map: Dict[str, List[ProcessSpanSet]],
server_sys_spans: List[SysSpanNode],
client_sys_spans: List[SysSpanNode]
) -> Dict[str, List[ProcessSpanSet]]:

# 先根据 syscall_trace_id_request 构建一个映射,方便查找
# syscall_trace_id_request => index
syscall_req_to_index: dict[int, int] = {}
syscall_req_to_index: Dict[int, int] = {}
for i in range(len(client_sys_spans)):
span = client_sys_spans[i]
if span.get_syscall_trace_id_request() > 0:
Expand All @@ -2022,7 +2023,7 @@ def _union_sys_spans(

# 构建一个 cp_infos 的关系,计算 syscall_trace_id_response 对应的所有有关联的 c-p 的索引
# root_index => { child_indexes }
cp_related_infos: dict[int, list[int]] = {}
cp_related_infos: Dict[int, List[int]] = {}
for i in range(len(client_sys_spans)):
root = cp_disjoint_set.get(i) # find root
if root != i:
Expand All @@ -2043,7 +2044,7 @@ def _union_sys_spans(

# 标记同一进程的 process 数量,同一个 process_span_set 内只允许存在最多一个 s-p
# auto_instance => same auto_instance span_set count
same_process_sp: dict[str, int] = dict.fromkeys(process_span_map.keys(), 1)
same_process_sp: Dict[str, int] = dict.fromkeys(process_span_map.keys(), 1)
for span in server_sys_spans:
if span.process_span_set is not None:
continue
Expand Down Expand Up @@ -2127,10 +2128,10 @@ def _union_sys_spans(


def _build_network_span_set(
united_spans: list[SpanNode],
united_spans: List[SpanNode],
related_flow_index_map: defaultdict(inner_defaultdict_set),
flow_index_to_span: list[SpanNode]) -> list[NetworkSpanSet]:
networks: list[NetworkSpanSet] = []
flow_index_to_span: List[SpanNode]) -> List[NetworkSpanSet]:
networks: List[NetworkSpanSet] = []

# 先构建一个 flow index to span 的映射
flow_aggregated = set() # set(flow._index)
Expand Down Expand Up @@ -2168,12 +2169,12 @@ def _same_span_set(lhs: SpanNode, rhs: SpanNode, spanset: str) -> bool:
return True


def _connect_process_and_networks(process_roots: list[SpanNode],
process_leafs: list[SpanNode],
network_roots: list[SpanNode],
network_leafs: list[SpanNode],
process_span_dict: dict[str, SpanNode],
flow_index_to_span: list[SpanNode]):
def _connect_process_and_networks(process_roots: List[SpanNode],
process_leafs: List[SpanNode],
network_roots: List[SpanNode],
network_leafs: List[SpanNode],
process_span_dict: Dict[str, SpanNode],
flow_index_to_span: List[SpanNode]):
# 1. process span set 的 leaf 作为 network span set root 的 parent
for ps_parent in process_leafs:
# 避免子循环多次访问字典
Expand Down Expand Up @@ -2278,7 +2279,7 @@ def _connect_process_and_networks(process_roots: list[SpanNode],

# 5. network span set 互相连接
# relations: child.x_request_id_0 == parent.x_request_id_1/child.span_id = parent.span_id
network_match_parent: dict[int, int] = {}
network_match_parent: Dict[int, int] = {}
for net_child in network_roots:
if net_child.get_parent_id() >= 0:
continue
Expand Down Expand Up @@ -2339,8 +2340,8 @@ def _connect_process_and_networks(process_roots: list[SpanNode],
"net_span mounted due to x_request_id or span_id passed")


def format_trace(services: list[ProcessSpanSet],
networks: list[NetworkSpanSet]) -> dict:
def format_trace(services: List[ProcessSpanSet],
networks: List[NetworkSpanSet]) -> dict:
"""
重新组织数据格式,并给 trace 排序
"""
Expand Down Expand Up @@ -2376,7 +2377,7 @@ def format_trace(services: list[ProcessSpanSet],


def format_selftime(traces: list, parent_trace: dict, child_ids: list,
uid_index_map: dict[int, int]):
uid_index_map: Dict[int, int]):
"""
计算每个服务的真实执行时间
这里要求按从上而下(父->子)的层级顺序来计算
Expand Down Expand Up @@ -2424,7 +2425,7 @@ def pruning_flows(_id, flows, network_delay_us):

# 记录所有 Trace Tree 的最小、最大时间和 trace_id 集合
# root_index => {min_start_time_us, max_end_time_us, set(trace_id)}
tree_infos: dict[int, dict] = {}
tree_infos: Dict[int, dict] = {}
root_of_initial_flow = -1
for flow in flows:
index = flow[_FLOW_INDEX_KEY]
Expand Down Expand Up @@ -2521,13 +2522,13 @@ def calculate_related_ids(
f"{_index}-{','.join(related_types)}-{_id}")


def merge_service(services: list[ProcessSpanSet], traces: list,
uid_to_trace_index: dict[int, int]) -> list:
def merge_service(services: List[ProcessSpanSet], traces: list,
uid_to_trace_index: Dict[int, int]) -> list:
"""
按 service 对 flow 分组并统计时延指标
"""
metrics_map = {}
services_from_process_span_set = set[ProcessSpanSet]()
services_from_process_span_set = Set[ProcessSpanSet]()
services_from_pruning_traces = set()
# 先获取剪枝后的所有 auto_service + app_service
for res in traces:
Expand Down Expand Up @@ -2592,7 +2593,7 @@ def merge_service(services: list[ProcessSpanSet], traces: list,


def format_final_result(
services: list[ProcessSpanSet], networks: list[NetworkSpanSet], _id,
services: List[ProcessSpanSet], networks: List[NetworkSpanSet], _id,
network_delay_us: int, flow_index_to_id0: list,
related_flow_index_map: defaultdict(inner_defaultdict_set)):
"""
Expand Down

0 comments on commit 7204698

Please sign in to comment.