diff --git a/agent/backend/diode/scrape.go b/agent/backend/diode/scrape.go index b1fdfba9d..c4a2d86e2 100644 --- a/agent/backend/diode/scrape.go +++ b/agent/backend/diode/scrape.go @@ -26,7 +26,7 @@ const ( func (d *diodeBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Logs, error) { - bridgeService := otel.NewBridgeService(ctx, &d.policyRepo, d.agentTags) + bridgeService := otel.NewBridgeService(ctx, cancelFunc, &d.policyRepo, d.agentTags) if d.mqttClient != nil { cfg := otlpmqttexporter.CreateConfigClient(d.mqttClient, d.logTopic, d.version, bridgeService) set := otlpmqttexporter.CreateDefaultSettings(d.logger) diff --git a/agent/backend/pktvisor/scrape.go b/agent/backend/pktvisor/scrape.go index 120e42473..2c0423efd 100644 --- a/agent/backend/pktvisor/scrape.go +++ b/agent/backend/pktvisor/scrape.go @@ -37,26 +37,26 @@ func (p *pktvisorBackend) scrapeMetrics(period uint) (map[string]interface{}, er func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Metrics, error) { - bridgeService := otel.NewBridgeService(ctx, &p.policyRepo, p.agentTags) + bridgeService := otel.NewBridgeService(ctx, cancelFunc, &p.policyRepo, p.agentTags) if p.mqttClient != nil { cfg := otlpmqttexporter.CreateConfigClient(p.mqttClient, p.otlpMetricsTopic, p.pktvisorVersion, bridgeService) set := otlpmqttexporter.CreateDefaultSettings(p.logger) // Create the OTLP metrics exporter that'll receive and verify the metrics produced. - exporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg) + metricsExporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg) if err != nil { return nil, err } - return exporter, nil + return metricsExporter, nil } else { cfg := otlpmqttexporter.CreateConfig(p.mqttConfig.Address, p.mqttConfig.Id, p.mqttConfig.Key, p.mqttConfig.ChannelID, p.pktvisorVersion, p.otlpMetricsTopic, bridgeService) set := otlpmqttexporter.CreateDefaultSettings(p.logger) // Create the OTLP metrics exporter that'll receive and verify the metrics produced. - exporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg) + metricsExporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg) if err != nil { return nil, err } - return exporter, nil + return metricsExporter, nil } } diff --git a/agent/otel/bridgeservice.go b/agent/otel/bridgeservice.go index 9e1ea7d2a..b933a344f 100644 --- a/agent/otel/bridgeservice.go +++ b/agent/otel/bridgeservice.go @@ -2,8 +2,9 @@ package otel import ( "context" - "github.com/orb-community/orb/agent/policies" "strings" + + "github.com/orb-community/orb/agent/policies" ) type AgentBridgeService interface { @@ -21,13 +22,15 @@ var _ AgentBridgeService = (*BridgeService)(nil) type BridgeService struct { bridgeContext context.Context + cancelFunc context.CancelFunc policyRepo policies.PolicyRepo AgentTags map[string]string } -func NewBridgeService(ctx context.Context, policyRepo *policies.PolicyRepo, agentTags map[string]string) *BridgeService { +func NewBridgeService(ctx context.Context, cancelFunc context.CancelFunc, policyRepo *policies.PolicyRepo, agentTags map[string]string) *BridgeService { return &BridgeService{ bridgeContext: ctx, + cancelFunc: cancelFunc, policyRepo: *policyRepo, AgentTags: agentTags, } @@ -47,5 +50,5 @@ func (b *BridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*Agent func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, err error) { ctx.Done() - b.bridgeContext.Done() + b.cancelFunc() }