Commit 8351b3b

nested_objects implemented fixes logstash-plugins#24
Cactusbone committed Sep 14, 2016
1 parent 0921453 commit 8351b3b
Showing 2 changed files with 147 additions and 13 deletions.
111 changes: 98 additions & 13 deletions lib/logstash/inputs/gelf.rb
@@ -51,6 +51,9 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base
#
config :strip_leading_underscore, :validate => :boolean, :default => true

# Whether or not to expand dotted field names (e.g. foo.bar) into nested objects and arrays, or leave them in place
config :nested_objects, :validate => :boolean, :default => false

RECONNECT_BACKOFF_SLEEP = 5
TIMESTAMP_GELF_FIELD = "timestamp".freeze
SOURCE_HOST_FIELD = "source_host".freeze
@@ -63,12 +66,16 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base
def initialize(params)
super
BasicSocket.do_not_reverse_lookup = true
- end # def initialize
+ end
+
+ # def initialize

public
def register
require 'gelfd'
- end # def register
+ end
+
+ # def register

public
def run(output_queue)
@@ -82,7 +89,9 @@ def run(output_queue)
retry unless stop?
end
end # begin
- end # def run
+ end
+
+ # def run

public
def stop
@@ -115,11 +124,14 @@ def udp_listener(output_queue)

remap_gelf(event) if @remap
strip_leading_underscore(event) if @strip_leading_underscore
nested_objects(event) if @nested_objects
decorate(event)

output_queue << event
end
- end # def udp_listener
+ end
+
+ # def udp_listener

# generate a new LogStash::Event from json input and assign host to source_host event field.
# @param json_gelf [String] GELF json data
@@ -156,7 +168,9 @@ def self.from_json_parse(json)
rescue LogStash::Json::ParserError => e
logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json)
LogStash::Event.new(MESSAGE_FIELD => json, TAGS_FIELD => [PARSE_FAILURE_TAG, '_fromjsonparser'])
- end # def self.from_json_parse
+ end
+
+ # def self.from_json_parse

# legacy_parse uses the LogStash::Json class to deserialize json
def self.legacy_parse(json)
@@ -165,7 +179,9 @@ def self.legacy_parse(json)
rescue LogStash::Json::ParserError => e
logger.error(PARSE_FAILURE_LOG_MESSAGE, :error => e, :data => json)
LogStash::Event.new(MESSAGE_FIELD => json, TAGS_FIELD => [PARSE_FAILURE_TAG, '_legacyjsonparser'])
- end # def self.parse
+ end
+
+ # def self.parse

# keep compatibility with all v2.x distributions. only in 2.3 will the Event#from_json method be introduced
# and we need to keep compatibility for all v2 releases.
@@ -185,15 +201,84 @@ def remap_gelf(event)
event.set("message", event.get("short_message").dup)
event.remove("short_message")
end
- end # def remap_gelf
+ end
+
+ # def remap_gelf

private
def strip_leading_underscore(event)
# Map all '_foo' fields to simply 'foo'
event.to_hash.keys.each do |key|
- next unless key[0,1] == "_"
+ next unless key[0, 1] == "_"
event.set(key[1..-1], event.get(key))
event.remove(key)
end
- end # deef removing_leading_underscores
+ end
+
+ # deef removing_leading_underscores

private
def nested_objects(event)
# Expand dotted field names into nested containers, creating them as needed:
# numeric sub-keys create (or index into) an Array, everything else creates a Hash.
base_target = event.to_hash
base_target.keys.each do |key|
next unless key.include? "."
value = event.get(key)
previous_key = nil
first_key = nil
target = base_target

key.split(".").each do |subKey|
if previous_key.nil?
first_key = subKey
else # after the first subKey, create/descend into containers
if !container_has_element?(target, previous_key)
if subKey =~ /^\d+$/
set_container_element(target, previous_key, Array.new)
else
set_container_element(target, previous_key, Hash.new)
end
end
target = get_container_element(target, previous_key)
end
previous_key = subKey
end
set_container_element(target, previous_key, value)
event.remove(key)
event.set(first_key, base_target[first_key])
end
end

private
def get_container_element(container, key)
if container.is_a?(Array)
container[Integer(key)]
elsif container.is_a?(Hash)
container[key]
else # Event
raise "not an array or hash"
end
end

private
def set_container_element(container, key, value)
if container.is_a?(Array)
container[Integer(key)] = value
elsif container.is_a?(Hash)
container[key] = value
else # Event
raise "not an array or hash"
end
end

private
def container_has_element?(container, key)
if container.is_a?(Array)
!container[Integer(key)].nil?
elsif container.is_a?(Hash)
container.key?(key)
else
raise "not an array or hash"
end
end
end # class LogStash::Inputs::Gelf
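
As an editorial aside (not part of the commit): the sketch below shows the effect of nested_objects on a plain Ruby Hash, so the transformation can be read without the LogStash::Event plumbing. The helper name expand_dotted_keys and the sample data are illustrative only; the plugin itself performs the same walk via the get_container_element / set_container_element / container_has_element? helpers above.

# Illustrative sketch only -- mirrors the effect of nested_objects on a plain Hash.
# Dotted keys are split; numeric sub-keys create or index into Arrays,
# all other sub-keys create or index into Hashes.
def expand_dotted_keys(flat)
  result = {}
  flat.each do |key, value|
    parts = key.split(".")
    target = result
    parts.each_with_index do |part, i|
      part = Integer(part) rescue part            # numeric sub-keys become Array indices
      if i == parts.length - 1
        target[part] = value                      # last sub-key: assign the value
      else
        next_is_index = parts[i + 1] =~ /^\d+$/   # peek ahead to pick Array vs Hash
        target[part] ||= next_is_index ? [] : {}
        target = target[part]
      end
    end
  end
  result
end

flat = {
  "toto.titi" => "objectValue",
  "foo.0"     => "first",
  "foo.1"     => "second",
  "ca.0.titi" => "1",
  "ca.1.titi" => "2",
}

p expand_dotted_keys(flat)
# => {"toto"=>{"titi"=>"objectValue"}, "foo"=>["first", "second"],
#     "ca"=>[{"titi"=>"1"}, {"titi"=>"2"}]}

This is exactly the structure the new spec below asserts on, which is why foo.0 / foo.1 collapse into a two-element array and ca becomes an array of hashes.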
49 changes: 49 additions & 0 deletions spec/inputs/gelf_spec.rb
@@ -70,6 +70,55 @@
end
end

it "reads nested gelf messages " do
port = 12210
host = "127.0.0.1"
chunksize = 1420
gelfclient = GELF::Notifier.new(host, port, chunksize)

conf = <<-CONFIG
input {
gelf {
port => "#{port}"
host => "#{host}"
nested_objects => true
}
}
CONFIG

result = input(conf) do |pipeline, queue|
# send a first message until plugin is up and receives it
while queue.size <= 0
gelfclient.notify!("short_message" => "prime")
sleep(0.1)
end
gelfclient.notify!("short_message" => "start")

e = queue.pop
while (e.get("message") != "start")
e = queue.pop
end

gelfclient.notify!({
"short_message" => "test nested",
"_toto.titi" => "objectValue",
"_foo.0" => "first",
"_foo.1" => "second",
"_ca.0.titi" => "1",
"_ca.1.titi" => "2",
})

queue.pop
end

insist { result.get("message") } == "test nested"
insist { result.get("toto")["titi"] } == "objectValue"
insist { result.get("foo") } == ["first", "second"]
insist { result.get("ca")[0]["titi"] } == "1"
insist { result.get("ca")[1]["titi"] } == "2"
insist { result.get("host") } == Socket.gethostname
end

context "timestamp coercion" do
# these test private methods, this is advisable for now until we roll out this coercion in the Timestamp class
# and remove this

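Enabling the option in a pipeline uses the same input block shape as the generated config in the spec above; the port and host values are illustrative, and nested_objects defaults to false, so dotted field names pass through untouched unless it is turned on:

input {
  gelf {
    port => "12210"
    host => "127.0.0.1"
    nested_objects => true
  }
}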