From 8351b3b4dfc3e873c690d5dca5528ae901bb29d8 Mon Sep 17 00:00:00 2001 From: Charly Koza Date: Wed, 14 Sep 2016 15:21:35 +0200 Subject: [PATCH] nested_objects implemented fixes https://github.com/logstash-plugins/logstash-input-gelf/issues/24 --- lib/logstash/inputs/gelf.rb | 111 +++++++++++++++++++++++++++++++----- spec/inputs/gelf_spec.rb | 49 ++++++++++++++++ 2 files changed, 147 insertions(+), 13 deletions(-) diff --git a/lib/logstash/inputs/gelf.rb b/lib/logstash/inputs/gelf.rb index ec9443d..c1dd7fb 100644 --- a/lib/logstash/inputs/gelf.rb +++ b/lib/logstash/inputs/gelf.rb @@ -51,6 +51,9 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base # config :strip_leading_underscore, :validate => :boolean, :default => true + # Whether or not to process dots in fields or leave them in place + config :nested_objects, :validate => :boolean, :default => false + RECONNECT_BACKOFF_SLEEP = 5 TIMESTAMP_GELF_FIELD = "timestamp".freeze SOURCE_HOST_FIELD = "source_host".freeze @@ -63,12 +66,16 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base def initialize(params) super BasicSocket.do_not_reverse_lookup = true - end # def initialize + end + + # def initialize public def register require 'gelfd' - end # def register + end + + # def register public def run(output_queue) @@ -82,7 +89,9 @@ def run(output_queue) retry unless stop? end end # begin - end # def run + end + + # def run public def stop @@ -115,11 +124,14 @@ def udp_listener(output_queue) remap_gelf(event) if @remap strip_leading_underscore(event) if @strip_leading_underscore + nested_objects(event) if @nested_objects decorate(event) output_queue << event end - end # def udp_listener + end + + # def udp_listener # generate a new LogStash::Event from json input and assign host to source_host event field. # @param json_gelf [String] GELF json data @@ -156,7 +168,9 @@ def self.from_json_parse(json) rescue LogStash::Json::ParserError => e logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json) LogStash::Event.new(MESSAGE_FIELD => json, TAGS_FIELD => [PARSE_FAILURE_TAG, '_fromjsonparser']) - end # def self.from_json_parse + end + + # def self.from_json_parse # legacy_parse uses the LogStash::Json class to deserialize json def self.legacy_parse(json) @@ -165,7 +179,9 @@ def self.legacy_parse(json) rescue LogStash::Json::ParserError => e logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json) LogStash::Event.new(MESSAGE_FIELD => json, TAGS_FIELD => [PARSE_FAILURE_TAG, '_legacyjsonparser']) - end # def self.parse + end + + # def self.parse # keep compatibility with all v2.x distributions. only in 2.3 will the Event#from_json method be introduced # and we need to keep compatibility for all v2 releases. @@ -185,15 +201,84 @@ def remap_gelf(event) event.set("message", event.get("short_message").dup) event.remove("short_message") end - end # def remap_gelf + end + + # def remap_gelf private def strip_leading_underscore(event) - # Map all '_foo' fields to simply 'foo' - event.to_hash.keys.each do |key| - next unless key[0,1] == "_" + # Map all '_foo' fields to simply 'foo' + event.to_hash.keys.each do |key| + next unless key[0, 1] == "_" event.set(key[1..-1], event.get(key)) - event.remove(key) - end - end # deef removing_leading_underscores + event.remove(key) + end + end + + # deef removing_leading_underscores + + private + def nested_objects(event) + # process nested, create objects as needed, when key is 0, create an array. if object already exists and is an array push it. + base_target=event.to_hash + base_target.keys.each do |key| + next unless key.include? "." + value = event.get(key) + previous_key = nil + first_key=nil + target = base_target + + key.split(".").each do |subKey| + if previous_key.nil? + first_key=subKey + else#skip first subKey + if !container_has_element?(target, previous_key) + if subKey =~ /^\d+$/ + set_container_element(target, previous_key, Array.new) + else + set_container_element(target, previous_key, Hash.new) + end + end + target = get_container_element(target, previous_key) + end + previous_key = subKey + end + set_container_element(target, previous_key, value) + event.remove(key) + event.set(first_key, base_target[first_key]) + end + end + + private + def get_container_element(container, key) + if container.is_a?(Array) + container[Integer(key)] + elsif container.is_a?(Hash) + container[key] + else #Event + raise "not an array or hash" + end + end + + private + def set_container_element(container, key, value) + if container.is_a?(Array) + container[Integer(key)] = value + elsif container.is_a?(Hash) + container[key] = value + else #Event + raise "not an array or hash" + end + end + + private + def container_has_element?(container, key) + if container.is_a?(Array) + !container[Integer(key)].nil? + elsif container.is_a?(Hash) + container.key?(key) + else + raise "not an array or hash" + end + end end # class LogStash::Inputs::Gelf diff --git a/spec/inputs/gelf_spec.rb b/spec/inputs/gelf_spec.rb index 2eec09b..10d6a53 100644 --- a/spec/inputs/gelf_spec.rb +++ b/spec/inputs/gelf_spec.rb @@ -70,6 +70,55 @@ end end + it "reads nested gelf messages " do + port = 12210 + host = "127.0.0.1" + chunksize = 1420 + gelfclient = GELF::Notifier.new(host, port, chunksize) + + conf = <<-CONFIG + input { + gelf { + port => "#{port}" + host => "#{host}" + nested_objects => true + } + } + CONFIG + + result = input(conf) do |pipeline, queue| + # send a first message until plugin is up and receives it + while queue.size <= 0 + gelfclient.notify!("short_message" => "prime") + sleep(0.1) + end + gelfclient.notify!("short_message" => "start") + + e = queue.pop + while (e.get("message") != "start") + e = queue.pop + end + + gelfclient.notify!({ + "short_message" => "test nested", + "_toto.titi" => "objectValue", + "_foo.0" => "first", + "_foo.1" => "second", + "_ca.0.titi" => "1", + "_ca.1.titi" => "2", + }) + + queue.pop + end + + insist { result.get("message") } == "test nested" + insist { result.get("toto")["titi"] } == "objectValue" + insist { result.get("foo") } == ["first", "second"] + insist { result.get("ca")[0]["titi"] } == "1" + insist { result.get("ca")[1]["titi"] } == "2" + insist { result.get("host") } == Socket.gethostname + end + context "timestamp coercion" do # these test private methods, this is advisable for now until we roll out this coercion in the Timestamp class # and remove this