diff --git a/sinks/influxdb/influxdb.go b/sinks/influxdb/influxdb.go index 97d64cd4..ddce8150 100755 --- a/sinks/influxdb/influxdb.go +++ b/sinks/influxdb/influxdb.go @@ -28,6 +28,7 @@ import ( influxdb "github.com/influxdata/influxdb/client" kube_api "k8s.io/api/core/v1" "k8s.io/klog" + jsoniter "github.com/json-iterator/go" ) type influxdbSink struct { @@ -37,12 +38,35 @@ type influxdbSink struct { dbExists bool } +// eventStruct influxDB field more detial. +type eventStruct struct{ + kind string + evenType string + nameSpace string + podName string + reason string + message string + firstTimestamp string + lastTimestamp string +} + const ( eventMeasurementName = "log/events" // Event special tags eventUID = "uid" // Value Field name valueField = "value" + + // Kubernetes Event const + kubeKind = "kind" + evenType = "type" + nameSpace = "nameSpace" + podName = "podName" + eventReason = "reason" + eventMessage = "message" + firstTimestamp = "firstTimestamp" + lastTimestamp = "lastTimestamp" + // Event special tags dbNotFoundError = "database not found" @@ -90,17 +114,44 @@ func eventToPointWithFields(event *kube_api.Event) (*influxdb.Point, error) { return &point, nil } + +// selectEventData Select kubernetes events data from value +func selectEventData(value string) eventStruct{ + var eventData eventStruct + data := []byte(value) + eventData.evenType = jsoniter.Get(data,"type").ToString() + eventData.kind = jsoniter.Get(data,"involvedObject","kind").ToString() + eventData.nameSpace = jsoniter.Get(data,"involvedObject","namespace").ToString() + eventData.podName = jsoniter.Get(data,"involvedObject","name").ToString() + eventData.reason = jsoniter.Get(data,"reason").ToString() + eventData.message = jsoniter.Get(data,"message").ToString() + eventData.firstTimestamp = jsoniter.Get(data,"firstTimestamp").ToString() + eventData.lastTimestamp = jsoniter.Get(data,"lastTimestamp").ToString() + return eventData +} + + +// eventToPoint make influxdb point from kubernetes event data func eventToPoint(event *kube_api.Event) (*influxdb.Point, error) { value, err := getEventValue(event) if err != nil { return nil, err } + eventData := selectEventData(value) point := influxdb.Point{ Measurement: eventMeasurementName, Time: event.LastTimestamp.Time.UTC(), Fields: map[string]interface{}{ valueField: value, + kubeKind: eventData.kind, + evenType: eventData.evenType, + nameSpace: eventData.nameSpace, + podName: eventData.podName, + eventReason: eventData.reason, + eventMessage: eventData.message, + firstTimestamp: eventData.firstTimestamp, + lastTimestamp: eventData.lastTimestamp, }, Tags: map[string]string{ eventUID: string(event.UID),