Skip to content

Commit

Permalink
cancel func on bridge service to shutdown agent main routine
Browse files Browse the repository at this point in the history
  • Loading branch information
Bruno Valenca committed Jul 17, 2023
1 parent 1e33b81 commit 0c8d074
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
2 changes: 1 addition & 1 deletion agent/backend/diode/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (

func (d *diodeBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Logs, error) {

bridgeService := otel.NewBridgeService(ctx, &d.policyRepo, d.agentTags)
bridgeService := otel.NewBridgeService(ctx, cancelFunc, &d.policyRepo, d.agentTags)
if d.mqttClient != nil {
cfg := otlpmqttexporter.CreateConfigClient(d.mqttClient, d.logTopic, d.version, bridgeService)
set := otlpmqttexporter.CreateDefaultSettings(d.logger)
Expand Down
10 changes: 5 additions & 5 deletions agent/backend/pktvisor/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,26 @@ func (p *pktvisorBackend) scrapeMetrics(period uint) (map[string]interface{}, er

func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Metrics, error) {

bridgeService := otel.NewBridgeService(ctx, &p.policyRepo, p.agentTags)
bridgeService := otel.NewBridgeService(ctx, cancelFunc, &p.policyRepo, p.agentTags)
if p.mqttClient != nil {
cfg := otlpmqttexporter.CreateConfigClient(p.mqttClient, p.otlpMetricsTopic, p.pktvisorVersion, bridgeService)
set := otlpmqttexporter.CreateDefaultSettings(p.logger)
// Create the OTLP metrics exporter that'll receive and verify the metrics produced.
exporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg)
metricsExporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg)
if err != nil {
return nil, err
}
return exporter, nil
return metricsExporter, nil
} else {
cfg := otlpmqttexporter.CreateConfig(p.mqttConfig.Address, p.mqttConfig.Id, p.mqttConfig.Key,
p.mqttConfig.ChannelID, p.pktvisorVersion, p.otlpMetricsTopic, bridgeService)
set := otlpmqttexporter.CreateDefaultSettings(p.logger)
// Create the OTLP metrics exporter that'll receive and verify the metrics produced.
exporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg)
metricsExporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg)
if err != nil {
return nil, err
}
return exporter, nil
return metricsExporter, nil
}

}
Expand Down
9 changes: 6 additions & 3 deletions agent/otel/bridgeservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package otel

import (
"context"
"github.com/orb-community/orb/agent/policies"
"strings"

"github.com/orb-community/orb/agent/policies"
)

type AgentBridgeService interface {
Expand All @@ -21,13 +22,15 @@ var _ AgentBridgeService = (*BridgeService)(nil)

type BridgeService struct {
bridgeContext context.Context
cancelFunc context.CancelFunc
policyRepo policies.PolicyRepo
AgentTags map[string]string
}

func NewBridgeService(ctx context.Context, policyRepo *policies.PolicyRepo, agentTags map[string]string) *BridgeService {
func NewBridgeService(ctx context.Context, cancelFunc context.CancelFunc, policyRepo *policies.PolicyRepo, agentTags map[string]string) *BridgeService {
return &BridgeService{
bridgeContext: ctx,
cancelFunc: cancelFunc,
policyRepo: *policyRepo,
AgentTags: agentTags,
}
Expand All @@ -47,5 +50,5 @@ func (b *BridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*Agent

func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, err error) {
ctx.Done()
b.bridgeContext.Done()
b.cancelFunc()
}

0 comments on commit 0c8d074

Please sign in to comment.