Skip to content

Commit

Permalink
Fix timestamps in steps
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
  • Loading branch information
harry671003 committed Jan 10, 2025
1 parent d89d82a commit bb05b72
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
19 changes: 19 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
t: InstantQuery,
resultSort: resultSort,
scanners: scanners,
start: ts,
end: ts,
step: 0,
}, nil
}

Expand Down Expand Up @@ -354,6 +357,9 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
// TODO(fpetkovski): Infer the sort order from the plan, ideally without copying the newResultSort function.
resultSort: noSortResultSort{},
scanners: scnrs,
start: ts,
end: ts,
step: 0,
}, nil
}

Expand Down Expand Up @@ -406,6 +412,9 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
warns: warns,
t: RangeQuery,
scanners: scnrs,
start: start,
end: end,
step: step,
}, nil
}

Expand Down Expand Up @@ -448,6 +457,9 @@ func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable
warns: warns,
t: RangeQuery,
scanners: scnrs,
start: start,
end: end,
step: step,
}, nil
}

Expand Down Expand Up @@ -536,6 +548,9 @@ type compatibilityQuery struct {
plan logicalplan.Plan
ts time.Time // Empty for range queries.
warns annotations.Annotations
start time.Time
end time.Time
step time.Duration

t QueryType
resultSort resultSorter
Expand Down Expand Up @@ -709,6 +724,10 @@ func (q *compatibilityQuery) Stats() *stats.Statistics {

analysis := q.Analyze()
samples := stats.NewQuerySamples(enablePerStepStats)
if enablePerStepStats {
samples.InitStepTracking(q.start.UnixMilli(), q.end.UnixMilli(), telemetry.StepTrackingInterval(q.step))
}

if analysis != nil {
samples.PeakSamples = int(analysis.PeakSamples())
samples.TotalSamples = analysis.TotalSamples()
Expand Down
10 changes: 5 additions & 5 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5780,13 +5780,13 @@ var (
if x.TotalSamples != y.TotalSamples {
return false
}
if len(x.TotalSamplesPerStep) != len(y.TotalSamplesPerStep) {

if !cmp.Equal(x.TotalSamplesPerStep, y.TotalSamplesPerStep) {
return false
}
for i, xSample := range x.TotalSamplesPerStep {
if y.TotalSamplesPerStep[i] != xSample {
return false
}

if !cmp.Equal(x.TotalSamplesPerStepMap(), y.TotalSamplesPerStepMap()) {
return false
}
return true
})
Expand Down
4 changes: 2 additions & 2 deletions execution/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ type TrackedTelemetry struct {

func NewTrackedTelemetry(operator fmt.Stringer, opts *query.Options, logicalPlanNode logicalplan.Node) *TrackedTelemetry {
ss := stats.NewQuerySamples(opts.EnablePerStepStats)
ss.InitStepTracking(opts.Start.UnixMilli(), opts.End.UnixMilli(), stepTrackingInterval(opts.Step))
ss.InitStepTracking(opts.Start.UnixMilli(), opts.End.UnixMilli(), StepTrackingInterval(opts.Step))
return &TrackedTelemetry{
Stringer: operator,
LoadedSamples: ss,
logicalNode: logicalPlanNode,
}
}

func stepTrackingInterval(step time.Duration) int64 {
func StepTrackingInterval(step time.Duration) int64 {
if step == 0 {
return 1
}
Expand Down

0 comments on commit bb05b72

Please sign in to comment.