diff --git a/collector/pkg/component/consumer/processor/k8sprocessor/config.go b/collector/pkg/component/consumer/processor/k8sprocessor/config.go index 709416c8c..6a944fa85 100644 --- a/collector/pkg/component/consumer/processor/k8sprocessor/config.go +++ b/collector/pkg/component/consumer/processor/k8sprocessor/config.go @@ -2,6 +2,8 @@ package k8sprocessor import ( "github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes" + + "k8s.io/apimachinery/pkg/apis/meta/v1" ) type Config struct { @@ -18,6 +20,8 @@ type Config struct { // Set "Enable" false if you want to run the agent in the non-Kubernetes environment. // Otherwise, the agent will panic if it can't connect to the API-server. Enable bool `mapstructure:"enable"` + + LabelSelector *v1.LabelSelector `mapstructure:"label_selector"` } var DefaultConfig Config = Config{ diff --git a/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go b/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go index 3d85bc71b..c9f6f47de 100644 --- a/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go +++ b/collector/pkg/component/consumer/processor/k8sprocessor/kubernetes_processor.go @@ -4,6 +4,8 @@ import ( "strconv" "go.uber.org/zap" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "github.com/Kindling-project/kindling/collector/pkg/component" "github.com/Kindling-project/kindling/collector/pkg/component/consumer" @@ -26,6 +28,7 @@ type K8sMetadataProcessor struct { localNodeIp string localNodeName string telemetry *component.TelemetryTools + labelSelecotr labels.Selector } func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools, nextConsumer consumer.Consumer) processor.Processor { @@ -62,7 +65,7 @@ func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools if localNodeName, err = getHostNameFromEnv(); err != nil { telemetry.Logger.Warn("Local NodeName can not found", zap.Error(err)) } - return &K8sMetadataProcessor{ + res := &K8sMetadataProcessor{ config: config, metadata: kubernetes.MetaDataCache, nextConsumer: nextConsumer, @@ -70,12 +73,22 @@ func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools localNodeName: localNodeName, telemetry: telemetry, } + if config.LabelSelector != nil { + res.labelSelecotr, err = v1.LabelSelectorAsSelector(config.LabelSelector) + if err != nil { + telemetry.Logger.Warn("load label selector failed %v, skip label selector. ", zap.Error(err)) + } + } + return res } func (p *K8sMetadataProcessor) Consume(dataGroup *model.DataGroup) error { if !p.config.Enable { return p.nextConsumer.Consume(dataGroup) } + if p.labelSelecotr != nil && !p.labelSelecotr.Matches(labels.Set(dataGroup.Labels.ToStringMap())) { + return nil + } name := dataGroup.Name switch name { case constnames.NetRequestMetricGroupName: