# openshift-fluentd.conf
<system>
log_level warn
</system>
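# The monitor agent below exposes internal Fluentd metrics over HTTP; e.g.
# (illustrative usage, not part of this config)
#   curl http://localhost:24220/api/plugins.json
# returns per-plugin buffer and retry counters.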
<source>
@type monitor_agent
bind "0.0.0.0"
port "24220"
</source>
@include syslog-input.conf
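# Tail Docker container logs. With Docker's json-file log driver each line is a
# JSON object, e.g. (illustrative):
#   {"log":"hello world\n","stream":"stdout","time":"2015-07-10T12:00:00.000000000Z"}
# The 'log' and 'stream' keys are renamed/removed by the kubernetes.** filters below.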
<source>
@type tail
path "#{ENV['DATA_DIR']}/docker/*.log"
pos_file "#{ENV['DATA_DIR']}/es-containers.log.pos"
time_format %Y-%m-%dT%H:%M:%S
tag kubernetes.*
format json
keep_time_key true
read_from_head true
</source>
<filter system.var.log.**>
@type record_transformer
enable_ruby
<record>
# if host == 'localhost', do an fqdn lookup instead
## the host's /etc/hostname is mounted into the container at /etc/docker-hostname,
## so we read the real hostname from there
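## Illustrative example (not from the original config): if host is 'localhost' and
## /etc/docker-hostname contains "node-1.example.com\n", hostname becomes
## "node-1.example.com"; if the file is unreadable, the original host value is kept.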
hostname ${host.eql?('localhost') ? (begin; File.open('/etc/docker-hostname') { |f| f.readline }.rstrip; rescue; host; end) : host}
# tag_parts = ["system","messages-20150710"]
## Correct the timestamp when reading logs carried over from a prior year. Syslog
## messages do not include a year, so Ruby fills in the current year when parsing,
## which can resolve to a future date. If the rolled-over log file name carries a
## 'YYYYMMDD' suffix we take the year from there; otherwise we subtract one year.
## Note: the year is still not guaranteed correct*, e.g. a 20121230 entry rolled
## into the file 20130101 resolves to 20131230, which is in the past and therefore
## accepted, even though the true year was 2012.
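## Worked example (illustrative): with tag "system.messages-20150710",
## tag_parts[1][9,4] extracts "2015" (characters 9-12 of "messages-20150710"),
## which replaces the wrongly assumed current year in the parsed timestamp.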
time ${ Time.at(time) > Time.now ? (temp_time = Time.parse(Time.at(time).to_s.gsub(Time.at(time).year.to_s, (tag_parts[1].nil? ? Time.at(time).year.to_s : tag_parts[1][9,4]) )).to_datetime.to_s; Time.parse(temp_time) > Time.now ? Time.parse(temp_time.gsub(Time.parse(temp_time).year.to_s, (Time.parse(temp_time).year - 1).to_s )).to_datetime.to_s : Time.parse(temp_time).to_datetime.to_s ) : Time.at(time).to_datetime.to_s }
#tag ${tag}_.operations_log
version 1.1.4
</record>
remove_keys host
</filter>
<filter system.journal>
@type record_transformer
enable_ruby
renew_record
<record>
# field names beginning with an uppercase letter must be accessed with the record["FIELDNAME"] style
hostname ${_HOSTNAME.eql?('localhost') ? (begin; File.open('/etc/docker-hostname') { |f| f.readline }.rstrip; rescue; _HOSTNAME; end) : _HOSTNAME}
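# _SOURCE_REALTIME_TIMESTAMP is in microseconds; e.g. (illustrative, assuming a
# UTC system clock) 1420070400000000 becomes "2015-01-01T00:00:00.000000+00:00"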
time ${Time.at(_SOURCE_REALTIME_TIMESTAMP.to_f / 1000000.0).to_datetime.rfc3339(6)}
ident ${record["SYSLOG_IDENTIFIER"] || record["_COMM"]}
message ${record["MESSAGE"]}
version 1.1.4
</record>
</filter>
# simulate the kubernetes_metadata filter shown (commented out) below with the record_transformer that follows
# <filter kubernetes.**>
# type kubernetes_metadata
# kubernetes_url "#{ENV['K8S_HOST_URL']}"
# bearer_token_file /var/run/secrets/kubernetes.io/serviceaccount/token
# ca_file /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
# include_namespace_id true
# </filter>
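# The filter below derives the same metadata by parsing the container log file name
# embedded in the tag. Illustrative example (assumes DATA_DIR is a single path
# segment, e.g. /data): the file /data/docker/mypod_myproject_mycontainer-abc123.log
# yields tag kubernetes.data.docker.mypod_myproject_mycontainer-abc123.log, so
# tag_parts[3] = "mypod_myproject_mycontainer-abc123", which splits into
# pod "mypod", namespace "myproject", container "mycontainer", container id "abc123".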
<filter kubernetes.**>
@type record_transformer
enable_ruby
<record>
kubernetes_namespace_name ${tag_parts[3].split('_')[1]}
kubernetes_namespace_id 1
docker_container_id ${tag_parts[3].split('_')[2].split('-')[-1]}
kubernetes_pod_name ${tag_parts[3].split('_')[0]}
kubernetes_container_name ${tag_parts[3].split('_')[2].rpartition('-')[0]}
kubernetes_host localhost
</record>
</filter>
<filter kubernetes.**>
@type record_transformer
enable_ruby
<record>
hostname ${(kubernetes_host rescue nil) || File.open('/etc/docker-hostname') { |f| f.readline }.rstrip}
message ${log}
version 1.1.4
</record>
remove_keys log,stream
</filter>
<match kubernetes.**>
@type elasticsearch_dynamic
host "#{ENV['ES_HOST']}"
port "#{ENV['ES_PORT']}"
scheme http
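# the index name is computed per record; e.g. (illustrative) a record from
# namespace "myproject" with namespace id 1 on 2015-07-10 lands in the index
# "myproject.1.2015.07.10" (@logstash_dateformat defaults to %Y.%m.%d)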
index_name ${record['kubernetes_namespace_name']}.${record['kubernetes_namespace_id']}.${Time.at(time).getutc.strftime(@logstash_dateformat)}
client_key "#{ENV['ES_CLIENT_KEY']}"
client_cert "#{ENV['ES_CLIENT_CERT']}"
ca_file "#{ENV['ES_CA']}"
flush_interval 5s
max_retry_wait 300
disable_retry_limit
</match>
<match system.** **_default_** **_openshift_** **_openshift-infra_**>
@type elasticsearch_dynamic
host "#{ENV['OPS_HOST']}"
port "#{ENV['OPS_PORT']}"
scheme http
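# uses the corrected 'time' field when present, falling back to the event time;
# e.g. (illustrative) operations logs for 2015-07-10 land in ".operations.2015.07.10"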
index_name .operations.${record['time'].nil? ? Time.at(time).getutc.strftime(@logstash_dateformat) : Time.parse(record['time']).getutc.strftime(@logstash_dateformat)}
client_key "#{ENV['OPS_CLIENT_KEY']}"
client_cert "#{ENV['OPS_CLIENT_CERT']}"
ca_file "#{ENV['OPS_CA']}"
flush_interval 5s
max_retry_wait 300
disable_retry_limit
</match>