Skip to content

Commit

Permalink
Merge pull request #542 from nokia/fix409
Browse files Browse the repository at this point in the history
properly set deleted tags when using influxdb output
  • Loading branch information
karimra authored Nov 5, 2024
2 parents cc35c12 + 44eee4c commit 57eba92
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
8 changes: 6 additions & 2 deletions pkg/outputs/influxdb_output/influxdb_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ func (i *influxDBOutput) stopCache() {
func (i *influxDBOutput) runCache(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
return
case <-i.done:
return
case <-i.cacheTicker.C:
if i.Cfg.Debug {
i.logger.Printf("cache timer tick")
}
i.readCache(ctx, name)
i.readCache(ctx)
}
}
}

func (i *influxDBOutput) readCache(ctx context.Context, name string) {
func (i *influxDBOutput) readCache(ctx context.Context) {
notifications, err := i.gnmiCache.ReadAll()
if err != nil {
i.logger.Printf("failed to read from cache: %v", err)
Expand Down Expand Up @@ -85,6 +87,8 @@ func (i *influxDBOutput) readCache(ctx context.Context, name string) {

for _, ev := range events {
select {
case <-ctx.Done():
return
case <-i.reset:
return
case i.eventChan <- ev:
Expand Down
9 changes: 6 additions & 3 deletions pkg/outputs/influxdb_output/influxdb_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ START:
i.logger.Printf("worker-%d terminating...", idx)
return
case ev := <-i.eventChan:
if len(ev.Values) == 0 || (len(ev.Deletes) == 0 && i.Cfg.DeleteTag != "") {
if len(ev.Values) == 0 && len(ev.Deletes) == 0 {
continue
}
if len(ev.Values) == 0 && i.Cfg.DeleteTag == "" {
continue
}
for n, v := range ev.Values {
Expand All @@ -368,7 +371,7 @@ START:
i.convertUints(ev)
writer.WritePoint(influxdb2.NewPoint(ev.Name, ev.Tags, ev.Values, time.Unix(0, ev.Timestamp)))
}

if len(ev.Deletes) > 0 && i.Cfg.DeleteTag != "" {
tags := make(map[string]string, len(ev.Tags))
for k, v := range ev.Tags {
Expand All @@ -377,7 +380,7 @@ START:
tags[i.Cfg.DeleteTag] = deleteTagValue
values := make(map[string]any, len(ev.Deletes))
for _, del := range ev.Deletes {
values[del] = 0
values[del] = ""
}
writer.WritePoint(influxdb2.NewPoint(ev.Name, tags, values, time.Unix(0, ev.Timestamp)))
}
Expand Down

0 comments on commit 57eba92

Please sign in to comment.