Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix distributed queries with explicit timestamp #510

Merged
merged 7 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions engine/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ func TestDistributedAggregations(t *testing.T) {
{name: "subquery over distributed binary expression", query: `max_over_time((bar / bar)[30s:15s])`},
{name: "timestamp", query: `timestamp(bar)`},
{name: "timestamp - step invariant", query: `timestamp(bar @ 6000.000)`},
{name: "query with @start() absolute timestamp", query: `sum(bar @ start())`},
{name: "query with @end() timestamp", query: `sum(bar @ end())`},
{name: "query with numeric timestamp", query: `sum(bar @ 140.000)`},
{name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`, expectFallback: true},
{name: `subquery with @end() timestamp`, query: `bar @ 100.000 - bar @ 150.000`},
}

lookbackDeltas := []time.Duration{0, 30 * time.Second, 5 * time.Minute}
Expand Down
4 changes: 2 additions & 2 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func newDeduplication(ctx context.Context, e logicalplan.Deduplicate, scanners s

func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
// Create a new remote query scoped to the calculated start time.
qry, err := e.Engine.NewRangeQuery(ctx, promql.NewPrometheusQueryOpts(false, opts.LookbackDelta), e.Query, e.QueryRangeStart, opts.End, opts.Step)
qry, err := e.Engine.NewRangeQuery(ctx, promql.NewPrometheusQueryOpts(false, opts.LookbackDelta), e.Query, e.QueryRangeStart, e.QueryRangeEnd, opts.Step)
if err != nil {
return nil, err
}
Expand All @@ -386,7 +386,7 @@ func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts
// We need to set the lookback for the selector to 0 since the remote query already applies one lookback.
selectorOpts := *opts
selectorOpts.LookbackDelta = 0
remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, e.Engine.LabelSets(), &selectorOpts, hints)
remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, e.QueryRangeEnd, e.Engine.LabelSets(), &selectorOpts, hints)
return exchange.NewConcurrent(remoteExec, 2, opts), nil
}

Expand Down
9 changes: 6 additions & 3 deletions execution/remote/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ type Execution struct {
query promql.Query
opts *query.Options
queryRangeStart time.Time
vectorSelector model.VectorOperator
queryRangeEnd time.Time

vectorSelector model.VectorOperator
telemetry.OperatorTelemetry
}

func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) *Execution {
func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart, queryRangeEnd time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) *Execution {
storage := newStorageFromQuery(query, opts, engineLabels)
oper := &Execution{
storage: storage,
query: query,
opts: opts,
queryRangeStart: queryRangeStart,
queryRangeEnd: queryRangeEnd,
vectorSelector: promstorage.NewVectorSelector(pool, storage, opts, 0, 0, false, 0, 1),
}

Expand All @@ -57,7 +60,7 @@ func (e *Execution) Series(ctx context.Context) ([]labels.Labels, error) {
}

func (e *Execution) String() string {
return fmt.Sprintf("[remoteExec] %s (%d, %d)", e.query, e.queryRangeStart.Unix(), e.opts.End.Unix())
return fmt.Sprintf("[remoteExec] %s (%d, %d)", e.query, e.queryRangeStart.Unix(), e.queryRangeEnd.Unix())
}

func (e *Execution) Next(ctx context.Context) ([]model.StepVector, error) {
Expand Down
46 changes: 43 additions & 3 deletions logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type RemoteExecution struct {
Engine api.RemoteEngine
Query Node
QueryRangeStart time.Time
QueryRangeEnd time.Time
}

func (r RemoteExecution) Clone() Node {
Expand All @@ -95,7 +96,7 @@ func (r RemoteExecution) String() string {
if r.QueryRangeStart.UnixMilli() == 0 {
return fmt.Sprintf("remote(%s)", r.Query)
}
return fmt.Sprintf("remote(%s) [%s]", r.Query, r.QueryRangeStart.UTC().String())
return fmt.Sprintf("remote(%s) [%s, %s]", r.Query, r.QueryRangeStart.UTC().String(), r.QueryRangeEnd.UTC().String())
}

// Type returns RemoteExecutionNode, the NodeType value for RemoteExecution nodes.
func (r RemoteExecution) Type() NodeType {
	return RemoteExecutionNode
}
Expand Down Expand Up @@ -316,6 +317,21 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api
return *expr
}

// Selectors in queries can be scoped to a single timestamp. This case is hard to
// distribute properly and can lead to flaky results.
// We only do it if all engines have sufficient scope for the full range of the query,
// adjusted for the timestamp.
// Otherwise, we fall back to the default mode of not executing the query remotely.
if timestamps := getQueryTimestamps(expr); len(timestamps) > 0 {
for _, e := range engines {
for _, ts := range timestamps {
if e.MinT() > ts-startOffset.Milliseconds() || e.MaxT() < ts {
return *expr
}
}
}
}

var globalMinT int64 = math.MaxInt64
for _, e := range engines {
if e.MinT() < globalMinT {
Expand Down Expand Up @@ -344,6 +360,7 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api
Engine: e,
Query: (*expr).Clone(),
QueryRangeStart: start,
QueryRangeEnd: opts.End,
})
}

Expand All @@ -369,6 +386,7 @@ func (m DistributedExecutionOptimizer) distributeAbsent(expr Node, engines []api
Engine: engines[i],
Query: expr.Clone(),
QueryRangeStart: opts.Start,
QueryRangeEnd: opts.End,
})
}
// We need to make sure that absent is at least evaluated against one engine.
Expand All @@ -380,6 +398,7 @@ func (m DistributedExecutionOptimizer) distributeAbsent(expr Node, engines []api
Engine: engines[len(engines)-1],
Query: expr,
QueryRangeStart: opts.Start,
QueryRangeEnd: opts.End,
}
}

Expand Down Expand Up @@ -464,8 +483,10 @@ func calculateStartOffset(expr *Node, lookbackDelta time.Duration) time.Duration
return lookbackDelta
}

var selectRange time.Duration
var offset time.Duration
var (
selectRange time.Duration
offset time.Duration
)
Traverse(expr, func(node *Node) {
switch n := (*node).(type) {
case *Subquery:
Expand All @@ -479,6 +500,25 @@ func calculateStartOffset(expr *Node, lookbackDelta time.Duration) time.Duration
return maxDuration(offset+selectRange, lookbackDelta)
}

// getQueryTimestamps collects every explicit timestamp set on the vector
// selectors and subqueries that Traverse visits within expr.
//
// Values are appended in the order Traverse invokes the callback. Nodes without
// a timestamp contribute nothing, so the result is nil when no visited node
// carries one.
// NOTE(review): the caller compares these values against engine MinT/MaxT and a
// millisecond offset, so they are presumably Unix milliseconds — confirm.
func getQueryTimestamps(expr *Node) []int64 {
	var found []int64

	// record appends the pointed-to timestamp when one is set.
	record := func(ts *int64) {
		if ts != nil {
			found = append(found, *ts)
		}
	}

	Traverse(expr, func(current *Node) {
		switch typed := (*current).(type) {
		case *VectorSelector:
			record(typed.Timestamp)
		case *Subquery:
			record(typed.Timestamp)
		}
	})

	return found
}

// numSteps returns the number of evaluation steps in the inclusive range
// [start, end] when advancing by step, computed at millisecond resolution.
//
// A step shorter than one millisecond (including a zero step) truncates to
// 0ms, which would make the integer division panic. Such a range is treated
// as a single step instead.
func numSteps(start, end time.Time, step time.Duration) int64 {
	stepMillis := step.Milliseconds()
	if stepMillis == 0 {
		// Avoid a divide-by-zero panic; with no usable step only the start
		// of the range is counted.
		return 1
	}
	return (end.UnixMilli()-start.UnixMilli())/stepMillis + 1
}
Expand Down
52 changes: 41 additions & 11 deletions logicalplan/distribute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,9 @@ remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
expr: `sum_over_time(max(http_requests_total)[5m:1m])`,
expected: `
sum_over_time(max(dedup(
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC],
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC]
))[5m:1m])`,
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC],
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC])
)[5m:1m])`,
},
{
name: "label based pruning matches one engine",
Expand Down Expand Up @@ -493,7 +493,7 @@ func TestDistributedExecutionWithLongSelectorRanges(t *testing.T) {
expected: `
dedup(
remote(sum_over_time(metric[5m])),
remote(sum_over_time(metric[5m])) [1970-01-01 06:05:00 +0000 UTC]
remote(sum_over_time(metric[5m])) [1970-01-01 06:05:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
)`,
},
{
Expand All @@ -510,7 +510,7 @@ dedup(
expected: `
dedup(
remote(sum_over_time(metric[2h])),
remote(sum_over_time(metric[2h])) [1970-01-01 08:00:00 +0000 UTC]
remote(sum_over_time(metric[2h])) [1970-01-01 08:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
)`,
},
{
Expand All @@ -527,7 +527,7 @@ dedup(
expected: `
dedup(
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])),
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])) [1970-01-01 08:00:00 +0000 UTC]
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])) [1970-01-01 08:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
)`,
},
{
Expand All @@ -543,7 +543,7 @@ dedup(
expr: `max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])`,
expected: `dedup(
remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])),
remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])) [1970-01-01 07:05:00 +0000 UTC])`,
remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])) [1970-01-01 07:05:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC])`,
},
{
name: "subquery with a total 4h range is cannot be distributed",
Expand Down Expand Up @@ -571,6 +571,36 @@ dedup(
expr: `sum_over_time(metric[3h])`,
expected: `sum_over_time(metric[3h])`,
},
{
name: "distribute queries with timestamp",
firstEngineOpts: engineOpts{
minTime: queryStart,
maxTime: time.Unix(0, 0).Add(eightHours),
},
secondEngineOpts: engineOpts{
minTime: time.Unix(0, 0).Add(sixHours),
maxTime: queryEnd,
},
expr: `sum(metric @ 25200)`,
expected: `
sum(dedup(
remote(sum by (region) (metric @ 25200.000)),
remote(sum by (region) (metric @ 25200.000)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
))`,
},
{
name: "skip distributing queries with timestamps outside of the range of an engine",
firstEngineOpts: engineOpts{
minTime: queryStart,
maxTime: time.Unix(0, 0).Add(eightHours),
},
secondEngineOpts: engineOpts{
minTime: time.Unix(0, 0).Add(sixHours),
maxTime: queryEnd,
},
expr: `sum(metric @ 18000)`,
expected: `sum(sum by (region) (metric @ 18000.000))`,
},
}

for _, tcase := range cases {
Expand Down Expand Up @@ -616,14 +646,14 @@ func TestDistributedExecutionPruningByTime(t *testing.T) {
expr: `sum(metric)`,
queryStart: time.Unix(0, 0).Add(7 * time.Hour),
queryEnd: time.Unix(0, 0).Add(8 * time.Hour),
expected: `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 07:00:00 +0000 UTC]))`,
expected: `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 07:00:00 +0000 UTC, 1970-01-01 08:00:00 +0000 UTC]))`,
},
{
name: "1 hour range query at the start of the range prunes the second engine",
expr: `sum(metric)`,
queryStart: time.Unix(0, 0).Add(1 * time.Hour),
queryEnd: time.Unix(0, 0).Add(2 * time.Hour),
expected: `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 01:00:00 +0000 UTC]))`,
expected: `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 01:00:00 +0000 UTC, 1970-01-01 02:00:00 +0000 UTC]))`,
},
{
name: "instant query in the overlapping range queries both engines",
Expand All @@ -633,8 +663,8 @@ func TestDistributedExecutionPruningByTime(t *testing.T) {
expected: `
sum(
dedup(
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC],
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC]
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC],
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC]
)
)`,
},
Expand Down
2 changes: 2 additions & 0 deletions logicalplan/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, an
Engine: engines[0],
Query: plan.Clone(),
QueryRangeStart: opts.Start,
QueryRangeEnd: opts.End,
}, nil
}

Expand All @@ -78,6 +79,7 @@ func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, an
Engine: matchingLabelsEngines[0],
Query: plan.Clone(),
QueryRangeStart: opts.Start,
QueryRangeEnd: opts.End,
}, nil
}

Expand Down
Loading