Skip to content

Commit

Permalink
Add ability to pass durations for as arguments to WITH template funct…
Browse files Browse the repository at this point in the history
…ions

Support using WITH template vars in the following places where duration can be passed:

- Lookbehind windows in square brackets: `with (w=5m) m[w]` is transformed to `m[5m]`
- Steps in square brackets: `with (step=5m) m[1h:step]` is transformed to `m[1h:5m]`
- Offsets: `with (off=5m) m offset off` is transformed to `m offset 5m`

Updates VictoriaMetrics/VictoriaMetrics#4025
Updates #12

Thanks to @lujiajing1126 for the initial implementation at #13

Note that this feature doesn't allow specifying dynamic durations in the following way:

  with (w = ((day_of_month()-1) * 24 + hour()) + "h") m[w]

It allows using only static durations

Support for dynamic durations requires significant refactoring of the code responsible for calculating rollups in `/api/v1/query_range` handler.
It will be needed to use different lookbehind windows per each calculated data point per each `step`. Currently the code assumes
that the lookbehind window is static across every calculated data point.
  • Loading branch information
valyala committed Jul 19, 2023
1 parent cd99b0b commit f3dd382
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 20 deletions.
5 changes: 5 additions & 0 deletions lexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ func (lex *lexer) Init(s string) {
lex.sTail = s
}

func (lex *lexer) PushBack(currToken, sHead string) {
lex.Token = currToken
lex.sTail = sHead + lex.sTail
}

func (lex *lexer) Next() error {
if lex.err != nil {
return lex.err
Expand Down
146 changes: 126 additions & 20 deletions parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,18 @@ func expandWithExpr(was []*withArgExpr, e Expr) (Expr, error) {
}
re := *t
re.Expr = eNew
re.Window, err = expandDuration(was, re.Window)
if err != nil {
return nil, fmt.Errorf("cannot parse window for %s: %w", re.Expr.AppendString(nil), err)
}
re.Step, err = expandDuration(was, re.Step)
if err != nil {
return nil, fmt.Errorf("cannot parse step in %s: %w", re.Expr.AppendString(nil), err)
}
re.Offset, err = expandDuration(was, re.Offset)
if err != nil {
return nil, fmt.Errorf("cannot parse offset in %s: %w", re.Expr.AppendString(nil), err)
}
if t.At != nil {
atNew, err := expandWithExpr(was, t.At)
if err != nil {
Expand Down Expand Up @@ -828,7 +840,7 @@ func expandWithExpr(was []*withArgExpr, e Expr) (Expr, error) {
lfe.Label, t.AppendString(nil), eNew.AppendString(nil))
}
if len(wme.labelFilterss) > 0 {
panic(fmt.Errorf("BUG: wme.labelFilterss must be empty after WITH template expansion; got %s", wme.labelFilterss))
panic(fmt.Errorf("BUG: wme.labelFilterss must be empty after WITH template expansion; got %s", wme.AppendString(nil)))
}
lfssSrc := wme.LabelFilterss
if len(lfssSrc) > 1 {
Expand Down Expand Up @@ -888,7 +900,7 @@ func expandWithExpr(was []*withArgExpr, e Expr) (Expr, error) {
return nil, fmt.Errorf("cannot expand %q to non-metric expression %q", t.AppendString(nil), eNew.AppendString(nil))
}
if len(wme.labelFilterss) > 0 {
panic(fmt.Errorf("BUG: wme.labelFilterss must be empty after WITH templates expansion; got %s", wme.labelFilterss))
panic(fmt.Errorf("BUG: wme.labelFilterss must be empty after WITH templates expansion; got %s", wme.AppendString(nil)))
}
lfssSrc := wme.LabelFilterss
var lfssNew [][]LabelFilter
Expand Down Expand Up @@ -945,6 +957,38 @@ func expandWithArgs(was []*withArgExpr, args []Expr) ([]Expr, error) {
return dstArgs, nil
}

func expandDuration(was []*withArgExpr, d *DurationExpr) (*DurationExpr, error) {
if d == nil {
return nil, nil
}
if !d.needsParsing {
return d, nil
}
wa := getWithArgExpr(was, d.s)
if wa == nil {
return nil, fmt.Errorf("cannot find WITH template for %q", d.s)
}
e, err := expandWithExprExt(was, wa, []Expr{})
if err != nil {
return nil, err
}
switch t := e.(type) {
case *DurationExpr:
if t.needsParsing {
panic(fmt.Errorf("BUG: DurationExpr %q must be already parsed", t.s))
}
return t, nil
case *NumberExpr:
// Convert number of seconds to DurationExpr
de := &DurationExpr{
s: t.s,
}
return de, nil
default:
return nil, fmt.Errorf("unexpected value for WITH template %q; got %s; want duration", d.s, e.AppendString(nil))
}
}

func expandModifierArgs(was []*withArgExpr, args []string) ([]string, error) {
if len(args) == 0 {
return nil, nil
Expand Down Expand Up @@ -1335,8 +1379,24 @@ type labelFilterExpr struct {
IsNegative bool
}

func (lfe *labelFilterExpr) String() string {
return fmt.Sprintf("[label=%q, value=%+v, isRegexp=%v, isNegative=%v]", lfe.Label, lfe.Value, lfe.IsRegexp, lfe.IsNegative)
func (lfe *labelFilterExpr) AppendString(dst []byte) []byte {
dst = appendEscapedIdent(dst, lfe.Label)
if lfe.Value == nil {
return dst
}
dst = appendLabelFilterOp(dst, lfe.IsNegative, lfe.IsRegexp)
tokens := lfe.Value.tokens
if len(tokens) == 0 {
dst = strconv.AppendQuote(dst, lfe.Value.S)
return dst
}
for i, token := range tokens {
dst = append(dst, token...)
if i+1 < len(tokens) {
dst = append(dst, '+')
}
}
return dst
}

func (lfe *labelFilterExpr) toLabelFilter() (*LabelFilter, error) {
Expand Down Expand Up @@ -1452,13 +1512,28 @@ func (p *parser) parseDuration() (*DurationExpr, error) {

func (p *parser) parsePositiveDuration() (*DurationExpr, error) {
s := p.lex.Token
if isIdentPrefix(s) {
n := strings.IndexByte(s, ':')
if n >= 0 {
p.lex.PushBack(s[:n], s[n:])
s = s[:n]
}
if err := p.lex.Next(); err != nil {
return nil, err
}
de := &DurationExpr{
s: s,
needsParsing: true,
}
return de, nil
}
if isPositiveDuration(s) {
if err := p.lex.Next(); err != nil {
return nil, err
}
} else {
if !isPositiveNumberPrefix(s) {
return nil, fmt.Errorf(`duration: unexpected token %q; want "duration"`, s)
return nil, fmt.Errorf(`duration: unexpected token %q; want valid duration`, s)
}
// Verify the duration in seconds without explicit suffix.
if _, err := p.parsePositiveNumberExpr(); err != nil {
Expand All @@ -1478,13 +1553,19 @@ func (p *parser) parsePositiveDuration() (*DurationExpr, error) {
// DurationExpr contains the duration
type DurationExpr struct {
s string

// needsParsing is set to true if s isn't parsed yet with expandWithExpr()
needsParsing bool
}

// AppendString appends string representation of de to dst and returns the result.
func (de *DurationExpr) AppendString(dst []byte) []byte {
if de == nil {
return dst
}
if de.needsParsing {
panic(fmt.Errorf("BUG: duration %q must be already parsed with expandWithExpr()", de.s))
}
return append(dst, de.s...)
}

Expand All @@ -1493,6 +1574,9 @@ func (de *DurationExpr) Duration(step int64) int64 {
if de == nil {
return 0
}
if de.needsParsing {
panic(fmt.Errorf("BUG: duration %q must be already parsed", de.s))
}
d, err := DurationValue(de.s, step)
if err != nil {
panic(fmt.Errorf("BUG: cannot parse duration %q: %s", de.s, err))
Expand Down Expand Up @@ -1617,6 +1701,9 @@ type StringExpr struct {

// AppendString appends string representation of se to dst and returns the result.
func (se *StringExpr) AppendString(dst []byte) []byte {
if len(se.tokens) > 0 {
panic(fmt.Errorf("BUG: StringExpr=%q must be already parsed with expandWithExpr()", se.tokens))
}
return strconv.AppendQuote(dst, se.S)
}

Expand Down Expand Up @@ -2037,25 +2124,24 @@ type LabelFilter struct {
// AppendString appends string representation of me to dst and returns the result.
func (lf *LabelFilter) AppendString(dst []byte) []byte {
dst = appendEscapedIdent(dst, lf.Label)
var op string
if lf.IsNegative {
if lf.IsRegexp {
op = "!~"
} else {
op = "!="
}
} else {
if lf.IsRegexp {
op = "=~"
} else {
op = "="
}
}
dst = append(dst, op...)
dst = appendLabelFilterOp(dst, lf.IsNegative, lf.IsRegexp)
dst = strconv.AppendQuote(dst, lf.Value)
return dst
}

func appendLabelFilterOp(dst []byte, isNegative, isRegexp bool) []byte {
if isNegative {
if isRegexp {
return append(dst, "!~"...)
}
return append(dst, "!="...)
}
if isRegexp {
return append(dst, "=~"...)
}
return append(dst, '=')
}

// MetricExpr represents MetricsQL metric with optional filters, i.e. `foo{...}`.
//
// Curly braces may contain or-delimited list of filters. For example:
Expand All @@ -2082,8 +2168,28 @@ type MetricExpr struct {
labelFilterss [][]*labelFilterExpr
}

func appendLabelFilterss(dst []byte, lfss [][]*labelFilterExpr) []byte {
dst = append(dst, '{')
for i, lfs := range lfss {
for j, lf := range lfs {
dst = lf.AppendString(dst)
if j+1 < len(lfs) {
dst = append(dst, ',')
}
}
if i+1 < len(lfss) {
dst = append(dst, " or "...)
}
}
dst = append(dst, '}')
return dst
}

// AppendString appends string representation of me to dst and returns the result.
func (me *MetricExpr) AppendString(dst []byte) []byte {
if len(me.labelFilterss) > 0 {
return appendLabelFilterss(dst, me.labelFilterss)
}
lfss := me.LabelFilterss
if len(lfss) == 0 {
dst = append(dst, "{}"...)
Expand Down
14 changes: 14 additions & 0 deletions parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,14 @@ func TestParseSuccess(t *testing.T) {
another(`with (foo\-bar(baz) = baz + baz) foo\-bar(x\*y)`, `x\*y + x\*y`)
another(`with (foo\-bar(b\ az) = b\ az + b\ az) foo\-bar(x\*y)`, `x\*y + x\*y`)

// withExpr and durations
another(`with (w=5m) w + m[w] offset w`, `5m + (m[5m] offset 5m)`)
another(`with (f() = 5m + rate(m{x="a"}[5m:1h] offset 1h)) f()`, `5m + rate(m{x="a"}[5m:1h] offset 1h)`)
another(`with (f(w1, w2) = w1 + rate(m{x="a"}[w1:w2] offset w2)) f(5m, 1h)`, `5m + rate(m{x="a"}[5m:1h] offset 1h)`)
another(`with (f(w) = m[w], f2(x) = f(x) / x) f2(5m)`, `m[5m] / 5m`)
another(`with (f(w) = m[w:w], f2(x) = f(x) / x) f2(5i)`, `m[5i:5i] / 5i`)
another(`with (f(w,w1) = m[w:w1], f2(x) = f(x, 23.34) / x) f2(123.456)`, `m[123.456:23.34] / 123.456`)

// withExpr and 'or' filters
another(`with (x={a="b"}) x{c="d" or q="w",r="t"}`, `{a="b",c="d" or a="b",q="w",r="t"}`)
another(`with (x={a="b"}) foo{x,bar="baz" or c="d",x}`, `foo{a="b",bar="baz" or c="d",a="b"}`)
Expand All @@ -438,6 +446,12 @@ func TestParseSuccess(t *testing.T) {
another(`with (x={a="b",c="d"}) {bar="baz",x or x,c="d",x}`, `{bar="baz",a="b",c="d" or a="b",c="d"}`)
another(`with (x={a="b" or c="d"}) x / x{e="f"}`, `{a="b" or c="d"} / {a="b",e="f" or c="d",e="f"}`)

// withExpr and group_left()/group_right() prefix
another(`with (f(x)=a + on() group_left(a,b) prefix x b) f("bar")`, `a + on() group_left(a,b) prefix "bar" b`)
another(`with (f(x)=a + on() group_left(a,b) prefix x+"foo" b) f("bar")`, `a + on() group_left(a,b) prefix "barfoo" b`)
another(`with (f(x)=a + on() group_left(a,b) prefix "foo"+x b) f("bar")`, `a + on() group_left(a,b) prefix "foobar" b`)
another(`with (f(x)=a + on() group_left(a,b) prefix "foo"+x+"baz" b) f("bar")`, `a + on() group_left(a,b) prefix "foobarbaz" b`)

// override ttf with something new
another(`with (ttf = a) ttf + b`, `a + b`)

Expand Down

0 comments on commit f3dd382

Please sign in to comment.