Skip to content

Commit

Permalink
Improve the GC efficiency of the otelexporter (#623)
Browse files Browse the repository at this point in the history
Signed-off-by: Daxin Wang <[email protected]>
  • Loading branch information
dxsup authored Feb 20, 2024
1 parent 8419fd7 commit 51da36d
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 139 deletions.
14 changes: 7 additions & 7 deletions collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -235,19 +235,19 @@ exporters:
# that were previously reported but not updated in the most recent
# interval.
with_memory: false
memcleanup:
# If set to true, prometheus server will restart every `restart_period`
# to free up memory.
enable: false
# Specifies the frequency (in hours) to restart the server.
# A value of 0 disables the restart.
restart_period: 12
otlp:
collect_period: 15s
# Note: DO NOT add the prefix "http://"
endpoint: 10.10.10.10:8080
stdout:
collect_period: 15s
memcleanup:
# If set to true, prometheus server will restart every `restart_period`
# to free up memory.
enable: false
# Specifies the frequency (in hours) to restart the server.
# A value of 0 disables the restart.
restart_period: 12

observability:
logger:
Expand Down
15 changes: 7 additions & 8 deletions collector/pkg/component/consumer/exporter/otelexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ type Config struct {
CustomLabels map[string]string `mapstructure:"custom_labels"`
MetricAggregationMap map[string]MetricAggregationKind `mapstructure:"metric_aggregation_map"`
AdapterConfig *AdapterConfig `mapstructure:"adapter_config"`
MemCleanUpConfig *MemCleanUpConfig `mapstructure:"memcleanup"`
}

type PrometheusConfig struct {
Port string `mapstructure:"port,omitempty"`
WithMemory bool `mapstructure:"with_memory,omitempty"`
Port string `mapstructure:"port,omitempty"`
WithMemory bool `mapstructure:"with_memory,omitempty"`
MemCleanUpConfig *MemCleanUpConfig `mapstructure:"memcleanup"`
}

type OtlpGrpcConfig struct {
Expand All @@ -36,8 +36,7 @@ type AdapterConfig struct {
StoreExternalSrcIP bool `mapstructure:"store_external_src_ip"`
}

type MemCleanUpConfig struct{
Enabled bool `mapstructure:"enable,omitempty"`
RestartPeriod int `mapstructure:"restart_period,omitempty"`
RestartEveryNDays int `mapstructure:"restart_every_n_days,omitempty"`
}
type MemCleanUpConfig struct {
Enabled bool `mapstructure:"enable,omitempty"`
RestartPeriod int `mapstructure:"restart_period,omitempty"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
Expand Down Expand Up @@ -47,8 +45,6 @@ const (
)

var serviceName string
var expCounter int = 0
var lastMetricsData string

type OtelOutputExporters struct {
metricExporter exportmetric.Exporter
Expand All @@ -67,7 +63,6 @@ type OtelExporter struct {
exp *prometheus.Exporter
rs *resource.Resource
mu sync.Mutex
restart bool

adapters []adapter.Adapter
}
Expand Down Expand Up @@ -144,9 +139,8 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export
instrumentFactory: newInstrumentFactory(exp.MeterProvider().Meter(MeterName), telemetry, customLabels),
metricAggregationMap: cfg.MetricAggregationMap,
telemetry: telemetry,
exp: exp,
exp: exp,
rs: rs,
restart: false,
adapters: []adapter.Adapter{
adapter.NewNetAdapter(customLabels, &adapter.NetAdapterConfig{
StoreTraceAsMetric: cfg.AdapterConfig.NeedTraceAsMetric,
Expand All @@ -166,56 +160,8 @@ func NewExporter(config interface{}, telemetry *component.TelemetryTools) export
}
}()

if cfg.MemCleanUpConfig != nil && cfg.MemCleanUpConfig.Enabled {
if cfg.MemCleanUpConfig.RestartPeriod != 0 {
ticker := time.NewTicker(time.Duration(cfg.MemCleanUpConfig.RestartPeriod) * time.Hour)
go func() {
for {
select {
case <-ticker.C:
otelexporter.restart = true
}
}
}()
}

go func() {
metricsTicker := time.NewTicker(250 * time.Millisecond)
for {
<-metricsTicker.C
currentMetricsData, err := fetchMetricsFromEndpoint()
if err != nil {
fmt.Println("Error fetching metrics:", err)
continue
}

if currentMetricsData != lastMetricsData {
lastMetricsData = currentMetricsData

if(otelexporter.restart == true){
otelexporter.NewMeter(otelexporter.telemetry)
otelexporter.restart = false
expCounter++
}
}
}
}()

cfg.MemCleanUpConfig.RestartEveryNDays = 0
if cfg.MemCleanUpConfig.RestartEveryNDays != 0 {
go func() {
for {
now := time.Now()
next := now.Add(time.Duration(cfg.MemCleanUpConfig.RestartEveryNDays) * time.Hour * 24)
next = time.Date(next.Year(), next.Month(), next.Day(), 0, 0, 0, 0, next.Location())
duration := next.Sub(now)

time.Sleep(duration)

otelexporter.restart = true
}
}()
}
if cfg.PromCfg.MemCleanUpConfig != nil && cfg.PromCfg.MemCleanUpConfig.Enabled {
registerResetSchedule(cfg.PromCfg.MemCleanUpConfig, otelexporter)
}

} else {
Expand Down Expand Up @@ -402,22 +348,20 @@ func (e *OtelExporter) NewMeter(telemetry *component.TelemetryTools) error {
telemetry.Logger.Warn("error starting otelexporter prometheus server: ", zap.Error(err))
}
}()

return nil
}

func fetchMetricsFromEndpoint() (string, error) {
resp, err := http.Get("http://127.0.0.1:9500/metrics")
if err != nil {
return "", err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
func registerResetSchedule(cfg *MemCleanUpConfig, exporter *OtelExporter) {
if cfg.RestartPeriod != 0 {
ticker := time.NewTicker(time.Duration(cfg.RestartPeriod) * time.Hour)
go func() {
for {
select {
case <-ticker.C:
_ = exporter.NewMeter(exporter.telemetry)
}
}
}()
}

return string(body), nil
}

Loading

0 comments on commit 51da36d

Please sign in to comment.