Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Thread::Mutex to resolve race condition #38

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions lib/fluent/plugin/filter_concat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def initialize

@buffer = Hash.new {|h, k| h[k] = [] }
@timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now }
@mutex = Thread::Mutex.new
end

def configure(conf)
Expand Down Expand Up @@ -122,9 +123,13 @@ def process(tag, time, record)

def process_line(stream_identity, tag, time, record)
new_es = Fluent::MultiEventStream.new
@buffer[stream_identity] << [tag, time, record]
@mutex.synchronize do
@buffer[stream_identity] << [tag, time, record]
end
if @buffer[stream_identity].size >= @n_lines
new_time, new_record = flush_buffer(stream_identity)
new_time, new_record = @mutex.synchronize do
flush_buffer(stream_identity)
end
time = new_time if @use_first_timestamp
new_es.add(time, new_record)
end
Expand All @@ -136,26 +141,36 @@ def process_regexp(stream_identity, tag, time, record)
case
when firstline?(record[@key])
if @buffer[stream_identity].empty?
@buffer[stream_identity] << [tag, time, record]
@mutex.synchronize do
@buffer[stream_identity] << [tag, time, record]
end
if lastline?(record[@key])
new_time, new_record = flush_buffer(stream_identity)
new_time, new_record = @mutex.synchronize do
flush_buffer(stream_identity)
end
time = new_time if @use_first_timestamp
new_es.add(time, new_record)
end
else
new_time, new_record = flush_buffer(stream_identity, [tag, time, record])
new_time, new_record = @mutex.synchronize do
flush_buffer(stream_identity, [tag, time, record])
end
time = new_time if @use_first_timestamp
new_es.add(time, new_record)
if lastline?(record[@key])
new_time, new_record = flush_buffer(stream_identity)
new_time, new_record = @mutex.synchronize do
flush_buffer(stream_identity)
end
time = new_time if @use_first_timestamp
new_es.add(time, new_record)
end
return new_es
end
when lastline?(record[@key])
@buffer[stream_identity] << [tag, time, record]
new_time, new_record = flush_buffer(stream_identity)
@mutex.synchronize do
@buffer[stream_identity] << [tag, time, record]
new_time, new_record = flush_buffer(stream_identity)
end
time = new_time if @use_first_timestamp
new_es.add(time, new_record)
return new_es
Expand All @@ -166,9 +181,13 @@ def process_regexp(stream_identity, tag, time, record)
else
if continuous_line?(record[@key])
# Continuation of the previous line
@buffer[stream_identity] << [tag, time, record]
@mutex.synchronize do
@buffer[stream_identity] << [tag, time, record]
end
else
new_time, new_record = flush_buffer(stream_identity)
new_time, new_record = @mutex.synchronize do
flush_buffer(stream_identity)
end
time = new_time if @use_first_timestamp
new_es.add(time, new_record)
new_es.add(time, record)
Expand Down Expand Up @@ -211,7 +230,10 @@ def flush_timeout_buffer
@timeout_map.each do |stream_identity, previous_timestamp|
next if @flush_interval > (now - previous_timestamp)
next if @buffer[stream_identity].empty?
time, flushed_record = flush_buffer(stream_identity)
next if @mutex.locked?
time, flushed_record = @mutex.synchronize do
flush_buffer(stream_identity)
end
timeout_stream_identities << stream_identity
tag = stream_identity.split(":").first
message = "Timeout flush: #{stream_identity}"
Expand Down