Allow a dedicated key for headers content #498

Open · wants to merge 1 commit into base: master

4 changes: 4 additions & 0 deletions README.md
@@ -72,6 +72,8 @@ Consume events by single consumer.
topics <listening topics(separate with comma',')>
format <input text type (text|json|ltsv|msgpack)> :default => json
message_key <key (Optional, for text format only, default is message)>
add_headers <If true, add kafka's message headers to record>
headers_key <key (Optional, If specified, store kafka's message headers under this key)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>

@@ -122,6 +124,7 @@ Consume events by kafka consumer group features.
message_key <key (Optional, for text format only, default is message)>
kafka_message_key <key (Optional, If specified, set kafka's message key to this key)>
add_headers <If true, add kafka's message headers to record>
headers_key <key (Optional, If specified, store kafka's message headers under this key)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
retry_emit_limit <Wait retry_emit_limit x 1s when BufferQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved>
@@ -159,6 +162,7 @@ With the introduction of the rdkafka-ruby based input plugin we hope to support
message_key <key (Optional, for text format only, default is message)>
kafka_message_key <key (Optional, If specified, set kafka's message key to this key)>
add_headers <If true, add kafka's message headers to record>
headers_key <key (Optional, If specified, store kafka's message headers under this key)>
add_prefix <tag prefix (Optional)>
add_suffix <tag suffix (Optional)>
retry_emit_limit <Wait retry_emit_limit x 1s when BufferQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved>
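For illustration, a minimal consumer configuration exercising the new option might look like this (broker, topic, and key name are placeholder values, not part of this change):

    <source>
      @type kafka
      brokers localhost:9092
      format text
      topics my-topic
      add_headers true
      headers_key kafka_headers
    </source>

With add_headers true alone, header key/value pairs are merged into the top level of the event record; with headers_key also set, they are stored as a nested hash under that key instead.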
21 changes: 20 additions & 1 deletion lib/fluent/plugin/in_kafka.rb
@@ -51,6 +51,11 @@ class Fluent::KafkaInput < Fluent::Input
config_param :kafka_message_key, :string, :default => nil,
:desc => "Set kafka's message key to this field"

config_param :add_headers, :bool, :default => false,
:desc => "Add kafka's message headers to event record"
config_param :headers_key, :string, :default => nil,
:desc => "Record key to store kafka's message headers"

# Kafka#fetch_messages options
config_param :max_bytes, :integer, :default => nil,
:desc => "Maximum number of bytes to fetch."
@@ -235,6 +240,8 @@ def start
@record_time_key,
@tag_source,
@record_tag_key,
@add_headers,
@headers_key,
opt)
}
@topic_watchers.each {|tw|
@@ -259,7 +266,7 @@ def run
end

class TopicWatcher < Coolio::TimerWatcher
def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, record_time_key, tag_source, record_tag_key, options={})
def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, offset_manager, router, kafka_message_key, time_source, record_time_key, tag_source, record_tag_key, add_headers, headers_key, options={})
@topic_entry = topic_entry
@kafka = kafka
@callback = method(:consume)
@@ -274,6 +281,8 @@ def initialize(topic_entry, kafka, interval, parser, add_prefix, add_suffix, off
@record_time_key = record_time_key
@tag_source = tag_source
@record_tag_key = record_tag_key
@add_headers = add_headers
@headers_key = headers_key

@next_offset = @topic_entry.offset
if @topic_entry.offset == -1 && offset_manager
@@ -332,6 +341,16 @@ def consume
if @kafka_message_key
record[@kafka_message_key] = msg.key
end
if @add_headers
if @headers_key
headers_record = record[@headers_key] = {}
else
headers_record = record
end
msg.headers.each_pair { |k, v|
headers_record[k] = v
}
end
es.add(record_time, record)
rescue => e
$log.warn "parser error in #{@topic_entry.topic}/#{@topic_entry.partition}", :error => e.to_s, :value => msg.value, :offset => msg.offset
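As a minimal sketch of what the branch above produces (standalone Ruby with hypothetical record and header values; not code from the plugin):

    require 'json'

    # Mirrors the headers handling added in consume: merge headers into the
    # record itself, or into a nested hash when a dedicated key is configured.
    def apply_headers(record, headers, headers_key)
      headers_record = headers_key ? (record[headers_key] = {}) : record
      headers.each_pair { |k, v| headers_record[k] = v }
      record
    end

    headers = { 'header1' => 'content1' }

    # Default behaviour: headers land at the top level and can collide with
    # existing record keys.
    puts apply_headers({ 'message' => 'hello' }, headers, nil).to_json
    # => {"message":"hello","header1":"content1"}

    # With headers_key: headers are isolated under the dedicated key.
    puts apply_headers({ 'message' => 'hello' }, headers, 'kafka_headers').to_json
    # => {"message":"hello","kafka_headers":{"header1":"content1"}}

The same pattern is applied in in_kafka_group.rb and in_rdkafka_group.rb below.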
20 changes: 16 additions & 4 deletions lib/fluent/plugin/in_kafka_group.rb
@@ -20,6 +20,8 @@ class Fluent::KafkaGroupInput < Fluent::Input
:desc => "For 'text' format only."
config_param :add_headers, :bool, :default => false,
:desc => "Add kafka's message headers to event record"
config_param :headers_key, :string, :default => nil,
:desc => "Record key to store kafka's message headers"
config_param :add_prefix, :string, :default => nil,
:desc => "Tag prefix (Optional)"
config_param :add_suffix, :string, :default => nil,
@@ -259,7 +261,7 @@ def reconnect_consumer
end

def process_batch_with_record_tag(batch)
es = {}
es = {}
batch.messages.each { |msg|
begin
record = @parser_proc.call(msg)
@@ -285,8 +287,13 @@ def process_batch_with_record_tag(batch)
record[@kafka_message_key] = msg.key
end
if @add_headers
if @headers_key
headers_record = record[@headers_key] = {}
else
headers_record = record
end
msg.headers.each_pair { |k, v|
record[k] = v
headers_record[k] = v
}
end
es[tag].add(record_time, record)
@@ -332,8 +339,13 @@ def process_batch(batch)
record[@kafka_message_key] = msg.key
end
if @add_headers
if @headers_key
headers_record = record[@headers_key] = {}
else
headers_record = record
end
msg.headers.each_pair { |k, v|
record[k] = v
headers_record[k] = v
}
end
es.add(record_time, record)
@@ -355,7 +367,7 @@ def run
if @tag_source == :record
process_batch_with_record_tag(batch)
else
process_batch(batch)
process_batch(batch)
end
}
rescue ForShutdown
9 changes: 8 additions & 1 deletion lib/fluent/plugin/in_rdkafka_group.rb
@@ -18,6 +18,8 @@ class Fluent::Plugin::RdKafkaGroupInput < Fluent::Plugin::Input
:desc => "For 'text' format only."
config_param :add_headers, :bool, :default => false,
:desc => "Add kafka's message headers to event record"
config_param :headers_key, :string, :default => nil,
:desc => "Record key to store kafka's message headers"
config_param :add_prefix, :string, :default => nil,
:desc => "Tag prefix (Optional)"
config_param :add_suffix, :string, :default => nil,
@@ -254,8 +256,13 @@ def run
record[@kafka_message_key] = msg.key
end
if @add_headers
if @headers_key
headers_record = record[@headers_key] = {}
else
headers_record = record
end
msg.headers.each_pair { |k, v|
record[k] = v
headers_record[k] = v
}
end
es.add(record_time, record)
60 changes: 59 additions & 1 deletion test/plugin/test_in_kafka.rb
@@ -21,7 +21,6 @@ def create_driver(conf = CONFIG)
Fluent::Test::Driver::Input.new(Fluent::KafkaInput).configure(conf)
end


def test_configure
d = create_driver
assert_equal TOPIC_NAME, d.instance.topics
@@ -63,4 +62,63 @@ def test_consume
assert_equal expected, d.events[0][2]
end
end

class ConsumeWithHeadersTest < self
CONFIG_TEMPLATE = %[
@type kafka
brokers localhost:9092
format text
@label @kafka
topics %<topic>s
%<conf_adds>s
].freeze

def topic_random
"kafka-input-#{SecureRandom.uuid}"
end

def kafka_test_context(conf_adds: '', topic: topic_random, conf_template: CONFIG_TEMPLATE)
kafka = Kafka.new(['localhost:9092'], client_id: 'kafka')
producer = kafka.producer(required_acks: 1)

config = format(conf_template, topic: topic, conf_adds: conf_adds)
driver = create_driver(config)

yield topic, producer, driver
ensure
kafka.delete_topic(topic)
kafka.close
end

def test_with_headers_content_merged_into_record
conf_adds = 'add_headers true'
kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
driver.run(expect_records: 1, timeout: 5) do
producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
producer.deliver_messages
end

expected = { 'message' => 'Hello, fluent-plugin-kafka!',
'header1' => 'content1' }
assert_equal expected, driver.events[0][2]
end
end

def test_with_headers_content_merged_under_dedicated_key
conf_adds = %(
add_headers true
headers_key kafka_headers
)
kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
driver.run(expect_records: 1, timeout: 5) do
producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
producer.deliver_messages
end

expected = { 'message' => 'Hello, fluent-plugin-kafka!',
'kafka_headers' => { 'header1' => 'content1' } }
assert_equal expected, driver.events[0][2]
end
end
end
end
70 changes: 61 additions & 9 deletions test/plugin/test_in_kafka_group.rb
@@ -23,7 +23,6 @@ def create_driver(conf = CONFIG)
Fluent::Test::Driver::Input.new(Fluent::KafkaGroupInput).configure(conf)
end


def test_configure
d = create_driver
assert_equal [TOPIC_NAME], d.instance.topics
@@ -48,14 +47,6 @@ def teardown
end

def test_consume
conf = %[
@type kafka
brokers localhost:9092
format text
@label @kafka
refresh_topic_interval 0
topics #{TOPIC_NAME}
]
d = create_driver

d.run(expect_records: 1, timeout: 10) do
@@ -66,4 +57,65 @@ def test_consume
assert_equal expected, d.events[0][2]
end
end

class ConsumeWithHeadersTest < self
CONFIG_TEMPLATE = %(
@type kafka
brokers localhost:9092
consumer_group fluentd
format text
refresh_topic_interval 0
@label @kafka
topics %<topic>s
%<conf_adds>s
).freeze

def topic_random
"kafka-input-#{SecureRandom.uuid}"
end

def kafka_test_context(conf_adds: '', topic: topic_random, conf_template: CONFIG_TEMPLATE)
kafka = Kafka.new(['localhost:9092'], client_id: 'kafka')
producer = kafka.producer(required_acks: 1)

config = format(conf_template, topic: topic, conf_adds: conf_adds)
driver = create_driver(config)

yield topic, producer, driver
ensure
kafka.delete_topic(topic)
kafka.close
end

def test_with_headers_content_merged_into_record
conf_adds = 'add_headers true'
kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
driver.run(expect_records: 1, timeout: 5) do
producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
producer.deliver_messages
end

expected = { 'message' => 'Hello, fluent-plugin-kafka!',
'header1' => 'content1' }
assert_equal expected, driver.events[0][2]
end
end

def test_with_headers_content_merged_under_dedicated_key
conf_adds = %(
add_headers true
headers_key kafka_headers
)
kafka_test_context(conf_adds: conf_adds) do |topic, producer, driver|
driver.run(expect_records: 1, timeout: 5) do
producer.produce('Hello, fluent-plugin-kafka!', topic: topic, headers: { header1: 'content1' })
producer.deliver_messages
end

expected = { 'message' => 'Hello, fluent-plugin-kafka!',
'kafka_headers' => { 'header1' => 'content1' } }
assert_equal expected, driver.events[0][2]
end
end
end
end