Skip to content

Commit

Permalink
fix tracing_completion_api
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric authored and sharang committed May 14, 2024
1 parent d52c96a commit 72dcbcf
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 40 deletions.
10 changes: 9 additions & 1 deletion app/app/application/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import uuid

from data.status import Status
from common.const import L7_FLOW_SIGNAL_SOURCE_OTEL

# 0: unspecified, 1: internal, 2: server, 3: client, 4: producer, 5: consumer
TAP_SIDE_BY_SPAN_KIND = {
Expand All @@ -24,12 +25,19 @@ def __init__(self, args, headers):
self.region = self.args.get("region", None)
self.signal_sources = self.args.get("signal_sources") or []

# Completing application span attribute information
def complete_app_span(self, app_spans):
"""
Fill application span attribute information
will be called in two scenario:
1. get external apm app spans
2. use tracing_completion api to fill sys spans & net spans
"""
for i, app_span in enumerate(app_spans):
tap_side_by_span_kind = TAP_SIDE_BY_SPAN_KIND.get(
app_span.get('span_kind'))
app_span["tap_side"] = tap_side_by_span_kind
# either external apm or tracing_completion should set this fixed value
app_span['signal_source'] = L7_FLOW_SIGNAL_SOURCE_OTEL
app_span.pop("span_kind", None)
for tag_int in [
"type", "req_tcp_seq", "resp_tcp_seq", "l7_protocol",
Expand Down
6 changes: 1 addition & 5 deletions app/app/application/l7_flow_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .base import Base
from common import const
from common.utils import curl_perform, inner_defaultdict_set
from common.const import HTTP_OK
from common.const import (HTTP_OK, L7_FLOW_SIGNAL_SOURCE_PACKET, L7_FLOW_SIGNAL_SOURCE_EBPF, L7_FLOW_SIGNAL_SOURCE_OTEL)
from common.disjoint_set import DisjointSet
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator

Expand All @@ -25,10 +25,6 @@
L7_FLOW_TYPE_RESPONSE = 1
L7_FLOW_TYPE_SESSION = 2

L7_FLOW_SIGNAL_SOURCE_PACKET = 0
L7_FLOW_SIGNAL_SOURCE_EBPF = 3
L7_FLOW_SIGNAL_SOURCE_OTEL = 4

TAP_SIDE_CLIENT_PROCESS = 'c-p'
TAP_SIDE_SERVER_PROCESS = 's-p'
TAP_SIDE_CLIENT_APP = 'c-app'
Expand Down
64 changes: 30 additions & 34 deletions app/app/application/tracing_completion.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
from collections import defaultdict
import pandas as pd
from log import logger
from pandas import DataFrame

from .l7_flow_tracing import (TAP_SIDE_CLIENT_PROCESS, TAP_SIDE_SERVER_PROCESS,
TAP_SIDE_CLIENT_APP, TAP_SIDE_SERVER_APP,
TAP_SIDE_APP, RETURN_FIELDS, L7_FLOW_SIGNAL_SOURCE_OTEL)
RETURN_FIELDS)
from .l7_flow_tracing import (L7FlowTracing, L7NetworkMeta, L7SyscallMeta,
L7XrequestMeta)
L7XrequestMeta, TraceInfo)
from .l7_flow_tracing import sort_all_flows, format_final_result, set_all_relate
from common import const
from common.const import L7_FLOW_SIGNAL_SOURCE_OTEL
from common.utils import inner_defaultdict_set
from config import config
from models.models import AppSpans

log = logger.getLogger(__name__)


class TracingCompletion(L7FlowTracing):

Expand Down Expand Up @@ -80,11 +84,9 @@ async def trace_l7_flow(self,
network_metas = set()
syscall_metas = set()
trace_ids = set()
app_metas = set()
x_request_metas = set()
l7_flow_ids = set()
xrequests = []
related_map = defaultdict(dict)
related_map = defaultdict(inner_defaultdict_set)
query_simple_trace_id = False
dataframe_flowmetas = self.app_spans_df
if dataframe_flowmetas.empty:
Expand Down Expand Up @@ -134,6 +136,10 @@ async def trace_l7_flow(self,
f"signal_source={L7_FLOW_SIGNAL_SOURCE_OTEL}")
new_trace_id_flows = await self.query_flowmetas(
time_filter, ' AND '.join(query_trace_filters))
if type(new_trace_id_flows
) == DataFrame and not new_trace_id_flows.empty:
new_trace_id_flows.rename(columns={'_id_str': '_id'},
inplace=True)
query_simple_trace_id = True
if delete_index:
dataframe_flowmetas = dataframe_flowmetas.drop(
Expand Down Expand Up @@ -162,6 +168,8 @@ async def trace_l7_flow(self,
f"signal_source={L7_FLOW_SIGNAL_SOURCE_OTEL}")
new_trace_id_flows = await self.query_flowmetas(
time_filter, ' AND '.join(query_trace_filters))
new_trace_id_flows.rename(columns={'_id_str': '_id'},
inplace=True)

if type(new_trace_id_flows) != DataFrame:
break
Expand Down Expand Up @@ -206,10 +214,6 @@ async def trace_l7_flow(self,
))
new_network_metas -= network_metas
network_metas |= new_network_metas
networks = [
L7NetworkMeta(nnm, network_delay_us)
for nnm in new_network_metas
]
for nnm in new_network_metas:
req_tcp_seq = nnm[2]
resp_tcp_seq = nnm[3]
Expand Down Expand Up @@ -251,7 +255,6 @@ async def trace_l7_flow(self,
))
new_syscall_metas -= syscall_metas
syscall_metas |= new_syscall_metas
syscalls = [L7SyscallMeta(nsm) for nsm in new_syscall_metas]
for nsm in new_syscall_metas:
syscall_trace_id_request = nsm[2]
syscall_trace_id_response = nsm[3]
Expand Down Expand Up @@ -289,9 +292,6 @@ async def trace_l7_flow(self,
dataframe_flowmetas['x_request_id_1'][index]))
new_x_request_metas -= x_request_metas
x_request_metas |= new_x_request_metas
xrequests = [
L7XrequestMeta(nxr) for nxr in new_x_request_metas
]
for nxr in new_x_request_metas:
x_request_id_0 = nxr[1]
x_request_id_1 = nxr[2]
Expand Down Expand Up @@ -374,28 +374,22 @@ async def trace_l7_flow(self,
log.debug(f"删除的trace id为:{deleted_trace_ids}")
new_flows.rename(columns={'_id_str': '_id'}, inplace=True)

new_related_map = defaultdict(dict)
new_flow_ids = set(new_flows['_id'])
if xrequests:
for x_request in xrequests:
x_request.set_relate(new_flow_ids, new_related_map,
id_to_related_tag)

if syscalls:
for syscall in syscalls:
syscall.set_relate(new_flow_ids, new_related_map,
id_to_related_tag)
trace_infos = TraceInfo.construct_from_dataframe(
dataframe_flowmetas
) + TraceInfo.construct_from_dataframe(new_flows)

if networks:
for network in networks:
network.set_relate(new_flow_ids, new_related_map,
id_to_related_tag)
set_all_relate(
trace_infos,
related_map,
network_delay_us,
fast_check=True,
skip_first_n_trace_infos=len(dataframe_flowmetas))

new_flow_delete_index = []
for index in range(len(new_flows.index)):
_id = new_flows['_id'][index]
# Delete unrelated data
if _id not in new_related_map:
if _id not in related_map:
new_flow_delete_index.append(index)
if new_flow_delete_index:
new_flows = new_flows.drop(
Expand All @@ -413,7 +407,8 @@ async def trace_l7_flow(self,
new_flows_length = len(dataframe_flowmetas)
if old_flows_length == new_flows_length:
break
set_all_relate(dataframe_flowmetas, related_map, network_delay_us)
# end of `for i in range(max_iteration):`

# 获取追踪到的所有应用流日志
return_fields += RETURN_FIELDS
flow_fields = list(RETURN_FIELDS)
Expand All @@ -437,10 +432,11 @@ async def trace_l7_flow(self,
l7_flows.at[index, 'related_ids'] = related_map[l7_flows.at[index,
'_id']]
# 对所有应用流日志排序
l7_flows_merged, app_flows, networks = sort_all_flows(
l7_flows_merged, app_flows, networks, flow_index_to_id0, related_flow_index_map = sort_all_flows(
l7_flows, network_delay_us, return_fields, ntp_delay_us)
return format(l7_flows_merged, networks, app_flows,
self.args.get('_id'), network_delay_us)
return format_final_result(l7_flows_merged, networks, app_flows,
self.args.get('_id'), network_delay_us,
flow_index_to_id0, related_flow_index_map)

# update start time and end time
def update_time(self):
Expand Down
5 changes: 5 additions & 0 deletions app/app/common/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@
TAP_SIDE_REST: 13,
TAP_SIDE_LOCAL: 13, # rest和local需要就近排列到其他位置上
}

# signal_source
L7_FLOW_SIGNAL_SOURCE_PACKET = 0
L7_FLOW_SIGNAL_SOURCE_EBPF = 3
L7_FLOW_SIGNAL_SOURCE_OTEL = 4

0 comments on commit 72dcbcf

Please sign in to comment.