Skip to content

Commit

Permalink
Vtap renamed to agent
Browse files Browse the repository at this point in the history
**Phenomenon and reproduction steps**

**Root cause and solution**

**Impactions**

**Test method**

**Affected branch(es)**

* main

**Checklist**

- [ ] Dependencies update required
- [ ] Common bug (similar problem in other repo)
  • Loading branch information
xiaochaoren1 committed Mar 12, 2024
1 parent 46c6536 commit dce2cc1
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 42 deletions.
78 changes: 39 additions & 39 deletions app/app/application/l7_flow_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"resp_tcp_seq",
"start_time_us",
"end_time_us",
"vtap_id",
"agent_id",
"tap_port",
"tap_port_name",
"tap_port_type",
Expand Down Expand Up @@ -402,7 +402,7 @@ async def trace_l7_flow(self,
if syscall_trace_id_request > 0 or syscall_trace_id_response > 0:
new_syscall_metas.add((
dataframe_flowmetas['_id'][index],
dataframe_flowmetas['vtap_id'][index],
dataframe_flowmetas['agent_id'][index],
dataframe_flowmetas['syscall_trace_id_request']
[index],
dataframe_flowmetas['syscall_trace_id_response']
Expand Down Expand Up @@ -486,7 +486,7 @@ async def trace_l7_flow(self,
id_to_related_tag = dict()
for index in new_flows.index:
_id = new_flows.at[index, '_id_str']
vtap_id = new_flows.at[index, 'vtap_id']
agent_id = new_flows.at[index, 'agent_id']
req_tcp_seq = new_flows.at[index, 'req_tcp_seq']
resp_tcp_seq = new_flows.at[index, 'resp_tcp_seq']
tap_side = new_flows.at[index, 'tap_side']
Expand All @@ -505,7 +505,7 @@ async def trace_l7_flow(self,

id_to_related_tag[_id] = {
'_id': _id,
'vtap_id': vtap_id,
'agent_id': agent_id,
'req_tcp_seq': req_tcp_seq,
'resp_tcp_seq': resp_tcp_seq,
'tap_side': tap_side,
Expand Down Expand Up @@ -630,7 +630,7 @@ async def query_flowmetas(self, time_filter: str,
通过tcp_seq及流日志的时间追踪
系统调用追踪信息:
vtap_id, syscall_trace_id_request, syscall_trace_id_response
agent_id, syscall_trace_id_request, syscall_trace_id_response
通过eBPF获取到的coroutine_trace_id追踪
主动注入的追踪信息:
Expand All @@ -640,7 +640,7 @@ async def query_flowmetas(self, time_filter: str,
"""
sql = """SELECT
type, req_tcp_seq, resp_tcp_seq, toUnixTimestamp64Micro(start_time) AS start_time_us, toUnixTimestamp64Micro(end_time) AS end_time_us,
vtap_id, syscall_trace_id_request, syscall_trace_id_response, span_id, parent_span_id, l7_protocol,
agent_id, syscall_trace_id_request, syscall_trace_id_response, span_id, parent_span_id, l7_protocol,
trace_id, x_request_id_0, x_request_id_1, toString(_id) AS `_id_str`, tap_side, auto_instance_0, auto_instance_1
FROM `l7_flow_log`
WHERE (({time_filter}) AND ({base_filter})) limit {l7_tracing_limit}
Expand Down Expand Up @@ -710,7 +710,7 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us):
resp_tcp_seq = dataframe_flowmetas.at[index, 'resp_tcp_seq']
tap_side = dataframe_flowmetas.at[index, 'tap_side']
_id = dataframe_flowmetas.at[index, '_id']
vtap_id = dataframe_flowmetas.at[index, 'vtap_id']
agent_id = dataframe_flowmetas.at[index, 'agent_id']
_type = dataframe_flowmetas.at[index, 'type']
start_time_us = dataframe_flowmetas.at[index, 'start_time_us']
end_time_us = dataframe_flowmetas.at[index, 'end_time_us']
Expand All @@ -726,7 +726,7 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us):

id_to_related_tag[_id] = {
'_id': _id,
'vtap_id': vtap_id,
'agent_id': agent_id,
'req_tcp_seq': req_tcp_seq,
'resp_tcp_seq': resp_tcp_seq,
'tap_side': tap_side,
Expand Down Expand Up @@ -760,9 +760,9 @@ def set_all_relate(dataframe_flowmetas, related_map, network_delay_us):
syscall_trace_id_response = dataframe_flowmetas.at[
index, 'syscall_trace_id_response']
_id = dataframe_flowmetas.at[index, '_id']
vtap_id = dataframe_flowmetas.at[index, 'vtap_id']
agent_id = dataframe_flowmetas.at[index, 'agent_id']
if syscall_trace_id_request > 0 or syscall_trace_id_response > 0:
new_syscall_metas.add((_id, vtap_id, syscall_trace_id_request,
new_syscall_metas.add((_id, agent_id, syscall_trace_id_request,
syscall_trace_id_response))
if syscall_trace_id_request:
syscall_req_to_ids[syscall_trace_id_request].add(_id)
Expand Down Expand Up @@ -932,30 +932,30 @@ def set_relate(self, _ids, related_map, id_to_related_tag):
class L7SyscallMeta:
"""
系统调用追踪信息:
vtap_id, syscall_trace_id_request, syscall_trace_id_response, tap_side, start_time_us, end_time_us
agent_id, syscall_trace_id_request, syscall_trace_id_response, tap_side, start_time_us, end_time_us
"""

def __init__(self, flow_metas: Tuple):
self._id = flow_metas[0]
self.vtap_id = flow_metas[1]
self.agent_id = flow_metas[1]
self.syscall_trace_id_request = flow_metas[2]
self.syscall_trace_id_response = flow_metas[3]

def __eq__(self, rhs):
return (self.vtap_id == rhs.vtap_id and self.syscall_trace_id_request
return (self.agent_id == rhs.agent_id and self.syscall_trace_id_request
== rhs.syscall_trace_id_request
and self.syscall_trace_id_response
== rhs.syscall_trace_id_response)

def set_relate(self, _ids, related_map, id_to_related_tag):
for _id in _ids:
_id_df = id_to_related_tag[_id]['_id']
vtap_id_df = id_to_related_tag[_id]['vtap_id']
agent_id_df = id_to_related_tag[_id]['agent_id']
syscall_trace_id_request_df = id_to_related_tag[_id][
'syscall_trace_id_request']
syscall_trace_id_response_df = id_to_related_tag[_id][
'syscall_trace_id_response']
if _id_df == self._id or self.vtap_id != vtap_id_df:
if _id_df == self._id or self.agent_id != agent_id_df:
continue
if self.syscall_trace_id_request > 0:
if self.syscall_trace_id_request == syscall_trace_id_request_df or self.syscall_trace_id_request == syscall_trace_id_response_df:
Expand Down Expand Up @@ -1096,8 +1096,8 @@ def sort_and_set_parent(self):

class Service:

def __init__(self, vtap_id: int, process_id: int):
self.vtap_id = vtap_id
def __init__(self, agent_id: int, process_id: int):
self.agent_id = agent_id
self.process_id = process_id

self.direct_flows = []
Expand Down Expand Up @@ -1154,7 +1154,7 @@ def parent_set(self):
def check_client_process_flow(self, flow: dict):
"""检查该flow是否与service有关联关系,s-p的时间范围需要覆盖c-p,否则拆分为两个service"""
if self.process_id != flow["process_id_0"] \
or self.vtap_id != flow["vtap_id"]:
or self.agent_id != flow["agent_id"]:
return False
if self.start_time_us > flow["start_time_us"] \
or self.end_time_us < flow["end_time_us"]:
Expand All @@ -1164,7 +1164,7 @@ def check_client_process_flow(self, flow: dict):
def add_direct_flow(self, flow: dict):
"""direct_flow是指该服务直接接收到的,或直接发出的flow"""
#assert (
# self.vtap_id == flow.get('vtap_id')
# self.agent_id == flow.get('agent_id')
# and self.process_id == flow.get('process_id')
#)
if flow['tap_side'] == TAP_SIDE_SERVER_PROCESS:
Expand Down Expand Up @@ -1246,7 +1246,7 @@ def merge_flow(flows: list, flow: dict) -> bool:
if flow['type'] == L7_FLOW_TYPE_SESSION \
and flow['tap_side'] not in [TAP_SIDE_SERVER_PROCESS, TAP_SIDE_CLIENT_PROCESS]:
return False
# vtap_id, l7_protocol, flow_id, request_id
# agent_id, l7_protocol, flow_id, request_id
for i in range(len(flows)):
if flow['_id'] == flows[i]['_id']:
continue
Expand Down Expand Up @@ -1281,7 +1281,7 @@ def merge_flow(flows: list, flow: dict) -> bool:
if not request_flow or not response_flow:
continue
for key in [
'vtap_id', 'tap_port', 'tap_port_type', 'l7_protocol',
'agent_id', 'tap_port', 'tap_port_type', 'l7_protocol',
'request_id', 'tap_side'
]:
if _get_df_key(request_flow, key) != _get_df_key(
Expand Down Expand Up @@ -1428,37 +1428,37 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
f"{id_map[_id]}-{','.join(related_types)}-{_id}")
flow["related_ids"] = list(related_ids)

# 从Flow中提取Service:一个<vtap_id, local_process_id>二元组认为是一个Service。
# 从Flow中提取Service:一个<agent_id, local_process_id>二元组认为是一个Service。
service_map = defaultdict(Service)
for flow in syscall_flows:
if flow['tap_side'] != TAP_SIDE_SERVER_PROCESS:
continue
local_process_id = flow['process_id_1']
vtap_id = flow['vtap_id']
if (vtap_id, local_process_id, 0) not in service_map:
service = Service(vtap_id, local_process_id)
service_map[(vtap_id, local_process_id, 0)] = service
agent_id = flow['agent_id']
if (agent_id, local_process_id, 0) not in service_map:
service = Service(agent_id, local_process_id)
service_map[(agent_id, local_process_id, 0)] = service
# Service直接接收或发送的Flows_
service.add_direct_flow(flow)
else:
index = 0
for key in service_map.keys():
if key[0] == vtap_id and key[1] == local_process_id:
if key[0] == agent_id and key[1] == local_process_id:
index += 1
service = Service(vtap_id, local_process_id)
service_map[(vtap_id, local_process_id, index)] = service
service = Service(agent_id, local_process_id)
service_map[(agent_id, local_process_id, index)] = service
service.add_direct_flow(flow)

for flow in syscall_flows:
if flow['tap_side'] != TAP_SIDE_CLIENT_PROCESS:
continue
local_process_id = flow['process_id_0']
vtap_id = flow['vtap_id']
agent_id = flow['agent_id']
index = 0
max_start_time_service = None
if (vtap_id, local_process_id, 0) in service_map:
if (agent_id, local_process_id, 0) in service_map:
for key, service in service_map.items():
if key[0] == vtap_id and key[1] == local_process_id:
if key[0] == agent_id and key[1] == local_process_id:
index += 1
if service.check_client_process_flow(flow):
if not max_start_time_service:
Expand All @@ -1470,8 +1470,8 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
max_start_time_service.add_direct_flow(flow)
continue
# 没有attach到service上的flow生成一个新的service
service = Service(vtap_id, local_process_id)
service_map[(vtap_id, local_process_id, index)] = service
service = Service(agent_id, local_process_id)
service_map[(agent_id, local_process_id, index)] = service
# Service直接接收或发送的Flow
service.add_direct_flow(flow)

Expand Down Expand Up @@ -1976,8 +1976,8 @@ def _get_flow_dict(flow: DataFrame):
flow.get("childs", []),
"process_id":
flow.get("process_id", None),
"vtap_id":
flow.get("vtap_id", None),
"agent_id":
flow.get("agent_id", None),
"service_uid":
flow.get("service_uid", None),
"service_uname":
Expand Down Expand Up @@ -2080,10 +2080,10 @@ def network_flow_sort(traces):
for trace in local_rest_traces:
vtap_index = -1
for i, sorted_trace in enumerate(sorted_traces):
if vtap_index > 0 and sorted_trace['vtap_id'] != trace[
'vtap_id']:
if vtap_index > 0 and sorted_trace['agent_id'] != trace[
'agent_id']:
break
if sorted_trace['vtap_id'] == trace['vtap_id']:
if sorted_trace['agent_id'] == trace['agent_id']:
if sorted_trace['start_time_us'] < trace['start_time_us']:
vtap_index = i + 1
elif vtap_index == -1:
Expand Down
6 changes: 3 additions & 3 deletions app/app/application/tracing_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ async def trace_l7_flow(self,
if syscall_trace_id_request > 0 or syscall_trace_id_response > 0:
new_syscall_metas.add((
dataframe_flowmetas['_id'][index],
dataframe_flowmetas['vtap_id'][index],
dataframe_flowmetas['agent_id'][index],
dataframe_flowmetas['syscall_trace_id_request']
[index],
dataframe_flowmetas['syscall_trace_id_response']
Expand Down Expand Up @@ -323,7 +323,7 @@ async def trace_l7_flow(self,
id_to_related_tag = dict()
for index in new_flows.index:
_id = new_flows.at[index, '_id_str']
vtap_id = new_flows.at[index, 'vtap_id']
agent_id = new_flows.at[index, 'agent_id']
req_tcp_seq = new_flows.at[index, 'req_tcp_seq']
resp_tcp_seq = new_flows.at[index, 'resp_tcp_seq']
tap_side = new_flows.at[index, 'tap_side']
Expand All @@ -342,7 +342,7 @@ async def trace_l7_flow(self,

id_to_related_tag[_id] = {
'_id': _id,
'vtap_id': vtap_id,
'agent_id': agent_id,
'req_tcp_seq': req_tcp_seq,
'resp_tcp_seq': resp_tcp_seq,
'tap_side': tap_side,
Expand Down

0 comments on commit dce2cc1

Please sign in to comment.