From 26f67a02209f54141bc0eada55e54eb02c203117 Mon Sep 17 00:00:00 2001 From: Karim Radhouani Date: Tue, 27 Oct 2020 19:40:13 +0800 Subject: [PATCH] change getOutputs() and use it in subscribe and listen cmds --- cmd/listen.go | 27 +++++++++++++--------- cmd/subscribe.go | 59 ++++++++++++++++++++---------------------------- 2 files changed, 40 insertions(+), 46 deletions(-) diff --git a/cmd/listen.go b/cmd/listen.go index c25fe36e..a3f6039c 100644 --- a/cmd/listen.go +++ b/cmd/listen.go @@ -52,17 +52,24 @@ var listenCmd = &cobra.Command{ if len(address) > 1 { fmt.Printf("multiple addresses specified, listening only on %s\n", address[0]) } - var err error - server.Outputs, err = getOutputs(ctx) + server.Outputs = make(map[string]outputs.Output) + outCfgs, err := getOutputs() if err != nil { return err } + for name, outConf := range outCfgs { + if outType, ok := outConf["type"]; ok { + if initializer, ok := outputs.Outputs[outType.(string)]; ok { + out := initializer() + go out.Init(ctx, outConf, logger) + server.Outputs[name] = out + } + } + } defer func() { - for _, outputs := range server.Outputs { - for _, o := range outputs { - o.Close() - } + for _, o := range server.Outputs { + o.Close() } }() server.listener, err = net.Listen("tcp", address[0]) @@ -126,7 +133,7 @@ func init() { type dialoutTelemetryServer struct { listener net.Listener grpcServer *grpc.Server - Outputs map[string][]outputs.Output + Outputs map[string]outputs.Output ctx context.Context } @@ -189,10 +196,8 @@ func (s *dialoutTelemetryServer) Publish(stream nokiasros.DialoutTelemetry_Publi // logger.Printf("failed to format subscribe response: %v", err) // continue // } - for _, outputs := range s.Outputs { - for _, o := range outputs { - go o.Write(s.ctx, subResp, outMeta) - } + for _, o := range s.Outputs { + go o.Write(s.ctx, subResp, outMeta) } // buff := new(bytes.Buffer) // err = json.Indent(buff, b, "", " ") diff --git a/cmd/subscribe.go b/cmd/subscribe.go index 57c98371..8b7c5ac0 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -81,7 +81,7 @@ var subscribeCmd = &cobra.Command{ if debug { logger.Printf("subscriptions: %s", subscriptionsConfig) } - outs, err := getOutputs(ctx) + outs, err := getOutputs() if err != nil { return err } @@ -100,7 +100,7 @@ var subscribeCmd = &cobra.Command{ coll := collector.NewCollector(cfg, targetsConfig, collector.WithDialOptions(createCollectorDialOpts()), collector.WithSubscriptions(subscriptionsConfig), - collector.WithOutputs(outs), + collector.WithOutputs(ctx, outs, logger), collector.WithLogger(logger)) wg := new(sync.WaitGroup) @@ -245,7 +245,7 @@ func initSubscribeFlags(cmd *cobra.Command) { viper.BindPFlag("subscribe-output", cmd.LocalFlags().Lookup("output")) } -func getOutputs(ctx context.Context) (map[string][]outputs.Output, error) { +func getOutputs() (map[string]map[string]interface{}, error) { outDef := viper.GetStringMap("outputs") if len(outDef) == 0 && !viper.GetBool("quiet") { stdoutConfig := map[string]interface{}{ @@ -253,50 +253,39 @@ func getOutputs(ctx context.Context) (map[string][]outputs.Output, error) { "file-type": "stdout", "format": viper.GetString("format"), } - outDef["stdout"] = []interface{}{stdoutConfig} + outDef["default-stdout"] = stdoutConfig } - outputDestinations := make(map[string][]outputs.Output) - for name, d := range outDef { - dl := convert(d) - switch outs := dl.(type) { - case []interface{}: - for _, ou := range outs { - switch ou := ou.(type) { - case map[string]interface{}: - if outType, ok := ou["type"]; ok { - if initializer, ok := outputs.Outputs[outType.(string)]; ok { - format, ok := ou["format"] - if !ok || (ok && format == "") { - ou["format"] = viper.GetString("format") - } - o := initializer() - go o.Init(ctx, ou, logger) - if outputDestinations[name] == nil { - outputDestinations[name] = make([]outputs.Output, 0) - } - outputDestinations[name] = append(outputDestinations[name], o) - continue - } - logger.Printf("unknown output type '%s'", outType) - continue + outputsConfigs := make(map[string]map[string]interface{}) + for name, outputCfg := range outDef { + outputCfgconv := convert(outputCfg) + switch outCfg := outputCfgconv.(type) { + case map[string]interface{}: + if outType, ok := outCfg["type"]; ok { + if _, ok := outputs.Outputs[outType.(string)]; ok { + format, ok := outCfg["format"] + if !ok || (ok && format == "") { + outCfg["format"] = viper.GetString("format") } - logger.Printf("missing output 'type' under %v", ou) - default: - logger.Printf("unknown configuration format expecting a map[string]interface{}: got %T : %v", d, d) + outputsConfigs[name] = outCfg + continue } + logger.Printf("unknown output type '%s'", outType) + continue } + logger.Printf("missing output 'type' under %v", outCfg) default: - return nil, fmt.Errorf("unknown configuration format: %T : %v", d, d) + logger.Printf("unknown configuration format expecting a map[string]interface{}: got %T : %v", outCfg, outCfg) } } + namedOutputs := viper.GetStringSlice("subscribe-output") if len(namedOutputs) == 0 { - return outputDestinations, nil + return outputsConfigs, nil } - filteredOutputs := make(map[string][]outputs.Output) + filteredOutputs := make(map[string]map[string]interface{}) notFound := make([]string, 0) for _, name := range namedOutputs { - if o, ok := outputDestinations[name]; ok { + if o, ok := outputsConfigs[name]; ok { filteredOutputs[name] = o } else { notFound = append(notFound, name)