diff --git a/app/app/application/l7_flow_tracing.py b/app/app/application/l7_flow_tracing.py index bcff66a..e6faee6 100644 --- a/app/app/application/l7_flow_tracing.py +++ b/app/app/application/l7_flow_tracing.py @@ -44,10 +44,10 @@ "resp_tcp_seq", "start_time_us", "end_time_us", - "vtap_id", - "tap_port", - "tap_port_name", - "tap_port_type", + "agent_id", + "capture_nic", + "capture_nic_name", + "capture_nic_type", "resource_from_vtap", "syscall_trace_id_request", "syscall_trace_id_response", @@ -65,8 +65,8 @@ # 资源信息 "process_id_0", "process_id_1", - "tap_side", - "Enum(tap_side)", + "observation_point", + "Enum(observation_point)", "subnet_id_0", "subnet_0", "ip_0", @@ -93,8 +93,8 @@ "auto_service_type_1", "auto_service_id_1", "auto_service_1", - "tap_id", - "tap", + "capture_network_type_id", + "capture_network_type", # 指标信息 "response_status", "response_duration", @@ -348,12 +348,12 @@ async def trace_l7_flow(self, for index in range(len(dataframe_flowmetas.index)): req_tcp_seq = dataframe_flowmetas['req_tcp_seq'][index] resp_tcp_seq = dataframe_flowmetas['resp_tcp_seq'][index] - tap_side = dataframe_flowmetas['tap_side'][index] + observation_point = dataframe_flowmetas['observation_point'][index] if req_tcp_seq == 0 and resp_tcp_seq == 0: continue - if tap_side not in [ + if observation_point not in [ TAP_SIDE_CLIENT_PROCESS, TAP_SIDE_SERVER_PROCESS - ] and tap_side not in const.TAP_SIDE_RANKS: + ] and observation_point not in const.TAP_SIDE_RANKS: continue new_network_metas.add(( dataframe_flowmetas['_id'][index], @@ -402,12 +402,12 @@ async def trace_l7_flow(self, if syscall_trace_id_request > 0 or syscall_trace_id_response > 0: new_syscall_metas.add(( dataframe_flowmetas['_id'][index], - dataframe_flowmetas['vtap_id'][index], + dataframe_flowmetas['agent_id'][index], dataframe_flowmetas['syscall_trace_id_request'] [index], dataframe_flowmetas['syscall_trace_id_response'] [index], - dataframe_flowmetas['tap_side'][index], + dataframe_flowmetas['observation_point'][index], dataframe_flowmetas['start_time_us'][index], dataframe_flowmetas['end_time_us'][index], )) @@ -486,10 +486,10 @@ async def trace_l7_flow(self, id_to_related_tag = dict() for index in new_flows.index: _id = new_flows.at[index, '_id_str'] - vtap_id = new_flows.at[index, 'vtap_id'] + agent_id = new_flows.at[index, 'agent_id'] req_tcp_seq = new_flows.at[index, 'req_tcp_seq'] resp_tcp_seq = new_flows.at[index, 'resp_tcp_seq'] - tap_side = new_flows.at[index, 'tap_side'] + observation_point = new_flows.at[index, 'observation_point'] _type = new_flows.at[index, 'type'] start_time_us = new_flows.at[index, 'start_time_us'] end_time_us = new_flows.at[index, 'end_time_us'] @@ -505,10 +505,10 @@ async def trace_l7_flow(self, id_to_related_tag[_id] = { '_id': _id, - 'vtap_id': vtap_id, + 'agent_id': agent_id, 'req_tcp_seq': req_tcp_seq, 'resp_tcp_seq': resp_tcp_seq, - 'tap_side': tap_side, + 'observation_point': observation_point, 'type': _type, 'start_time_us': start_time_us, 'end_time_us': end_time_us, @@ -630,7 +630,7 @@ async def query_flowmetas(self, time_filter: str, 通过tcp_seq及流日志的时间追踪 系统调用追踪信息: - vtap_id, syscall_trace_id_request, syscall_trace_id_response + agent_id, syscall_trace_id_request, syscall_trace_id_response 通过eBPF获取到的coroutine_trace_id追踪 主动注入的追踪信息: @@ -640,8 +640,8 @@ async def query_flowmetas(self, time_filter: str, """ sql = """SELECT type, req_tcp_seq, resp_tcp_seq, toUnixTimestamp64Micro(start_time) AS start_time_us, toUnixTimestamp64Micro(end_time) AS end_time_us, - vtap_id, syscall_trace_id_request, syscall_trace_id_response, span_id, parent_span_id, l7_protocol, - trace_id, x_request_id_0, x_request_id_1, toString(_id) AS `_id_str`, tap_side, auto_instance_0, auto_instance_1 + agent_id, syscall_trace_id_request, syscall_trace_id_response, span_id, parent_span_id, l7_protocol, + trace_id, x_request_id_0, x_request_id_1, toString(_id) AS `_id_str`, observation_point, auto_instance_0, auto_instance_1 FROM `l7_flow_log` WHERE (({time_filter}) AND ({base_filter})) limit {l7_tracing_limit} """.format(time_filter=time_filter, @@ -708,9 +708,9 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us): for index in dataframe_flowmetas.index: req_tcp_seq = dataframe_flowmetas.at[index, 'req_tcp_seq'] resp_tcp_seq = dataframe_flowmetas.at[index, 'resp_tcp_seq'] - tap_side = dataframe_flowmetas.at[index, 'tap_side'] + observation_point = dataframe_flowmetas.at[index, 'observation_point'] _id = dataframe_flowmetas.at[index, '_id'] - vtap_id = dataframe_flowmetas.at[index, 'vtap_id'] + agent_id = dataframe_flowmetas.at[index, 'agent_id'] _type = dataframe_flowmetas.at[index, 'type'] start_time_us = dataframe_flowmetas.at[index, 'start_time_us'] end_time_us = dataframe_flowmetas.at[index, 'end_time_us'] @@ -726,10 +726,10 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us): id_to_related_tag[_id] = { '_id': _id, - 'vtap_id': vtap_id, + 'agent_id': agent_id, 'req_tcp_seq': req_tcp_seq, 'resp_tcp_seq': resp_tcp_seq, - 'tap_side': tap_side, + 'observation_point': observation_point, 'type': _type, 'start_time_us': start_time_us, 'end_time_us': end_time_us, @@ -743,8 +743,8 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us): if req_tcp_seq == 0 and resp_tcp_seq == 0: continue - if tap_side not in [TAP_SIDE_CLIENT_PROCESS, TAP_SIDE_SERVER_PROCESS - ] and tap_side not in const.TAP_SIDE_RANKS: + if observation_point not in [TAP_SIDE_CLIENT_PROCESS, TAP_SIDE_SERVER_PROCESS + ] and observation_point not in const.TAP_SIDE_RANKS: continue new_network_metas.add((_id, _type, req_tcp_seq, resp_tcp_seq, start_time_us, end_time_us, span_id)) @@ -760,9 +760,9 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us): syscall_trace_id_response = dataframe_flowmetas.at[ index, 'syscall_trace_id_response'] _id = dataframe_flowmetas.at[index, '_id'] - vtap_id = dataframe_flowmetas.at[index, 'vtap_id'] + agent_id = dataframe_flowmetas.at[index, 'agent_id'] if syscall_trace_id_request > 0 or syscall_trace_id_response > 0: - new_syscall_metas.add((_id, vtap_id, syscall_trace_id_request, + new_syscall_metas.add((_id, agent_id, syscall_trace_id_request, syscall_trace_id_response)) if syscall_trace_id_request: syscall_req_to_ids[syscall_trace_id_request].add(_id) @@ -783,15 +783,15 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us): for index in dataframe_flowmetas.index: span_id = dataframe_flowmetas.at[index, 'span_id'] parent_span_id = dataframe_flowmetas.at[index, 'parent_span_id'] - tap_side = dataframe_flowmetas.at[index, 'tap_side'] + observation_point = dataframe_flowmetas.at[index, 'observation_point'] _id = dataframe_flowmetas.at[index, '_id'] - if tap_side not in [ + if observation_point not in [ TAP_SIDE_CLIENT_PROCESS, TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_APP, TAP_SIDE_SERVER_APP, TAP_SIDE_APP ] or not span_id: continue if span_id or parent_span_id: - new_app_metas.add((_id, tap_side, span_id, parent_span_id)) + new_app_metas.add((_id, observation_point, span_id, parent_span_id)) if span_id: span_id_to_ids[span_id].add(_id) if parent_span_id: @@ -932,17 +932,17 @@ def set_relate(self, _ids, related_map, id_to_related_tag): class L7SyscallMeta: """ 系统调用追踪信息: - vtap_id, syscall_trace_id_request, syscall_trace_id_response, tap_side, start_time_us, end_time_us + agent_id, syscall_trace_id_request, syscall_trace_id_response, observation_point, start_time_us, end_time_us """ def __init__(self, flow_metas: Tuple): self._id = flow_metas[0] - self.vtap_id = flow_metas[1] + self.agent_id = flow_metas[1] self.syscall_trace_id_request = flow_metas[2] self.syscall_trace_id_response = flow_metas[3] def __eq__(self, rhs): - return (self.vtap_id == rhs.vtap_id and self.syscall_trace_id_request + return (self.agent_id == rhs.agent_id and self.syscall_trace_id_request == rhs.syscall_trace_id_request and self.syscall_trace_id_response == rhs.syscall_trace_id_response) @@ -950,12 +950,12 @@ def __eq__(self, rhs): def set_relate(self, _ids, related_map, id_to_related_tag): for _id in _ids: _id_df = id_to_related_tag[_id]['_id'] - vtap_id_df = id_to_related_tag[_id]['vtap_id'] + agent_id_df = id_to_related_tag[_id]['agent_id'] syscall_trace_id_request_df = id_to_related_tag[_id][ 'syscall_trace_id_request'] syscall_trace_id_response_df = id_to_related_tag[_id][ 'syscall_trace_id_response'] - if _id_df == self._id or self.vtap_id != vtap_id_df: + if _id_df == self._id or self.agent_id != agent_id_df: continue if self.syscall_trace_id_request > 0: if self.syscall_trace_id_request == syscall_trace_id_request_df or self.syscall_trace_id_request == syscall_trace_id_response_df: @@ -977,12 +977,12 @@ class L7AppMeta: def __init__(self, flow_metas: Tuple): self._id = flow_metas[0] - self.tap_side = flow_metas[1] + self.observation_point = flow_metas[1] self.span_id = flow_metas[2] self.parent_span_id = flow_metas[3] def __eq__(self, rhs): - return (self.tap_side == rhs.tap_side and self.span_id == rhs.span_id + return (self.observation_point == rhs.observation_point and self.span_id == rhs.span_id and self.parent_span_id == rhs.parent_span_id) def set_relate(self, _ids, related_map, id_to_related_tag): @@ -1070,7 +1070,7 @@ def add_flow(self, flow, network_delay_us): if not self.span_id and flow["span_id"]: self.span_id = flow["span_id"] self.flows.append(flow) - if flow["tap_side"] in [ + if flow["observation_point"] in [ TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS ]: self.has_syscall = True @@ -1096,8 +1096,8 @@ def sort_and_set_parent(self): class Service: - def __init__(self, vtap_id: int, process_id: int): - self.vtap_id = vtap_id + def __init__(self, agent_id: int, process_id: int): + self.agent_id = agent_id self.process_id = process_id self.direct_flows = [] @@ -1119,7 +1119,7 @@ def parent_set(self): self.app_flow_of_direct_flows, key=lambda x: x.get("start_time_us")) # 有s-p - if self.direct_flows[0]['tap_side'] == TAP_SIDE_SERVER_PROCESS: + if self.direct_flows[0]['observation_point'] == TAP_SIDE_SERVER_PROCESS: for i, direct_flow in enumerate(self.direct_flows[1:]): if not direct_flow.get('parent_id'): if direct_flow.get('parent_app_flow', None): @@ -1134,7 +1134,7 @@ def parent_set(self): self.app_flow_of_direct_flows[-1], "c-p mounted on latest app_flow") else: - # 3. 存在syscalltraceid相同且tap_side=s的系统span,该系统span的parent设置为该flow(syscalltraceid相同且tap_side=s) + # 3. 存在syscalltraceid相同且observation_point=s的系统span,该系统span的parent设置为该flow(syscalltraceid相同且observation_point=s) _set_parent(direct_flow, self.direct_flows[0], "c-p mounted on s-p") if self.direct_flows[0].get('parent_id', -1) < 0: @@ -1154,7 +1154,7 @@ def parent_set(self): def check_client_process_flow(self, flow: dict): """检查该flow是否与service有关联关系,s-p的时间范围需要覆盖c-p,否则拆分为两个service""" if self.process_id != flow["process_id_0"] \ - or self.vtap_id != flow["vtap_id"]: + or self.agent_id != flow["agent_id"]: return False if self.start_time_us > flow["start_time_us"] \ or self.end_time_us < flow["end_time_us"]: @@ -1164,10 +1164,10 @@ def check_client_process_flow(self, flow: dict): def add_direct_flow(self, flow: dict): """direct_flow是指该服务直接接收到的,或直接发出的flow""" #assert ( - # self.vtap_id == flow.get('vtap_id') + # self.agent_id == flow.get('agent_id') # and self.process_id == flow.get('process_id') #) - if flow['tap_side'] == TAP_SIDE_SERVER_PROCESS: + if flow['observation_point'] == TAP_SIDE_SERVER_PROCESS: self.start_time_us = flow["start_time_us"] self.end_time_us = flow["end_time_us"] for key in [ @@ -1179,7 +1179,7 @@ def add_direct_flow(self, flow: dict): 'auto_service_type', 'process_kname', ]: - if flow['tap_side'] == TAP_SIDE_CLIENT_PROCESS: + if flow['observation_point'] == TAP_SIDE_CLIENT_PROCESS: direction_key = key + "_0" else: direction_key = key + "_1" @@ -1196,7 +1196,7 @@ def add_direct_flow(self, flow: dict): self.direct_flows.append(flow) def attach_app_flow(self, flow: dict): - if flow["tap_side"] not in [ + if flow["observation_point"] not in [ TAP_SIDE_CLIENT_APP, TAP_SIDE_SERVER_APP, TAP_SIDE_APP ]: return @@ -1206,7 +1206,7 @@ def attach_app_flow(self, flow: dict): if direct_flow["span_id"] == flow["span_id"]: direct_flow["parent_app_flow"] = flow # 只有c-p和x-app的span_id相同时,属于同一个service - if direct_flow['tap_side'] == TAP_SIDE_CLIENT_PROCESS: + if direct_flow['observation_point'] == TAP_SIDE_CLIENT_PROCESS: flow["service"] = self self.app_flow_of_direct_flows.append(flow) return True @@ -1214,7 +1214,7 @@ def attach_app_flow(self, flow: dict): if flow['parent_span_id'] and self.direct_flows[0]['span_id'] and flow[ 'parent_span_id'] == self.direct_flows[0][ 'span_id'] and self.direct_flows[0][ - 'tap_side'] == TAP_SIDE_SERVER_PROCESS: + 'observation_point'] == TAP_SIDE_SERVER_PROCESS: # x-app的parent是c-p时,一定不属于同一个service for client_process_flow in self.direct_flows[1:]: if flow['parent_span_id'] == client_process_flow['span_id']: @@ -1244,15 +1244,15 @@ def merge_flow(flows: list, flow: dict) -> bool: Merge condition: Session cap_seq_1 == Response cap_seq_1 + 1 """ if flow['type'] == L7_FLOW_TYPE_SESSION \ - and flow['tap_side'] not in [TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS]: + and flow['observation_point'] not in [TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS]: return False - # vtap_id, l7_protocol, flow_id, request_id + # agent_id, l7_protocol, flow_id, request_id for i in range(len(flows)): if flow['_id'] == flows[i]['_id']: continue if flow['flow_id'] != flows[i]['flow_id']: continue - if flows[i]['tap_side'] not in [ + if flows[i]['observation_point'] not in [ TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS ]: if flows[i]['type'] == L7_FLOW_TYPE_SESSION: @@ -1281,8 +1281,8 @@ def merge_flow(flows: list, flow: dict) -> bool: if not request_flow or not response_flow: continue for key in [ - 'vtap_id', 'tap_port', 'tap_port_type', 'l7_protocol', - 'request_id', 'tap_side' + 'agent_id', 'capture_nic', 'capture_nic_type', 'l7_protocol', + 'request_id', 'observation_point' ]: if _get_df_key(request_flow, key) != _get_df_key( response_flow, key): @@ -1291,7 +1291,7 @@ def merge_flow(flows: list, flow: dict) -> bool: # 请求的时间必须比响应的时间小 if request_flow['start_time_us'] > response_flow['end_time_us']: equal = False - if request_flow['tap_side'] in [ + if request_flow['observation_point'] in [ TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS ]: # 应用span syscall_cap_seq判断合并 @@ -1395,7 +1395,7 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int, flowcount = len(flows) for i, flow in enumerate(reversed(flows)): # 单向的c-p和s-p进行第二轮merge - if len(flow['_id']) > 1 or flow['tap_side'] not in [ + if len(flow['_id']) > 1 or flow['observation_point'] not in [ TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS ]: continue @@ -1408,13 +1408,13 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int, for _id in flow['_id']: id_map[str(_id)] = flow['_uid'] flow['duration'] = flow['end_time_us'] - flow['start_time_us'] - if flow['tap_side'] in [ + if flow['observation_point'] in [ TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS ]: syscall_flows.append(flow) - elif flow['tap_side'] in const.TAP_SIDE_RANKS: + elif flow['observation_point'] in const.TAP_SIDE_RANKS: network_flows.append(flow) - elif flow['tap_side'] in [ + elif flow['observation_point'] in [ TAP_SIDE_CLIENT_APP, TAP_SIDE_SERVER_APP, TAP_SIDE_APP ]: app_flows.append(flow) @@ -1428,37 +1428,37 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int, f"{id_map[_id]}-{','.join(related_types)}-{_id}") flow["related_ids"] = list(related_ids) - # 从Flow中提取Service:一个二元组认为是一个Service。 + # 从Flow中提取Service:一个二元组认为是一个Service。 service_map = defaultdict(Service) for flow in syscall_flows: - if flow['tap_side'] != TAP_SIDE_SERVER_PROCESS: + if flow['observation_point'] != TAP_SIDE_SERVER_PROCESS: continue local_process_id = flow['process_id_1'] - vtap_id = flow['vtap_id'] - if (vtap_id, local_process_id, 0) not in service_map: - service = Service(vtap_id, local_process_id) - service_map[(vtap_id, local_process_id, 0)] = service + agent_id = flow['agent_id'] + if (agent_id, local_process_id, 0) not in service_map: + service = Service(agent_id, local_process_id) + service_map[(agent_id, local_process_id, 0)] = service # Service直接接收或发送的Flows_ service.add_direct_flow(flow) else: index = 0 for key in service_map.keys(): - if key[0] == vtap_id and key[1] == local_process_id: + if key[0] == agent_id and key[1] == local_process_id: index += 1 - service = Service(vtap_id, local_process_id) - service_map[(vtap_id, local_process_id, index)] = service + service = Service(agent_id, local_process_id) + service_map[(agent_id, local_process_id, index)] = service service.add_direct_flow(flow) for flow in syscall_flows: - if flow['tap_side'] != TAP_SIDE_CLIENT_PROCESS: + if flow['observation_point'] != TAP_SIDE_CLIENT_PROCESS: continue local_process_id = flow['process_id_0'] - vtap_id = flow['vtap_id'] + agent_id = flow['agent_id'] index = 0 max_start_time_service = None - if (vtap_id, local_process_id, 0) in service_map: + if (agent_id, local_process_id, 0) in service_map: for key, service in service_map.items(): - if key[0] == vtap_id and key[1] == local_process_id: + if key[0] == agent_id and key[1] == local_process_id: index += 1 if service.check_client_process_flow(flow): if not max_start_time_service: @@ -1470,8 +1470,8 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int, max_start_time_service.add_direct_flow(flow) continue # 没有attach到service上的flow生成一个新的service - service = Service(vtap_id, local_process_id) - service_map[(vtap_id, local_process_id, index)] = service + service = Service(agent_id, local_process_id) + service_map[(agent_id, local_process_id, index)] = service # Service直接接收或发送的Flow service.add_direct_flow(flow) @@ -1507,7 +1507,7 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int, ## 排序 ### 网络span排序 - # 1.网络span按照tap_side_rank或response_duration进行排序,系统span始终在网络span的两头 + # 1.网络span按照observation_point_rank或response_duration进行排序,系统span始终在网络span的两头 for network in networks: network.sort_and_set_parent() # 2. 存在span_id相同的应用span,将该网络span的parent设置为该span_id相同的应用span @@ -1600,7 +1600,7 @@ def app_flow_sort(array): if flow_0.get("service"): # 4. 若有所属service,将该应用span的parent设置为该service的s-p的flow if flow_0["service"].direct_flows[0][ - "tap_side"] == TAP_SIDE_SERVER_PROCESS: + "observation_point"] == TAP_SIDE_SERVER_PROCESS: _set_parent(flow_0, flow_0["service"].direct_flows[0], "app_flow mouted on s-p in service") continue @@ -1609,7 +1609,7 @@ def app_flow_sort(array): def service_sort(services, app_flows): app_flows_map = {app_flow["span_id"]: app_flow for app_flow in app_flows} for i in range(len(services)): - if services[i].direct_flows[0]['tap_side'] == TAP_SIDE_SERVER_PROCESS: + if services[i].direct_flows[0]['observation_point'] == TAP_SIDE_SERVER_PROCESS: # 1. 存在span_id相同的应用span,将该系统span的parent设置为该span_id相同的应用span if services[i].direct_flows[0].get("parent_app_flow"): if services[i].direct_flows[0].get("networks") and \ @@ -1661,7 +1661,7 @@ def format_trace(services: list, networks: list, app_flows: list) -> dict: ) if not flow.get('span_id') or len(str( flow['span_id'])) < 16 else flow['span_id'] id_map[flow[ - '_uid']] = f"{direct_flow_span_id}.{flow['tap_side']}.{flow['_uid']}" + '_uid']] = f"{direct_flow_span_id}.{flow['observation_point']}.{flow['_uid']}" if flow['_uid'] not in tracing: response["tracing"].append(_get_flow_dict(flow)) tracing.add(flow['_uid']) @@ -1674,12 +1674,12 @@ def format_trace(services: list, networks: list, app_flows: list) -> dict: for network in networks: for flow in network.flows: - if flow["tap_side"] in [ + if flow["observation_point"] in [ TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS ]: continue id_map[flow[ - '_uid']] = f"{network.span_id}.{flow['tap_side']}.{flow['_uid']}" + '_uid']] = f"{network.span_id}.{flow['observation_point']}.{flow['_uid']}" if flow['_uid'] not in tracing: response["tracing"].append(_get_flow_dict(flow)) tracing.add(flow['_uid']) @@ -1924,10 +1924,10 @@ def _get_flow_dict(flow: DataFrame): flow["end_time_us"] - flow["start_time_us"], "selftime": flow["duration"], - "tap_side": - flow["tap_side"], - "Enum(tap_side)": - flow.get("Enum(tap_side)"), + "observation_point": + flow["observation_point"], + "Enum(observation_point)": + flow.get("Enum(observation_point)"), "l7_protocol": flow["l7_protocol"], "l7_protocol_str": @@ -1976,8 +1976,8 @@ def _get_flow_dict(flow: DataFrame): flow.get("childs", []), "process_id": flow.get("process_id", None), - "vtap_id": - flow.get("vtap_id", None), + "agent_id": + flow.get("agent_id", None), "service_uid": flow.get("service_uid", None), "service_uname": @@ -1986,24 +1986,24 @@ def _get_flow_dict(flow: DataFrame): flow.get("app_service", None), "app_instance": flow.get("app_instance", None), - "tap_port": - flow["tap_port"], - "tap_port_name": - flow["tap_port_name"], + "capture_nic": + flow["capture_nic"], + "capture_nic_name": + flow["capture_nic_name"], "resource_from_vtap": flow["resource_from_vtap"][2] if flow["resource_from_vtap"][0] else None, "set_parent_info": flow.get("set_parent_info"), "auto_instance": - flow["auto_instance_0"] if flow["tap_side"][0] == 'c' - and flow["tap_side"] != "app" else flow["auto_instance_1"], - "tap_id": - flow.get("tap_id", None), - "tap": - flow.get("tap", None) + flow["auto_instance_0"] if flow["observation_point"][0] == 'c' + and flow["observation_point"] != "app" else flow["auto_instance_1"], + "capture_network_type_id": + flow.get("capture_network_type_id", None), + "capture_network_type": + flow.get("capture_network_type", None) } - if flow["tap_side"] in [TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS]: + if flow["observation_point"] in [TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS]: flow_dict["subnet"] = flow.get("subnet") flow_dict["ip"] = flow.get("ip") flow_dict["auto_service"] = flow.get("auto_service") @@ -2037,23 +2037,23 @@ def network_flow_sort(traces): 对网络span进行排序,排序规则: 1. 按照TAP_SIDE_RANKS进行排序 2. 对Local和rest就近(比较采集器)排到其他位置附近(按时间排) - 3. 网络 Span 中如 tap_side = local 或 rest 或 xx_gw 或者 tap!= 虚拟网络,则取消 tap_side 排序逻辑,改为响应时延长度倒排,TAP_SIDE_RANKS正排 + 3. 网络 Span 中如 observation_point = local 或 rest 或 xx_gw 或者 capture_network_type!= 虚拟网络,则取消 observation_point 排序逻辑,改为响应时延长度倒排,TAP_SIDE_RANKS正排 """ local_rest_traces = [] sorted_traces = [] sys_traces = [] response_duration_sort = False for trace in traces: - if trace['tap_side'] in [ + if trace['observation_point'] in [ const.TAP_SIDE_LOCAL, const.TAP_SIDE_REST, const.TAP_SIDE_CLIENT_GATEWAY, const.TAP_SIDE_SERVER_GATEWAY, const.TAP_SIDE_CLIENT_GATEWAY_HAPERVISOR, const.TAP_SIDE_SERVER_GATEWAY_HAPERVISOR - ] or trace['tap'] != "虚拟网络": + ] or trace['capture_network_type'] != "虚拟网络": response_duration_sort = True - if trace['tap_side'] in [const.TAP_SIDE_LOCAL, const.TAP_SIDE_REST]: + if trace['observation_point'] in [const.TAP_SIDE_LOCAL, const.TAP_SIDE_REST]: local_rest_traces.append(trace) - elif trace['tap_side'] in [ + elif trace['observation_point'] in [ const.TAP_SIDE_CLIENT_PROCESS, const.TAP_SIDE_SERVER_PROCESS ]: sys_traces.append(trace) @@ -2063,33 +2063,33 @@ def network_flow_sort(traces): sorted_traces = sorted( sorted_traces + local_rest_traces, key=lambda x: - (-x['response_duration'], const.TAP_SIDE_RANKS.get(x['tap_side']), - x['tap_side'])) + (-x['response_duration'], const.TAP_SIDE_RANKS.get(x['observation_point']), + x['observation_point'])) for sys_trace in sys_traces: - if sys_trace['tap_side'] == const.TAP_SIDE_CLIENT_PROCESS: + if sys_trace['observation_point'] == const.TAP_SIDE_CLIENT_PROCESS: sorted_traces.insert(0, sys_trace) else: sorted_traces.append(sys_trace) return sorted_traces sorted_traces = sorted( sorted_traces + sys_traces, - key=lambda x: (const.TAP_SIDE_RANKS.get(x['tap_side']), x['tap_side'])) + key=lambda x: (const.TAP_SIDE_RANKS.get(x['observation_point']), x['observation_point'])) if not sorted_traces: sorted_traces += local_rest_traces else: for trace in local_rest_traces: - vtap_index = -1 + agent_index = -1 for i, sorted_trace in enumerate(sorted_traces): - if vtap_index > 0 and sorted_trace['vtap_id'] != trace[ - 'vtap_id']: + if agent_index > 0 and sorted_trace['agent_id'] != trace[ + 'agent_id']: break - if sorted_trace['vtap_id'] == trace['vtap_id']: + if sorted_trace['agent_id'] == trace['agent_id']: if sorted_trace['start_time_us'] < trace['start_time_us']: - vtap_index = i + 1 - elif vtap_index == -1: - vtap_index = i - if vtap_index >= 0: - sorted_traces.insert(vtap_index, trace) + agent_index = i + 1 + elif agent_index == -1: + agent_index = i + if agent_index >= 0: + sorted_traces.insert(agent_index, trace) else: for i, sorted_trace in enumerate(sorted_traces): if trace['start_time_us'] < sorted_trace['start_time_us']: diff --git a/app/app/application/tracing_completion.py b/app/app/application/tracing_completion.py index eab58b1..c71a9dc 100644 --- a/app/app/application/tracing_completion.py +++ b/app/app/application/tracing_completion.py @@ -240,7 +240,7 @@ async def trace_l7_flow(self, if syscall_trace_id_request > 0 or syscall_trace_id_response > 0: new_syscall_metas.add(( dataframe_flowmetas['_id'][index], - dataframe_flowmetas['vtap_id'][index], + dataframe_flowmetas['agent_id'][index], dataframe_flowmetas['syscall_trace_id_request'] [index], dataframe_flowmetas['syscall_trace_id_response'] @@ -323,7 +323,7 @@ async def trace_l7_flow(self, id_to_related_tag = dict() for index in new_flows.index: _id = new_flows.at[index, '_id_str'] - vtap_id = new_flows.at[index, 'vtap_id'] + agent_id = new_flows.at[index, 'agent_id'] req_tcp_seq = new_flows.at[index, 'req_tcp_seq'] resp_tcp_seq = new_flows.at[index, 'resp_tcp_seq'] tap_side = new_flows.at[index, 'tap_side'] @@ -342,7 +342,7 @@ async def trace_l7_flow(self, id_to_related_tag[_id] = { '_id': _id, - 'vtap_id': vtap_id, + 'agent_id': agent_id, 'req_tcp_seq': req_tcp_seq, 'resp_tcp_seq': resp_tcp_seq, 'tap_side': tap_side,