Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add label select for k8s metadata processor #516

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions collector/docker/kindling-collector-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,63 @@ processors:
# The default value is false. It should be enabled if the ReplicaSet
# is used to control pods in the third-party CRD except for Deployment.
enable_fetch_replicaset: false
# Add label filter by container info labels.
# label key can be:
# {
# "comm": "webserver",
# "container_id": "c9e4f11a1cac",
# "dnat_ip": "",
# "dnat_port": -1,
# "dns_domain": "",
# "dns_id": 0,
# "dns_rcode": 0,
# "dst_container": "",
# "dst_container_id": "",
# "dst_ip": "10.106.113.249",
# "dst_namespace": "",
# "dst_node": "",
# "dst_node_ip": "",
# "dst_pod": "",
# "dst_port": 3306,
# "dst_service": "",
# "dst_workload_kind": "",
# "dst_workload_name": "",
# "error_type": 0,
# "http_method": "",
# "http_status_code": 0,
# "http_url": "",
# "is_error": false,
# "is_server": false,
# "is_slow": false,
# "pid": 8647,
# "protocol": "mysql",
# "request_payload": ".....SET NAMES utf8",
# "request_tid": 8810,
# "response_payload": "...........",
# "response_tid": 8810,
# "sql": "SET NAMES utf8",
# "src_container": "",
# "src_container_id": "",
# "src_ip": "10.0.0.19",
# "src_namespace": "",
# "src_node": "",
# "src_node_ip": "",
# "src_pod": "",
# "src_port": 33748,
# "src_service": "",
# "src_workload_kind": "",
# "src_workload_name": ""
# }
# https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/label-selector/
label_selector: {}
# matchExpressions:
# - key: aaa
# operator: In
# values:
# - fa
# - fb
# matchLabels:
# container_id: b38b8f2f4e53
aggregateprocessor:
# Aggregation duration window size. The unit is second.
ticker_interval: 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package k8sprocessor

import (
"github.com/Kindling-project/kindling/collector/pkg/metadata/kubernetes"

"k8s.io/apimachinery/pkg/apis/meta/v1"
)

type Config struct {
Expand All @@ -18,6 +20,8 @@ type Config struct {
// Set "Enable" false if you want to run the agent in the non-Kubernetes environment.
// Otherwise, the agent will panic if it can't connect to the API-server.
Enable bool `mapstructure:"enable"`

LabelSelector *v1.LabelSelector `mapstructure:"label_selector"`
}

var DefaultConfig Config = Config{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"strconv"

"go.uber.org/zap"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/consumer"
Expand All @@ -26,6 +28,7 @@ type K8sMetadataProcessor struct {
localNodeIp string
localNodeName string
telemetry *component.TelemetryTools
labelSelecotr labels.Selector
}

func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools, nextConsumer consumer.Consumer) processor.Processor {
Expand Down Expand Up @@ -62,20 +65,30 @@ func NewKubernetesProcessor(cfg interface{}, telemetry *component.TelemetryTools
if localNodeName, err = getHostNameFromEnv(); err != nil {
telemetry.Logger.Warn("Local NodeName can not found", zap.Error(err))
}
return &K8sMetadataProcessor{
res := &K8sMetadataProcessor{
config: config,
metadata: kubernetes.MetaDataCache,
nextConsumer: nextConsumer,
localNodeIp: localNodeIp,
localNodeName: localNodeName,
telemetry: telemetry,
}
if config.LabelSelector != nil {
res.labelSelecotr, err = v1.LabelSelectorAsSelector(config.LabelSelector)
if err != nil {
telemetry.Logger.Warn("load label selector failed %v, skip label selector. ", zap.Error(err))
}
}
return res
}

func (p *K8sMetadataProcessor) Consume(dataGroup *model.DataGroup) error {
if !p.config.Enable {
return p.nextConsumer.Consume(dataGroup)
}
if p.labelSelecotr != nil && !p.labelSelecotr.Matches(labels.Set(dataGroup.Labels.ToStringMap())) {
return nil
}
name := dataGroup.Name
switch name {
case constnames.NetRequestMetricGroupName:
Expand Down