Skip to content

Commit

Permalink
pkg/target:add sapling ratio
Browse files Browse the repository at this point in the history
Signed-off-by: Sumera Priyadarsini <[email protected]>
  • Loading branch information
Sylfrena committed Mar 18, 2022
1 parent 7d76f76 commit a9bf983
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 1 deletion.
1 change: 1 addition & 0 deletions cmd/parca-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func main() {
profileListener, debugInfoClient,
flags.ProfilingDuration,
externalLabels(flags.ExternalLabel, flags.Node),
flags.SamplingRatio,
flags.TempDir,
)

Expand Down
1 change: 1 addition & 0 deletions pkg/agent/write_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func isEqualLabel(a, b *profilestorepb.LabelSet) bool {
return ret
}

//verifies whether incoming p already is present in arr and returns end index for arr
func findIndex(arr []*profilestorepb.RawProfileSeries, p *profilestorepb.RawProfileSeries) (int, bool) {
for i, val := range arr {
if isEqualLabel(val.Labels, p.Labels) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/target/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Manager struct {
writeClient profilestorepb.ProfileStoreServiceClient
debugInfoClient debuginfo.Client
profilingDuration time.Duration
samplingRatio float64
tmp string
}

Expand All @@ -49,6 +50,7 @@ func NewManager(
debugInfoClient debuginfo.Client,
profilingDuration time.Duration,
externalLabels model.LabelSet,
samplingRatio float64,
tmp string,
) *Manager {
return &Manager{
Expand All @@ -61,6 +63,7 @@ func NewManager(
writeClient: writeClient,
debugInfoClient: debugInfoClient,
profilingDuration: profilingDuration,
samplingRatio: samplingRatio,
tmp: tmp,
}
}
Expand Down Expand Up @@ -94,7 +97,7 @@ func (m *Manager) reconcileTargets(ctx context.Context, targetSets map[string][]
m.ksymCache, objectfile.NewCache(m.logger, cacheSize),
m.writeClient, m.debugInfoClient,
m.profilingDuration, m.externalLabels,
m.tmp,
m.samplingRatio, m.tmp,
)
m.profilerPools[name] = pp
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/target/profiler_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package target

import (
"context"
"hash/fnv"
"math"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -56,6 +58,7 @@ type ProfilerPool struct {
writeClient profilestorepb.ProfileStoreServiceClient
debugInfoClient debuginfo.Client
profilingDuration time.Duration
samplingRatio float64
tmp string
}

Expand All @@ -69,6 +72,7 @@ func NewProfilerPool(
debugInfoClient debuginfo.Client,
profilingDuration time.Duration,
externalLabels model.LabelSet,
samplingRatio float64,
tmp string,
) *ProfilerPool {
return &ProfilerPool{
Expand All @@ -84,6 +88,7 @@ func NewProfilerPool(
writeClient: writeClient,
debugInfoClient: debugInfoClient,
profilingDuration: profilingDuration,
samplingRatio: samplingRatio,
tmp: tmp,
}
}
Expand Down Expand Up @@ -143,6 +148,11 @@ func (pp *ProfilerPool) Sync(tg []*Group) {
pp.tmp,
)

if !probabilisticSampling(pp.samplingRatio, labelsetToLabels(newTarget.labelSet)) {
// This target is not being sampled.
continue
}

go func() {
err := newProfiler.Run(pp.ctx)
level.Debug(pp.logger).Log("msg", "profiler ended with error", "error", err, "labels", newProfiler.Labels().String())
Expand Down Expand Up @@ -173,3 +183,23 @@ func labelsetToLabels(labelSet model.LabelSet) labels.Labels {
sort.Sort(ls)
return ls
}

var seps = []byte{'\xff'}

func probabilisticSampling(ratio float64, labels labels.Labels) bool {
if ratio == 1.0 {
return true
}

b := make([]byte, 0, 1024)
for _, v := range labels {
b = append(b, v.Name...)
b = append(b, seps[0])
b = append(b, v.Value...)
b = append(b, seps[0])
}
h := fnv.New32a()
h.Write(b)
v := h.Sum32()
return v <= uint32(float64(math.MaxUint32)*ratio)
}

0 comments on commit a9bf983

Please sign in to comment.