From c405df03c799cea45bd0976702c1f62ef5f0296c Mon Sep 17 00:00:00 2001 From: Liu Chao Date: Thu, 7 Sep 2023 14:17:52 +0800 Subject: [PATCH] Fix tempo call apm **Phenomenon and reproduction steps** **Root cause and solution** **Impactions** **Test method** **Affected branch(es)** * main **Checklist** - [ ] Dependencies update required - [ ] Common bug (similar problem in other repo) --- app/app/application/application.py | 14 +++++++++----- app/app/application/l7_flow_tracing.py | 23 +++++++++++++++-------- app/app/application/tracing_completion.py | 6 +++--- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/app/app/application/application.py b/app/app/application/application.py index b92fbd1..5a3a5c4 100644 --- a/app/app/application/application.py +++ b/app/app/application/application.py @@ -23,8 +23,12 @@ async def application_log_l7_tracing(request): args.validate() l7_flow_tracing = L7FlowTracing(args, request.headers) if config.call_apm_api_to_supplement_trace: - trace_id, ch_res = await l7_flow_tracing.get_trace_id_by_id() - l7_flow_tracing.status.append("Query trace_id", ch_res) + ch_res = None + if not args.get("_id"): + trace_id = self.args.get("trace_id") + else: + trace_id, ch_res = await l7_flow_tracing.get_trace_id_by_id() + l7_flow_tracing.status.append("Query trace_id", ch_res) if not trace_id: status, response, failed_regions = await l7_flow_tracing.query() else: @@ -43,9 +47,9 @@ async def application_log_l7_tracing(request): args.app_spans = app_spans tracing_completion = TracingCompletion( args, request.headers) - tracing_completion.status.append("Query trace_id", ch_res) - status, response, failed_regions = await tracing_completion.query( - ) + if ch_res: + tracing_completion.status.append("Query trace_id", ch_res) + status, response, failed_regions = await tracing_completion.query() else: status, response, failed_regions = await L7FlowTracing( args, request.headers).query() diff --git a/app/app/application/l7_flow_tracing.py b/app/app/application/l7_flow_tracing.py index aabb913..f88cf31 100644 --- a/app/app/application/l7_flow_tracing.py +++ b/app/app/application/l7_flow_tracing.py @@ -130,6 +130,7 @@ class L7FlowTracing(Base): + async def query(self): max_iteration = self.args.get("max_iteration", 30) network_delay_us = self.args.get("network_delay_us") @@ -201,7 +202,7 @@ async def trace_l7_flow(self, dataframe_flowmetas = await self.query_flowmetas( time_filter, base_filter) if type(dataframe_flowmetas) != DataFrame: - return [] + return {} related_map[dataframe_flowmetas['_id'][0]] = [ f"{dataframe_flowmetas['_id'][0]}-base" ] @@ -401,7 +402,7 @@ async def trace_l7_flow(self, if len(set(dataframe_flowmetas['_id'])) - len_of_flows < 1: break if not l7_flow_ids: - return [] + return {} # 获取追踪到的所有应用流日志 return_fields += RETURN_FIELDS flow_fields = list(RETURN_FIELDS) @@ -411,7 +412,7 @@ async def trace_l7_flow(self, l7_flows = await self.query_all_flows(time_filter, l7_flow_ids, flow_fields) if type(l7_flows) != DataFrame: - return [] + return {} l7_flows.insert(0, "related_ids", "") l7_flows = l7_flows.where(l7_flows.notnull(), None) for index in range(len(l7_flows.index)): @@ -510,6 +511,7 @@ class L7XrequestMeta: """ x_request_id追踪: """ + def __init__(self, flow_metas: Tuple): self._id = flow_metas[0] self.x_request_id_0 = flow_metas[1] @@ -556,6 +558,7 @@ class L7AppMeta: 应用span追踪: span_id, parent_span_id """ + def __init__(self, flow_metas: Tuple): self._id = flow_metas[0] self.tap_side = flow_metas[1] @@ -604,6 +607,7 @@ class L7NetworkMeta: 网络流量追踪信息: req_tcp_seq, resp_tcp_seq, start_time_us, end_time_us """ + def __init__(self, flow_metas: Tuple, network_delay_us: int): self._id = flow_metas[0] self.type = flow_metas[1] @@ -679,6 +683,7 @@ class L7SyscallMeta: 系统调用追踪信息: vtap_id, syscall_trace_id_request, syscall_trace_id_response, tap_side, start_time_us, end_time_us """ + def __init__(self, flow_metas: Tuple): self._id = flow_metas[0] self.vtap_id = flow_metas[1] @@ -689,11 +694,10 @@ def __init__(self, flow_metas: Tuple): self.end_time_us = flow_metas[6] def __eq__(self, rhs): - return ( - self.vtap_id == rhs.vtap_id - and self.syscall_trace_id_request == rhs.syscall_trace_id_request - and - self.syscall_trace_id_response == rhs.syscall_trace_id_response) + return (self.vtap_id == rhs.vtap_id and self.syscall_trace_id_request + == rhs.syscall_trace_id_request + and self.syscall_trace_id_response + == rhs.syscall_trace_id_response) def to_tuple(self): return (self.vtap_id, self.syscall_trace_id_request, @@ -739,6 +743,7 @@ def to_sql_filter(self) -> str: class Networks: + def __init__(self): self.req_tcp_seq = None self.resp_tcp_seq = None @@ -820,6 +825,7 @@ def sort_and_set_parent(self): class Service: + def __init__(self, vtap_id: int, process_id: int): self.vtap_id = vtap_id self.process_id = process_id @@ -1551,6 +1557,7 @@ def format(services, networks, app_flows, _id, network_delay_us): class TraceSort: + def __init__(self, traces): self.traces = traces self.sorted_indexs = [] diff --git a/app/app/application/tracing_completion.py b/app/app/application/tracing_completion.py index 81a9346..450b306 100644 --- a/app/app/application/tracing_completion.py +++ b/app/app/application/tracing_completion.py @@ -23,6 +23,7 @@ class TracingCompletion(L7FlowTracing): + def __init__(self, args, headers): super().__init__(args, headers) self.app_spans = [ @@ -52,7 +53,6 @@ async def query(self): for res in rst.get("tracing", []): res.pop("selftime", None) res.pop("Enum(tap_side)", None) - res.pop("attribute", None) res.pop("id", None) res.pop("parent_id", None) res.pop("childs", None) @@ -93,7 +93,7 @@ async def trace_l7_flow(self, related_map = defaultdict(list) dataframe_flowmetas = self.app_spans_df if dataframe_flowmetas.empty: - return [] + return {} for i in range(len(self.app_spans)): for j in range(len(self.app_spans)): if i == j: @@ -309,7 +309,7 @@ async def trace_l7_flow(self, l7_flows = await self.query_all_flows(time_filter, l7_flow_ids, flow_fields) if type(l7_flows) != DataFrame: - return [] + return {} # Merge Incoming App Spans l7_flows = pd.concat([l7_flows, self.app_spans_df], join="outer",