Skip to content

Commit

Permalink
Add label select for k8s metadata processor
Browse files Browse the repository at this point in the history
Signed-off-by: longhui.li <[email protected]>
  • Loading branch information
llhhbc committed May 13, 2023
1 parent d07ce54 commit c87f868
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package k8sprocessor

import (
"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"

"k8s.io/apimachinery/pkg/apis/meta/v1"
)

type Config struct {
Expand All @@ -18,6 +20,8 @@ type Config struct {
// Set "Enable" false if you want to run the agent in the non-Kubernetes environment.
// Otherwise, the agent will panic if it can't connect to the API-server.
Enable bool `mapstructure:"enable"`

LabelSelector *v1.LabelSelector `mapstructure:"label_selector"`
}

var DefaultConfig Config = Config{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"strconv"

"go.uber.org/zap"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/consumer"
Expand All @@ -26,6 +28,7 @@ type K8sMetadataProcessor struct {
localNodeIp string
localNodeName string
telemetry *component.TelemetryTools
labelSelecotr labels.Selector
}

func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools, nextConsumer consumer.Consumer) processor.Processor {
Expand Down Expand Up @@ -62,20 +65,30 @@ func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools
if localNodeName, err = getHostNameFromEnv(); err != nil {
telemetry.Logger.Warn("Local NodeName can not found", zap.Error(err))
}
return &K8sMetadataProcessor{
res := &K8sMetadataProcessor{
config: config,
metadata: kubernetes.MetaDataCache,
nextConsumer: nextConsumer,
localNodeIp: localNodeIp,
localNodeName: localNodeName,
telemetry: telemetry,
}
if config.LabelSelector != nil {
res.labelSelecotr, err = v1.LabelSelectorAsSelector(config.LabelSelector)
if err != nil {
telemetry.Logger.Warn("load label selector failed %v, skip label selector. ", zap.Error(err))
}
}
return res
}

func (p *K8sMetadataProcessor) Consume(dataGroup *model.DataGroup) error {
if !p.config.Enable {
return p.nextConsumer.Consume(dataGroup)
}
if p.labelSelecotr != nil && !p.labelSelecotr.Matches(labels.Set(dataGroup.Labels.ToStringMap())) {
return nil
}
name := dataGroup.Name
switch name {
case constnames.NetRequestMetricGroupName:
Expand Down

0 comments on commit c87f868

Please sign in to comment.