From 661de5660818256cb79a1b4e464f736db05e008a Mon Sep 17 00:00:00 2001 From: Karim Radhouani Date: Tue, 27 Oct 2020 19:38:49 +0800 Subject: [PATCH] simplify Outputs field under Collector sturct --- collector/collector.go | 89 ++++++++++++++++++++++++++---------------- collector/target.go | 24 +----------- 2 files changed, 58 insertions(+), 55 deletions(-) diff --git a/collector/collector.go b/collector/collector.go index 002fc644..dca23acc 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -35,15 +35,15 @@ type Config struct { // Collector // type Collector struct { - Config *Config - Subscriptions map[string]*SubscriptionConfig - Outputs map[string][]outputs.Output - dialOpts []grpc.DialOption + Config *Config + dialOpts []grpc.DialOption // - m *sync.Mutex - Targets map[string]*Target - logger *log.Logger - httpServer *http.Server + m *sync.Mutex + Subscriptions map[string]*SubscriptionConfig + Outputs map[string]outputs.Output + Targets map[string]*Target + logger *log.Logger + httpServer *http.Server } type CollectorOption func(c *Collector) @@ -64,9 +64,17 @@ func WithSubscriptions(subs map[string]*SubscriptionConfig) CollectorOption { } } -func WithOutputs(outs map[string][]outputs.Output) CollectorOption { +func WithOutputs(ctx context.Context, outs map[string]map[string]interface{}, logger *log.Logger) CollectorOption { return func(c *Collector) { - c.Outputs = outs + for outputName, outputCfg := range outs { + if outType, ok := outputCfg["type"]; ok { + if initializer, ok := outputs.Outputs[outType.(string)]; ok { + out := initializer() + go out.Init(ctx, outputCfg, logger) + c.Outputs[outputName] = out + } + } + } } } @@ -90,6 +98,7 @@ func NewCollector(config *Config, targetConfigs map[string]*TargetConfig, opts . Config: config, m: new(sync.Mutex), Targets: make(map[string]*Target), + Outputs: make(map[string]outputs.Output), httpServer: httpServer, } for _, op := range opts { @@ -130,29 +139,15 @@ func (c *Collector) InitTarget(tc *TargetConfig) { } t := NewTarget(tc) // - t.Subscriptions = make([]*SubscriptionConfig, 0, len(tc.Subscriptions)) + t.Subscriptions = make(map[string]*SubscriptionConfig) for _, subName := range tc.Subscriptions { if sub, ok := c.Subscriptions[subName]; ok { - t.Subscriptions = append(t.Subscriptions, sub) + t.Subscriptions[subName] = sub } } if len(t.Subscriptions) == 0 { - t.Subscriptions = make([]*SubscriptionConfig, 0, len(c.Subscriptions)) for _, sub := range c.Subscriptions { - t.Subscriptions = append(t.Subscriptions, sub) - } - } - // - t.Outputs = make([]outputs.Output, 0, len(tc.Outputs)) - for _, outName := range tc.Outputs { - if outs, ok := c.Outputs[outName]; ok { - t.Outputs = append(t.Outputs, outs...) - } - } - if len(t.Outputs) == 0 { - t.Outputs = make([]outputs.Output, 0, len(c.Outputs)) - for _, o := range c.Outputs { - t.Outputs = append(t.Outputs, o...) + t.Subscriptions[sub.Name] = sub } } c.m.Lock() @@ -194,10 +189,8 @@ func (c *Collector) Start(ctx context.Context) { }() } defer func() { - for _, outputs := range c.Outputs { - for _, o := range outputs { - o.Close() - } + for _, o := range c.Outputs { + o.Close() } }() wg := new(sync.WaitGroup) @@ -214,10 +207,12 @@ func (c *Collector) Start(ctx context.Context) { if c.Config.Debug { c.logger.Printf("received gNMI Subscribe Response: %+v", rsp) } + m := outputs.Meta{"source": t.Config.Name, "format": c.Config.Format, "subscription-name": rsp.SubscriptionName} if c.subscriptionMode(rsp.SubscriptionName) == "ONCE" { - t.Export(ctx, rsp.Response, outputs.Meta{"source": t.Config.Name, "format": c.Config.Format, "subscription-name": rsp.SubscriptionName}) + c.Export(ctx, rsp.Response, m, t.Config.Outputs...) } else { - go t.Export(ctx, rsp.Response, outputs.Meta{"source": t.Config.Name, "format": c.Config.Format, "subscription-name": rsp.SubscriptionName}) + //go t.Export(ctx, rsp.Response, outputs.Meta{"source": t.Config.Name, "format": c.Config.Format, "subscription-name": rsp.SubscriptionName}) + go c.Export(ctx, rsp.Response, m, t.Config.Outputs...) } if remainingOnceSubscriptions > 0 { if c.subscriptionMode(rsp.SubscriptionName) == "ONCE" { @@ -300,3 +295,31 @@ func (c *Collector) subscriptionMode(name string) string { } return "" } + +func (c *Collector) Export(ctx context.Context, rsp *gnmi.SubscribeResponse, m outputs.Meta, outs ...string) { + if rsp == nil { + return + } + wg := new(sync.WaitGroup) + if len(outs) == 0 { + wg.Add(len(c.Outputs)) + for _, o := range c.Outputs { + go func(o outputs.Output) { + defer wg.Done() + o.Write(ctx, rsp, m) + }(o) + } + wg.Wait() + return + } + for _, name := range outs { + if o, ok := c.Outputs[name]; ok { + wg.Add(1) + go func(o outputs.Output) { + defer wg.Done() + o.Write(ctx, rsp, m) + }(o) + } + } + wg.Wait() +} diff --git a/collector/target.go b/collector/target.go index 3b9ac6ed..27a61e22 100644 --- a/collector/target.go +++ b/collector/target.go @@ -13,7 +13,6 @@ import ( "sync" "time" - "github.com/karimra/gnmic/outputs" "github.com/openconfig/gnmi/proto/gnmi" "github.com/openconfig/gnmi/proto/gnmi_ext" "google.golang.org/grpc" @@ -33,8 +32,7 @@ type TargetError struct { // Target represents a gNMI enabled box type Target struct { Config *TargetConfig - Subscriptions []*SubscriptionConfig - Outputs []outputs.Output + Subscriptions map[string]*SubscriptionConfig m *sync.Mutex Client gnmi.GNMIClient @@ -74,8 +72,7 @@ func (tc *TargetConfig) String() string { func NewTarget(c *TargetConfig) *Target { t := &Target{ Config: c, - Subscriptions: make([]*SubscriptionConfig, 0), - Outputs: make([]outputs.Output, 0), + Subscriptions: make(map[string]*SubscriptionConfig), m: new(sync.Mutex), SubscribeClients: make(map[string]gnmi.GNMI_SubscribeClient), PollChan: make(chan string), @@ -83,7 +80,6 @@ func NewTarget(c *TargetConfig) *Target { Errors: make(chan *TargetError), } return t - } // NewTLS // @@ -273,22 +269,6 @@ SUBSC: } } -// Export // -func (t *Target) Export(ctx context.Context, rsp *gnmi.SubscribeResponse, m outputs.Meta) { - if rsp == nil || len(t.Outputs) == 0 { - return - } - wg := new(sync.WaitGroup) - wg.Add(len(t.Outputs)) - for _, o := range t.Outputs { - go func(o outputs.Output) { - defer wg.Done() - o.Write(ctx, rsp, m) - }(o) - } - wg.Wait() -} - func loadCerts(tlscfg *tls.Config, c *TargetConfig) error { if *c.TLSCert != "" && *c.TLSKey != "" { certificate, err := tls.LoadX509KeyPair(*c.TLSCert, *c.TLSKey)