Skip to content

Commit

Permalink
change getOutputs() and use it in subscribe and listen cmds
Browse files Browse the repository at this point in the history
  • Loading branch information
karimra committed Oct 27, 2020
1 parent 661de56 commit 26f67a0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 46 deletions.
27 changes: 16 additions & 11 deletions cmd/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,24 @@ var listenCmd = &cobra.Command{
if len(address) > 1 {
fmt.Printf("multiple addresses specified, listening only on %s\n", address[0])
}
var err error
server.Outputs, err = getOutputs(ctx)
server.Outputs = make(map[string]outputs.Output)
outCfgs, err := getOutputs()
if err != nil {
return err
}
for name, outConf := range outCfgs {
if outType, ok := outConf["type"]; ok {
if initializer, ok := outputs.Outputs[outType.(string)]; ok {
out := initializer()
go out.Init(ctx, outConf, logger)
server.Outputs[name] = out
}
}
}

defer func() {
for _, outputs := range server.Outputs {
for _, o := range outputs {
o.Close()
}
for _, o := range server.Outputs {
o.Close()
}
}()
server.listener, err = net.Listen("tcp", address[0])
Expand Down Expand Up @@ -126,7 +133,7 @@ func init() {
type dialoutTelemetryServer struct {
listener net.Listener
grpcServer *grpc.Server
Outputs map[string][]outputs.Output
Outputs map[string]outputs.Output

ctx context.Context
}
Expand Down Expand Up @@ -189,10 +196,8 @@ func (s *dialoutTelemetryServer) Publish(stream nokiasros.DialoutTelemetry_Publi
// logger.Printf("failed to format subscribe response: %v", err)
// continue
// }
for _, outputs := range s.Outputs {
for _, o := range outputs {
go o.Write(s.ctx, subResp, outMeta)
}
for _, o := range s.Outputs {
go o.Write(s.ctx, subResp, outMeta)
}
// buff := new(bytes.Buffer)
// err = json.Indent(buff, b, "", " ")
Expand Down
59 changes: 24 additions & 35 deletions cmd/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var subscribeCmd = &cobra.Command{
if debug {
logger.Printf("subscriptions: %s", subscriptionsConfig)
}
outs, err := getOutputs(ctx)
outs, err := getOutputs()
if err != nil {
return err
}
Expand All @@ -100,7 +100,7 @@ var subscribeCmd = &cobra.Command{
coll := collector.NewCollector(cfg, targetsConfig,
collector.WithDialOptions(createCollectorDialOpts()),
collector.WithSubscriptions(subscriptionsConfig),
collector.WithOutputs(outs),
collector.WithOutputs(ctx, outs, logger),
collector.WithLogger(logger))

wg := new(sync.WaitGroup)
Expand Down Expand Up @@ -245,58 +245,47 @@ func initSubscribeFlags(cmd *cobra.Command) {
viper.BindPFlag("subscribe-output", cmd.LocalFlags().Lookup("output"))
}

func getOutputs(ctx context.Context) (map[string][]outputs.Output, error) {
func getOutputs() (map[string]map[string]interface{}, error) {
outDef := viper.GetStringMap("outputs")
if len(outDef) == 0 && !viper.GetBool("quiet") {
stdoutConfig := map[string]interface{}{
"type": "file",
"file-type": "stdout",
"format": viper.GetString("format"),
}
outDef["stdout"] = []interface{}{stdoutConfig}
outDef["default-stdout"] = stdoutConfig
}
outputDestinations := make(map[string][]outputs.Output)
for name, d := range outDef {
dl := convert(d)
switch outs := dl.(type) {
case []interface{}:
for _, ou := range outs {
switch ou := ou.(type) {
case map[string]interface{}:
if outType, ok := ou["type"]; ok {
if initializer, ok := outputs.Outputs[outType.(string)]; ok {
format, ok := ou["format"]
if !ok || (ok && format == "") {
ou["format"] = viper.GetString("format")
}
o := initializer()
go o.Init(ctx, ou, logger)
if outputDestinations[name] == nil {
outputDestinations[name] = make([]outputs.Output, 0)
}
outputDestinations[name] = append(outputDestinations[name], o)
continue
}
logger.Printf("unknown output type '%s'", outType)
continue
outputsConfigs := make(map[string]map[string]interface{})
for name, outputCfg := range outDef {
outputCfgconv := convert(outputCfg)
switch outCfg := outputCfgconv.(type) {
case map[string]interface{}:
if outType, ok := outCfg["type"]; ok {
if _, ok := outputs.Outputs[outType.(string)]; ok {
format, ok := outCfg["format"]
if !ok || (ok && format == "") {
outCfg["format"] = viper.GetString("format")
}
logger.Printf("missing output 'type' under %v", ou)
default:
logger.Printf("unknown configuration format expecting a map[string]interface{}: got %T : %v", d, d)
outputsConfigs[name] = outCfg
continue
}
logger.Printf("unknown output type '%s'", outType)
continue
}
logger.Printf("missing output 'type' under %v", outCfg)
default:
return nil, fmt.Errorf("unknown configuration format: %T : %v", d, d)
logger.Printf("unknown configuration format expecting a map[string]interface{}: got %T : %v", outCfg, outCfg)
}
}

namedOutputs := viper.GetStringSlice("subscribe-output")
if len(namedOutputs) == 0 {
return outputDestinations, nil
return outputsConfigs, nil
}
filteredOutputs := make(map[string][]outputs.Output)
filteredOutputs := make(map[string]map[string]interface{})
notFound := make([]string, 0)
for _, name := range namedOutputs {
if o, ok := outputDestinations[name]; ok {
if o, ok := outputsConfigs[name]; ok {
filteredOutputs[name] = o
} else {
notFound = append(notFound, name)
Expand Down

0 comments on commit 26f67a0

Please sign in to comment.