Skip to content

Commit

Permalink
Fix tempo call apm
Browse files Browse the repository at this point in the history
**Phenomenon and reproduction steps**

**Root cause and solution**

**Impactions**

**Test method**

**Affected branch(es)**

* main

**Checklist**

- [ ] Dependencies update required
- [ ] Common bug (similar problem in other repo)
  • Loading branch information
xiaochaoren1 committed Sep 7, 2023
1 parent 884ea8b commit c405df0
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
14 changes: 9 additions & 5 deletions app/app/application/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ async def application_log_l7_tracing(request):
args.validate()
l7_flow_tracing = L7FlowTracing(args, request.headers)
if config.call_apm_api_to_supplement_trace:
trace_id, ch_res = await l7_flow_tracing.get_trace_id_by_id()
l7_flow_tracing.status.append("Query trace_id", ch_res)
ch_res = None
if not args.get("_id"):
trace_id = self.args.get("trace_id")
else:
trace_id, ch_res = await l7_flow_tracing.get_trace_id_by_id()
l7_flow_tracing.status.append("Query trace_id", ch_res)
if not trace_id:
status, response, failed_regions = await l7_flow_tracing.query()
else:
Expand All @@ -43,9 +47,9 @@ async def application_log_l7_tracing(request):
args.app_spans = app_spans
tracing_completion = TracingCompletion(
args, request.headers)
tracing_completion.status.append("Query trace_id", ch_res)
status, response, failed_regions = await tracing_completion.query(
)
if ch_res:
tracing_completion.status.append("Query trace_id", ch_res)
status, response, failed_regions = await tracing_completion.query()
else:
status, response, failed_regions = await L7FlowTracing(
args, request.headers).query()
Expand Down
23 changes: 15 additions & 8 deletions app/app/application/l7_flow_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@


class L7FlowTracing(Base):

async def query(self):
max_iteration = self.args.get("max_iteration", 30)
network_delay_us = self.args.get("network_delay_us")
Expand Down Expand Up @@ -201,7 +202,7 @@ async def trace_l7_flow(self,
dataframe_flowmetas = await self.query_flowmetas(
time_filter, base_filter)
if type(dataframe_flowmetas) != DataFrame:
return []
return {}
related_map[dataframe_flowmetas['_id'][0]] = [
f"{dataframe_flowmetas['_id'][0]}-base"
]
Expand Down Expand Up @@ -401,7 +402,7 @@ async def trace_l7_flow(self,
if len(set(dataframe_flowmetas['_id'])) - len_of_flows < 1:
break
if not l7_flow_ids:
return []
return {}
# 获取追踪到的所有应用流日志
return_fields += RETURN_FIELDS
flow_fields = list(RETURN_FIELDS)
Expand All @@ -411,7 +412,7 @@ async def trace_l7_flow(self,
l7_flows = await self.query_all_flows(time_filter, l7_flow_ids,
flow_fields)
if type(l7_flows) != DataFrame:
return []
return {}
l7_flows.insert(0, "related_ids", "")
l7_flows = l7_flows.where(l7_flows.notnull(), None)
for index in range(len(l7_flows.index)):
Expand Down Expand Up @@ -510,6 +511,7 @@ class L7XrequestMeta:
"""
x_request_id追踪:
"""

def __init__(self, flow_metas: Tuple):
self._id = flow_metas[0]
self.x_request_id_0 = flow_metas[1]
Expand Down Expand Up @@ -556,6 +558,7 @@ class L7AppMeta:
应用span追踪:
span_id, parent_span_id
"""

def __init__(self, flow_metas: Tuple):
self._id = flow_metas[0]
self.tap_side = flow_metas[1]
Expand Down Expand Up @@ -604,6 +607,7 @@ class L7NetworkMeta:
网络流量追踪信息:
req_tcp_seq, resp_tcp_seq, start_time_us, end_time_us
"""

def __init__(self, flow_metas: Tuple, network_delay_us: int):
self._id = flow_metas[0]
self.type = flow_metas[1]
Expand Down Expand Up @@ -679,6 +683,7 @@ class L7SyscallMeta:
系统调用追踪信息:
vtap_id, syscall_trace_id_request, syscall_trace_id_response, tap_side, start_time_us, end_time_us
"""

def __init__(self, flow_metas: Tuple):
self._id = flow_metas[0]
self.vtap_id = flow_metas[1]
Expand All @@ -689,11 +694,10 @@ def __init__(self, flow_metas: Tuple):
self.end_time_us = flow_metas[6]

def __eq__(self, rhs):
return (
self.vtap_id == rhs.vtap_id
and self.syscall_trace_id_request == rhs.syscall_trace_id_request
and
self.syscall_trace_id_response == rhs.syscall_trace_id_response)
return (self.vtap_id == rhs.vtap_id and self.syscall_trace_id_request
== rhs.syscall_trace_id_request
and self.syscall_trace_id_response
== rhs.syscall_trace_id_response)

def to_tuple(self):
return (self.vtap_id, self.syscall_trace_id_request,
Expand Down Expand Up @@ -739,6 +743,7 @@ def to_sql_filter(self) -> str:


class Networks:

def __init__(self):
self.req_tcp_seq = None
self.resp_tcp_seq = None
Expand Down Expand Up @@ -820,6 +825,7 @@ def sort_and_set_parent(self):


class Service:

def __init__(self, vtap_id: int, process_id: int):
self.vtap_id = vtap_id
self.process_id = process_id
Expand Down Expand Up @@ -1551,6 +1557,7 @@ def format(services, networks, app_flows, _id, network_delay_us):


class TraceSort:

def __init__(self, traces):
self.traces = traces
self.sorted_indexs = []
Expand Down
6 changes: 3 additions & 3 deletions app/app/application/tracing_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@


class TracingCompletion(L7FlowTracing):

def __init__(self, args, headers):
super().__init__(args, headers)
self.app_spans = [
Expand Down Expand Up @@ -52,7 +53,6 @@ async def query(self):
for res in rst.get("tracing", []):
res.pop("selftime", None)
res.pop("Enum(tap_side)", None)
res.pop("attribute", None)
res.pop("id", None)
res.pop("parent_id", None)
res.pop("childs", None)
Expand Down Expand Up @@ -93,7 +93,7 @@ async def trace_l7_flow(self,
related_map = defaultdict(list)
dataframe_flowmetas = self.app_spans_df
if dataframe_flowmetas.empty:
return []
return {}
for i in range(len(self.app_spans)):
for j in range(len(self.app_spans)):
if i == j:
Expand Down Expand Up @@ -309,7 +309,7 @@ async def trace_l7_flow(self,
l7_flows = await self.query_all_flows(time_filter, l7_flow_ids,
flow_fields)
if type(l7_flows) != DataFrame:
return []
return {}
# Merge Incoming App Spans
l7_flows = pd.concat([l7_flows, self.app_spans_df],
join="outer",
Expand Down

0 comments on commit c405df0

Please sign in to comment.