Skip to content

Commit

Permalink
simplify Outputs field under Collector sturct
Browse files Browse the repository at this point in the history
  • Loading branch information
karimra committed Oct 27, 2020
1 parent 6aac40f commit 661de56
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 55 deletions.
89 changes: 56 additions & 33 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ type Config struct {

// Collector //
type Collector struct {
Config *Config
Subscriptions map[string]*SubscriptionConfig
Outputs map[string][]outputs.Output
dialOpts []grpc.DialOption
Config *Config
dialOpts []grpc.DialOption
//
m *sync.Mutex
Targets map[string]*Target
logger *log.Logger
httpServer *http.Server
m *sync.Mutex
Subscriptions map[string]*SubscriptionConfig
Outputs map[string]outputs.Output
Targets map[string]*Target
logger *log.Logger
httpServer *http.Server
}

type CollectorOption func(c *Collector)
Expand All @@ -64,9 +64,17 @@ func WithSubscriptions(subs map[string]*SubscriptionConfig) CollectorOption {
}
}

func WithOutputs(outs map[string][]outputs.Output) CollectorOption {
func WithOutputs(ctx context.Context, outs map[string]map[string]interface{}, logger *log.Logger) CollectorOption {
return func(c *Collector) {
c.Outputs = outs
for outputName, outputCfg := range outs {
if outType, ok := outputCfg["type"]; ok {
if initializer, ok := outputs.Outputs[outType.(string)]; ok {
out := initializer()
go out.Init(ctx, outputCfg, logger)
c.Outputs[outputName] = out
}
}
}
}
}

Expand All @@ -90,6 +98,7 @@ func NewCollector(config *Config, targetConfigs map[string]*TargetConfig, opts .
Config: config,
m: new(sync.Mutex),
Targets: make(map[string]*Target),
Outputs: make(map[string]outputs.Output),
httpServer: httpServer,
}
for _, op := range opts {
Expand Down Expand Up @@ -130,29 +139,15 @@ func (c *Collector) InitTarget(tc *TargetConfig) {
}
t := NewTarget(tc)
//
t.Subscriptions = make([]*SubscriptionConfig, 0, len(tc.Subscriptions))
t.Subscriptions = make(map[string]*SubscriptionConfig)
for _, subName := range tc.Subscriptions {
if sub, ok := c.Subscriptions[subName]; ok {
t.Subscriptions = append(t.Subscriptions, sub)
t.Subscriptions[subName] = sub
}
}
if len(t.Subscriptions) == 0 {
t.Subscriptions = make([]*SubscriptionConfig, 0, len(c.Subscriptions))
for _, sub := range c.Subscriptions {
t.Subscriptions = append(t.Subscriptions, sub)
}
}
//
t.Outputs = make([]outputs.Output, 0, len(tc.Outputs))
for _, outName := range tc.Outputs {
if outs, ok := c.Outputs[outName]; ok {
t.Outputs = append(t.Outputs, outs...)
}
}
if len(t.Outputs) == 0 {
t.Outputs = make([]outputs.Output, 0, len(c.Outputs))
for _, o := range c.Outputs {
t.Outputs = append(t.Outputs, o...)
t.Subscriptions[sub.Name] = sub
}
}
c.m.Lock()
Expand Down Expand Up @@ -194,10 +189,8 @@ func (c *Collector) Start(ctx context.Context) {
}()
}
defer func() {
for _, outputs := range c.Outputs {
for _, o := range outputs {
o.Close()
}
for _, o := range c.Outputs {
o.Close()
}
}()
wg := new(sync.WaitGroup)
Expand All @@ -214,10 +207,12 @@ func (c *Collector) Start(ctx context.Context) {
if c.Config.Debug {
c.logger.Printf("received gNMI Subscribe Response: %+v", rsp)
}
m := outputs.Meta{"source": t.Config.Name, "format": c.Config.Format, "subscription-name": rsp.SubscriptionName}
if c.subscriptionMode(rsp.SubscriptionName) == "ONCE" {
t.Export(ctx, rsp.Response, outputs.Meta{"source": t.Config.Name, "format": c.Config.Format, "subscription-name": rsp.SubscriptionName})
c.Export(ctx, rsp.Response, m, t.Config.Outputs...)
} else {
go t.Export(ctx, rsp.Response, outputs.Meta{"source": t.Config.Name, "format": c.Config.Format, "subscription-name": rsp.SubscriptionName})
//go t.Export(ctx, rsp.Response, outputs.Meta{"source": t.Config.Name, "format": c.Config.Format, "subscription-name": rsp.SubscriptionName})
go c.Export(ctx, rsp.Response, m, t.Config.Outputs...)
}
if remainingOnceSubscriptions > 0 {
if c.subscriptionMode(rsp.SubscriptionName) == "ONCE" {
Expand Down Expand Up @@ -300,3 +295,31 @@ func (c *Collector) subscriptionMode(name string) string {
}
return ""
}

func (c *Collector) Export(ctx context.Context, rsp *gnmi.SubscribeResponse, m outputs.Meta, outs ...string) {
if rsp == nil {
return
}
wg := new(sync.WaitGroup)
if len(outs) == 0 {
wg.Add(len(c.Outputs))
for _, o := range c.Outputs {
go func(o outputs.Output) {
defer wg.Done()
o.Write(ctx, rsp, m)
}(o)
}
wg.Wait()
return
}
for _, name := range outs {
if o, ok := c.Outputs[name]; ok {
wg.Add(1)
go func(o outputs.Output) {
defer wg.Done()
o.Write(ctx, rsp, m)
}(o)
}
}
wg.Wait()
}
24 changes: 2 additions & 22 deletions collector/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"sync"
"time"

"github.com/karimra/gnmic/outputs"
"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/gnmi/proto/gnmi_ext"
"google.golang.org/grpc"
Expand All @@ -33,8 +32,7 @@ type TargetError struct {
// Target represents a gNMI enabled box
type Target struct {
Config *TargetConfig
Subscriptions []*SubscriptionConfig
Outputs []outputs.Output
Subscriptions map[string]*SubscriptionConfig

m *sync.Mutex
Client gnmi.GNMIClient
Expand Down Expand Up @@ -74,16 +72,14 @@ func (tc *TargetConfig) String() string {
func NewTarget(c *TargetConfig) *Target {
t := &Target{
Config: c,
Subscriptions: make([]*SubscriptionConfig, 0),
Outputs: make([]outputs.Output, 0),
Subscriptions: make(map[string]*SubscriptionConfig),
m: new(sync.Mutex),
SubscribeClients: make(map[string]gnmi.GNMI_SubscribeClient),
PollChan: make(chan string),
SubscribeResponses: make(chan *SubscribeResponse, c.BufferSize),
Errors: make(chan *TargetError),
}
return t

}

// NewTLS //
Expand Down Expand Up @@ -273,22 +269,6 @@ SUBSC:
}
}

// Export //
func (t *Target) Export(ctx context.Context, rsp *gnmi.SubscribeResponse, m outputs.Meta) {
if rsp == nil || len(t.Outputs) == 0 {
return
}
wg := new(sync.WaitGroup)
wg.Add(len(t.Outputs))
for _, o := range t.Outputs {
go func(o outputs.Output) {
defer wg.Done()
o.Write(ctx, rsp, m)
}(o)
}
wg.Wait()
}

func loadCerts(tlscfg *tls.Config, c *TargetConfig) error {
if *c.TLSCert != "" && *c.TLSKey != "" {
certificate, err := tls.LoadX509KeyPair(*c.TLSCert, *c.TLSKey)
Expand Down

0 comments on commit 661de56

Please sign in to comment.