refactor: Add retention hours to discarded metrics #15875
base: main
Conversation
pkg/validation/limits.go
Outdated
selectedRetention := o.getOverridesForUser(userID).RetentionPeriod
highestPriority := -1

if len(ls) > 0 {
This check lets the same function be used when you want to automatically get the tenant retention but have no labels available.
I think it'd be more idiomatic to return early if ls is empty and keep this loop in the top level.
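For illustration, a minimal sketch of the early-return shape being suggested, written against the names visible in the diff above (the matcher loop and the exact signature of the util.RetentionHours helper are assumptions, not the PR's final code):

func retentionHoursFor(tenantRetention time.Duration, streamRetentions []validation.StreamRetention, ls labels.Labels) string {
	// No labels available: fall back to the tenant-wide retention straight away.
	if len(ls) == 0 {
		return util.RetentionHours(tenantRetention)
	}
	selectedRetention := tenantRetention
	highestPriority := -1
Outer:
	for _, r := range streamRetentions {
		// Only consider stream retentions whose matchers all select this label set.
		for _, m := range r.Matchers {
			if !m.Matches(ls.Get(m.Name)) {
				continue Outer
			}
		}
		if r.Priority > highestPriority {
			highestPriority = r.Priority
			selectedRetention = time.Duration(r.Period)
		}
	}
	return util.RetentionHours(selectedRetention)
}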
done
if len(ls) == 0 {
-	validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID).Inc()
+	validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID, retentionHours).Inc()
I see we only added the retentionHours in this PR, but shouldn't this be updateMetrics as well?
I agree. We can fix that in a separate PR to involve more people in the discussion.
if err != nil {
-	return i.onStreamCreationError(ctx, pushReqStream, err, labels)
+	return i.onStreamCreationError(ctx, pushReqStream, err, labels, retentionHours)
nit: This err handler looks a bit out of place. What about getting the retentionHours after getting the labels?
labels, err := syntax.ParseLabels(pushReqStream.Labels)
if err != nil {
...
}
retentionHours := i.limiter.limits.RetentionHours(i.instanceID, labels)
if record != nil {
err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID)
return i.onStreamCreationError(ctx, pushReqStream, err, labels, retentionHours)
}
good call, fixed it
Not quite my suggestion but it's fine since it's a nit
are you sure? I'm doing it immediately after parsing the labels.
pkg/validation/limits.go
Outdated
@@ -959,6 +970,27 @@ func (o *Overrides) StreamRetention(userID string) []StreamRetention {
	return o.getOverridesForUser(userID).StreamRetention
}

+// RetentionHours returns the retention period for a given user.
+func (o *Overrides) RetentionHours(userID string, ls labels.Labels) string {
+	streamRetentions := o.getOverridesForUser(userID).StreamRetention
nit: call the public StreamRetention() rather than the implementation.
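That is, something along the lines of the following (a sketch of the suggested change, using the accessor from the hunk header above):

// before: reaching into the per-tenant overrides directly
// streamRetentions := o.getOverridesForUser(userID).StreamRetention
// after: go through the public accessor
streamRetentions := o.StreamRetention(userID)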
done
pkg/validation/limits.go
Outdated
@@ -959,6 +970,27 @@ func (o *Overrides) StreamRetention(userID string) []StreamRetention {
	return o.getOverridesForUser(userID).StreamRetention
}

+// RetentionHours returns the retention period for a given user.
+func (o *Overrides) RetentionHours(userID string, ls labels.Labels) string {
I think we should use this one instead, which is the one the compactor actually uses:
loki/pkg/compactor/retention/expiration.go
Line 134 in 4c88be0
func (tr *TenantsRetention) RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration {
if err != nil {
	d.writeFailuresManager.Log(tenantID, err)
	validationErrors.Add(err)
-	validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(len(stream.Entries)))
+	validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetentionHours).Add(float64(len(stream.Entries)))
I think we may be counting these twice, since we already update the discarded metrics inside parseStreamLabels.
yep, you are right @salvacorts
LGTM! Left some more nits. I'm gonna ask Vlad to review it as well since he spotted the issue first.
pkg/distributor/distributor.go
Outdated
@@ -493,7 +492,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
	}
}

-	tenantRetentionHours := d.validator.Limits.RetentionHours(tenantID, nil)
+	tenantRetentionHours := util.RetentionHours(d.tenantsRetention.RetentionPeriodFor(tenantID, nil))
nit: instead of calling util.RetentionHours here, what about adding a tenantsRetention.RetentionHoursFor() string that calls util.RetentionHours on the result of tenantsRetention.RetentionPeriodFor?
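A minimal sketch of what that wrapper could look like (util.RetentionHours is the helper referenced above; the method body here is an assumption, not the PR's final code):

// RetentionHoursFor resolves the retention period for the given tenant and label
// set and returns it already formatted as the hours string used in metric labels.
func (tr *TenantsRetention) RetentionHoursFor(userID string, lbs labels.Labels) string {
	return util.RetentionHours(tr.RetentionPeriodFor(userID, lbs))
}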
done
pkg/distributor/distributor.go
Outdated
@@ -524,7 +524,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
	continue
}

-	retentionHours := d.validator.Limits.RetentionHours(tenantID, lbs)
+	retentionHours := util.RetentionHours(d.tenantsRetention.RetentionPeriodFor(tenantID, lbs))
nit for a follow-up PR, since we were already resolving the retention for the streams before these changes: I think we could speed up pushes by caching this.
I think optimizing that would do more harm than good (based on the benchmarks). Doing the new retention evaluation on pushes didn't make a measurable difference, but adding a cache likely would, either through the extra memory usage or through the added complexity of caching more state.
@@ -538,7 +541,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
	prevTs := stream.Entries[0].Timestamp

	for _, entry := range stream.Entries {
-		if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil {
+		if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry, retentionHours); err != nil {
nit: maybe validation context should have the validationMetrics?
done, wdyt?
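For context, a rough sketch of the shape this takes in the later hunks (field names follow the diff further down; the exact struct layout is an assumption):

// validationMetrics accumulates the per-push counters that previously lived in
// local variables such as validatedLineCount and validatedLineSize.
type validationMetrics struct {
	lineSize  int
	lineCount int
}

// validationContext carries the per-tenant limits plus the accumulated counters,
// so ValidateEntry and trackDiscardedData can read and update them in one place.
type validationContext struct {
	userID            string
	validationMetrics validationMetrics
	// ...other per-tenant limit fields elided
}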
pkg/ingester/limiter.go
Outdated
@@ -33,6 +33,10 @@ type Limits interface {
	PerStreamRateLimit(userID string) validation.RateLimit
	ShardStreams(userID string) shardstreams.Config
	IngestionPartitionsTenantShardSize(userID string) int
+	RetentionPeriod(userID string) time.Duration
You can add retention.Limits here instead:
type Limits interface {
...
ShardStreams(userID string) shardstreams.Config
IngestionPartitionsTenantShardSize(userID string) int
retention.Limits
}
hah good catch 😄
Looks good. However, there are some comments on the distributor's side.
@@ -494,6 +492,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
	}
}

+	tenantRetentionHours := d.tenantsRetention.RetentionHoursFor(tenantID, nil)
I assume that in this case we use the tenant's global retention period rather than the retention period for the particular stream.
If so, we cannot do that, because the metric that counts ingested bytes always uses the retention period from the stream, and we need to match it exactly.
I'll check, but we're only using tenantRetentionHours when we fail to parse the stream labels. How can ingested_bytes know the stream's retention period if it couldn't parse the stream labels?
If it completely fails to parse the labels, it returns an error:
Line 245 in ebc84ca
if err != nil {
But if the labels are parsed, then the retention hours are calculated based on what we got:
Line 282 in ebc84ca
retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs)
even if the length of the labels breaks the validation.
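Putting that thread together, the behaviour being described is roughly the following (a control-flow sketch inside the per-stream loop, using the names from the hunks above; not the exact distributor code):

lbs, err := syntax.ParseLabels(stream.Labels)
if err != nil {
	// Labels could not be parsed at all: the discard is recorded under the
	// tenant-wide retention, since no per-stream retention can be resolved.
	validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetentionHours).
		Add(float64(len(stream.Entries)))
	continue
}
// Labels parsed: retention is resolved from the stream's own labels, even if a
// later validation step ends up rejecting the stream.
retentionHours := d.tenantsRetention.RetentionHoursFor(tenantID, lbs)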
-	if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
-		d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.RateLimited)
+	if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.lineSize) {
+		d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited)
nit: I would ask @trevorwhitney whether it's necessary to pass retention_hours to the usageTracker for the discarded bytes metric, because it also exposes this label in the ingested bytes metric and might have a similar issue.
What this PR does / why we need it:
We have no distinction between the types of data that get discarded. In this PR I'm adding retention_hours to our discarded metrics so that we can better see what data was discarded. I'm also using a more performant form of retention stringifying when building the retention string. Benchmark results:
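For illustration, one plausible shape for such a stringifying helper (a sketch under the assumption that it converts a time.Duration into an hours string; the PR's actual util.RetentionHours may differ):

// RetentionHours renders a retention period as an hour-count string without
// going through fmt.Sprintf, avoiding format-parsing overhead on the push path.
func RetentionHours(retention time.Duration) string {
	return strconv.FormatFloat(retention.Hours(), 'f', -1, 64)
}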
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Checklist
- Reviewed the CONTRIBUTING.md guide (required)
- feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- Upgrade-impacting changes are documented in docs/sources/setup/upgrade/_index.md
- Deprecated or deleted configuration options are recorded in the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR