Skip to content

Commit

Permalink
feat: support type label for volume for backward compatibility (#2132)
Browse files Browse the repository at this point in the history
* feat: support type label for volume for backward compatibility

* feat: support type label for volume for backward compatibility

* feat: support type label for volume for backward compatibility
  • Loading branch information
rahulguptajss authored Jun 12, 2023
1 parent c60d3e5 commit 42a7b38
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 49 deletions.
65 changes: 41 additions & 24 deletions cmd/collectors/restperf/plugins/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,35 @@ import (

type Volume struct {
*plugin.AbstractPlugin
styleType string
}

func New(p *plugin.AbstractPlugin) plugin.Plugin {
return &Volume{AbstractPlugin: p}
}

func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, error) {
func (v *Volume) Init() error {
var err error

if err = v.InitAbc(); err != nil {
return err
}

v.styleType = "style"

if v.Params.HasChildS("historicalLabels") {
v.styleType = "type"
}
return nil
}

func (v *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, error) {

var (
err error
)
data := dataMap[me.Object]
data := dataMap[v.Object]
style := v.styleType
opsKeyPrefix := "temp_"

re := regexp.MustCompile(`^(.*)__(\d{4})$`)
Expand All @@ -35,10 +52,10 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro

metric, err := volumeAggrmetric.NewMetricFloat64(metricName)
if err != nil {
me.Logger.Error().Err(err).Msg("add metric")
v.Logger.Error().Err(err).Msg("add metric")
return nil, err
}
me.Logger.Trace().Msgf("added metric: (%s) %v", metricName, metric)
v.Logger.Trace().Msgf("added metric: (%s) %v", metricName, metric)

cache := data.Clone(false, true, false)
cache.UUID += ".Volume"
Expand All @@ -55,7 +72,7 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro
// Flexgroup don't show any aggregate, node
fg.SetLabel("aggr", "")
fg.SetLabel("node", "")
fg.SetLabel("style", "flexgroup")
fg.SetLabel(style, "flexgroup")
}

if volumeAggrmetric.GetInstance(key) == nil {
Expand All @@ -64,32 +81,32 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro
flexgroupInstance.SetLabel("volume", match[1])
// Flexgroup don't show any node
flexgroupInstance.SetLabel("node", "")
flexgroupInstance.SetLabel("style", "flexgroup")
flexgroupInstance.SetLabel(style, "flexgroup")
flexgroupAggrsMap[key] = set.New()
if err := metric.SetValueFloat64(flexgroupInstance, 1); err != nil {
me.Logger.Error().Err(err).Str("metric", metricName).Msg("Unable to set value on metric")
v.Logger.Error().Err(err).Str("metric", metricName).Msg("Unable to set value on metric")
}
}
flexgroupAggrsMap[key].Add(i.GetLabel("aggr"))
i.SetLabel("style", "flexgroup_constituent")
i.SetLabel(style, "flexgroup_constituent")
i.SetExportable(false)
} else {
i.SetLabel("style", "flexvol")
i.SetLabel(style, "flexvol")
key := i.GetLabel("svm") + "." + i.GetLabel("volume")
flexvolInstance, err := volumeAggrmetric.NewInstance(key)
if err != nil {
me.Logger.Error().Err(err).Str("key", key).Msg("Failed to create new instance")
v.Logger.Error().Err(err).Str("key", key).Msg("Failed to create new instance")
continue
}
flexvolInstance.SetLabels(i.GetLabels().Copy())
flexvolInstance.SetLabel("style", "flexvol")
flexvolInstance.SetLabel(style, "flexvol")
if err := metric.SetValueFloat64(flexvolInstance, 1); err != nil {
me.Logger.Error().Err(err).Str("metric", metricName).Msg("Unable to set value on metric")
v.Logger.Error().Err(err).Str("metric", metricName).Msg("Unable to set value on metric")
}
}
}

me.Logger.Debug().Int("flexgroup volume count", len(cache.GetInstances())).Msg("")
v.Logger.Debug().Int("flexgroup volume count", len(cache.GetInstances())).Msg("")

//cache.Reset()

Expand All @@ -110,7 +127,7 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro

fg := cache.GetInstance(key)
if fg == nil {
me.Logger.Error().Msgf("instance [%s] not in local cache", key)
v.Logger.Error().Msgf("instance [%s] not in local cache", key)
continue
}

Expand All @@ -122,11 +139,11 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro

fgm := cache.GetMetric(mkey)
if fgm == nil {
me.Logger.Error().Msgf("metric [%s] not in local cache", mkey)
v.Logger.Error().Msgf("metric [%s] not in local cache", mkey)
continue
}

me.Logger.Trace().Msgf("(%s) handling metric (%s)", fg.GetLabel("volume"), mkey)
v.Logger.Trace().Msgf("(%s) handling metric (%s)", fg.GetLabel("volume"), mkey)

if value, ok := m.GetValueFloat64(i); ok {

Expand All @@ -137,12 +154,12 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro

err := fgm.SetValueFloat64(fg, fgv+value)
if err != nil {
me.Logger.Error().Err(err).Msg("error")
v.Logger.Error().Err(err).Msg("error")
}
// just for debugging
fgv2, _ := fgm.GetValueFloat64(fg)

me.Logger.Trace().Msgf(" > simple increment %f + %f = %f", fgv, value, fgv2)
v.Logger.Trace().Msgf(" > simple increment %f + %f = %f", fgv, value, fgv2)
continue
}

Expand All @@ -151,7 +168,7 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro
if strings.Contains(mkey, "_latency") {
opsKey = m.GetComment()
}
me.Logger.Trace().Msgf(" > weighted increment <%s * %s>", mkey, opsKey)
v.Logger.Trace().Msgf(" > weighted increment <%s * %s>", mkey, opsKey)

if ops := data.GetMetric(opsKey); ops != nil {
if opsValue, ok := ops.GetValueFloat64(i); ok {
Expand All @@ -175,20 +192,20 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro
if value != 0 {
err = tempOps.SetValueFloat64(fg, tempOpsV+opsValue)
if err != nil {
me.Logger.Error().Err(err).Msg("error")
v.Logger.Error().Err(err).Msg("error")
}
}
err = fgm.SetValueFloat64(fg, fgv+prod)
if err != nil {
me.Logger.Error().Err(err).Msg("error")
v.Logger.Error().Err(err).Msg("error")
}

// debugging
fgv2, _ := fgm.GetValueFloat64(fg)

me.Logger.Trace().Msgf(" %f + (%f * %f) (=%f) = %f", fgv, value, opsValue, prod, fgv2)
v.Logger.Trace().Msgf(" %f + (%f * %f) (=%f) = %f", fgv, value, opsValue, prod, fgv2)
} else {
me.Logger.Trace().Msg(" no ops value SKIP")
v.Logger.Trace().Msg(" no ops value SKIP")
}
}

Expand All @@ -215,7 +232,7 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro
if opsValue, ok := ops.GetValueFloat64(i); ok && opsValue != 0 {
err := m.SetValueFloat64(i, value/opsValue)
if err != nil {
me.Logger.Error().Err(err).Msgf("error")
v.Logger.Error().Err(err).Msgf("error")
}
} else {
m.SetValueNAN(i)
Expand Down
66 changes: 42 additions & 24 deletions cmd/collectors/zapiperf/plugins/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,40 @@ import (

type Volume struct {
*plugin.AbstractPlugin
styleType string
}

func New(p *plugin.AbstractPlugin) plugin.Plugin {
return &Volume{AbstractPlugin: p}
}

func (v *Volume) Init() error {
var err error

if err = v.InitAbc(); err != nil {
return err
}

v.styleType = "style"

if v.Params.HasChildS("historicalLabels") {
v.styleType = "type"
}
return nil
}

//@TODO cleanup logging
//@TODO rewrite using vector arithmetic
// will simplify the code a whole!!!

func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, error) {
func (v *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, error) {

var (
err error
)
data := dataMap[me.Object]

data := dataMap[v.Object]
style := v.styleType
opsKeyPrefix := "temp_"
re := regexp.MustCompile(`^(.*)__(\d{4})$`)

Expand All @@ -42,10 +60,10 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro

metric, err := volumeAggrmetric.NewMetricFloat64(metricName)
if err != nil {
me.Logger.Error().Err(err).Msg("add metric")
v.Logger.Error().Err(err).Msg("add metric")
return nil, err
}
me.Logger.Trace().Msgf("added metric: (%s) %v", metricName, metric)
v.Logger.Trace().Msgf("added metric: (%s) %v", metricName, metric)

cache := data.Clone(false, true, false)
cache.UUID += ".Volume"
Expand All @@ -62,7 +80,7 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro
// Flexgroup don't show any aggregate, node
fg.SetLabel("aggr", "")
fg.SetLabel("node", "")
fg.SetLabel("style", "flexgroup")
fg.SetLabel(style, "flexgroup")
}

if volumeAggrmetric.GetInstance(key) == nil {
Expand All @@ -71,33 +89,33 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro
flexgroupInstance.SetLabel("volume", match[1])
// Flexgroup don't show any node
flexgroupInstance.SetLabel("node", "")
flexgroupInstance.SetLabel("style", "flexgroup")
flexgroupInstance.SetLabel(style, "flexgroup")
flexgroupAggrsMap[key] = set.New()
if err := metric.SetValueFloat64(flexgroupInstance, 1); err != nil {
me.Logger.Error().Err(err).Str("metric", metricName).Msg("Unable to set value on metric")
v.Logger.Error().Err(err).Str("metric", metricName).Msg("Unable to set value on metric")
}
}
flexgroupAggrsMap[key].Add(i.GetLabel("aggr"))
i.SetLabel("style", "flexgroup_constituent")
i.SetLabel(style, "flexgroup_constituent")
i.SetExportable(false)
} else {
i.SetLabel("style", "flexvol")
i.SetLabel(style, "flexvol")
key := i.GetLabel("svm") + "." + i.GetLabel("volume")
flexvolInstance, err := volumeAggrmetric.NewInstance(key)
if err != nil {
me.Logger.Error().Err(err).Str("key", key).Msg("Failed to create new instance")
v.Logger.Error().Err(err).Str("key", key).Msg("Failed to create new instance")
continue
}
flexvolInstance.SetLabels(i.GetLabels().Copy())
flexvolInstance.SetLabel("style", "flexvol")
flexvolInstance.SetLabel(style, "flexvol")
if err := metric.SetValueFloat64(flexvolInstance, 1); err != nil {
me.Logger.Error().Err(err).Str("metric", metricName).Msg("Unable to set value on metric")
v.Logger.Error().Err(err).Str("metric", metricName).Msg("Unable to set value on metric")
}
}

}

me.Logger.Debug().Msgf("extracted %d flexgroup volumes", len(cache.GetInstances()))
v.Logger.Debug().Msgf("extracted %d flexgroup volumes", len(cache.GetInstances()))

//cache.Reset()

Expand All @@ -118,7 +136,7 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro

fg := cache.GetInstance(key)
if fg == nil {
me.Logger.Error().Err(nil).Msgf("instance [%s] not in local cache", key)
v.Logger.Error().Err(nil).Msgf("instance [%s] not in local cache", key)
continue
}

Expand All @@ -130,11 +148,11 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro

fgm := cache.GetMetric(mkey)
if fgm == nil {
me.Logger.Error().Err(nil).Msgf("metric [%s] not in local cache", mkey)
v.Logger.Error().Err(nil).Msgf("metric [%s] not in local cache", mkey)
continue
}

me.Logger.Trace().Msgf("(%s) handling metric (%s)", fg.GetLabel("volume"), mkey)
v.Logger.Trace().Msgf("(%s) handling metric (%s)", fg.GetLabel("volume"), mkey)

if value, ok := m.GetValueFloat64(i); ok {

Expand All @@ -145,12 +163,12 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro

err := fgm.SetValueFloat64(fg, fgv+value)
if err != nil {
me.Logger.Error().Err(err).Msg("error")
v.Logger.Error().Err(err).Msg("error")
}
// just for debugging
fgv2, _ := fgm.GetValueFloat64(fg)

me.Logger.Trace().Msgf(" > simple increment %f + %f = %f", fgv, value, fgv2)
v.Logger.Trace().Msgf(" > simple increment %f + %f = %f", fgv, value, fgv2)
continue
}

Expand All @@ -159,7 +177,7 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro
if strings.Contains(mkey, "_latency") {
opsKey = m.GetComment()
}
me.Logger.Trace().Msgf(" > weighted increment <%s * %s>", mkey, opsKey)
v.Logger.Trace().Msgf(" > weighted increment <%s * %s>", mkey, opsKey)

if ops := data.GetMetric(opsKey); ops != nil {
if opsValue, ok := ops.GetValueFloat64(i); ok {
Expand All @@ -183,20 +201,20 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro
if value != 0 {
err = tempOps.SetValueFloat64(fg, tempOpsV+opsValue)
if err != nil {
me.Logger.Error().Err(err).Msg("error")
v.Logger.Error().Err(err).Msg("error")
}
}
err = fgm.SetValueFloat64(fg, fgv+prod)
if err != nil {
me.Logger.Error().Err(err).Msg("error")
v.Logger.Error().Err(err).Msg("error")
}

// debugging
fgv2, _ := fgm.GetValueFloat64(fg)

me.Logger.Trace().Msgf(" %f + (%f * %f) (=%f) = %f", fgv, value, opsValue, prod, fgv2)
v.Logger.Trace().Msgf(" %f + (%f * %f) (=%f) = %f", fgv, value, opsValue, prod, fgv2)
} else {
me.Logger.Trace().Msg(" no ops value SKIP")
v.Logger.Trace().Msg(" no ops value SKIP")
}
}
}
Expand All @@ -222,7 +240,7 @@ func (me *Volume) Run(dataMap map[string]*matrix.Matrix) ([]*matrix.Matrix, erro
if opsValue, ok := ops.GetValueFloat64(i); ok && opsValue != 0 {
err := m.SetValueFloat64(i, value/opsValue)
if err != nil {
me.Logger.Error().Err(err).Msgf("error")
v.Logger.Error().Err(err).Msgf("error")
}
} else {
m.SetValueNAN(i)
Expand Down
2 changes: 1 addition & 1 deletion cmd/tools/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func newZapiMetric(n *y3.Node, parents []string) metric {
return m
}

var setRe = regexp.MustCompile(`[sS]etLabel\("(\w+)",`)
var setRe = regexp.MustCompile(`[sS]etLabel\("?(\w+)"?,`)

func findCustomPlugins(path string, template *node.Node, model *TemplateModel) error {
plug := template.SearchChildren([]string{"plugins"})
Expand Down

0 comments on commit 42a7b38

Please sign in to comment.