Skip to content

Fix timeout logs #121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions lib/fluent/plugin/filter_concat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@ class TimeoutError < StandardError

def initialize
super

@buffer = Hash.new {|h, k| h[k] = [] }
@timeout_map_mutex = Thread::Mutex.new
@timeout_map_mutex.synchronize do
@timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now }
@timeout_map = Hash.new {|h, k| h[k] = time_event_now }
end
end

Expand Down Expand Up @@ -241,7 +240,7 @@ def process(tag, time, record)
end
end
@timeout_map_mutex.synchronize do
@timeout_map[stream_identity] = Fluent::Engine.now
@timeout_map[stream_identity] = time_event_now
end
case @mode
when :line
Expand Down Expand Up @@ -389,7 +388,7 @@ def flush_buffer(stream_identity, new_element = nil)
end

def flush_timeout_buffer
now = Fluent::Engine.now
now = time_event_now
timeout_stream_identities = []
@timeout_map_mutex.synchronize do
@timeout_map.each do |stream_identity, previous_timestamp|
Expand Down Expand Up @@ -432,5 +431,11 @@ def handle_timeout_error(tag, time, record, message)
router.emit_error_event(tag, time, record, TimeoutError.new(message))
end
end

def time_event_now
now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
Fluent::EventTime.new(now / 1_000_000_000, now % 1_000_000_000)
end

end
end
54 changes: 51 additions & 3 deletions test/plugin/test_filter_concat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
class FilterConcatTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
@time = Fluent::Engine.now
@time = time_event_now
end

CONFIG = %[
Expand Down Expand Up @@ -34,13 +34,22 @@ def filter_with_time(conf, messages, wait: nil)
d.run(default_tag: "test") do
sleep 0.1 # run event loop
messages.each do |time, message|
now = time_event_now
if now < time
sleep time-now
end
d.feed(time, message)
end
sleep wait if wait
end
d.filtered
end

def time_event_now
now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
Fluent::EventTime.new(now / 1_000_000_000, now % 1_000_000_000)
end

sub_test_case "config" do
test "empty" do
assert_raise(Fluent::ConfigError.new("'key' parameter is required")) do
Expand Down Expand Up @@ -242,6 +251,43 @@ def filter_with_time(conf, messages, wait: nil)
assert_equal([], filtered)
end

test "timeout with timeout_label for multiline start regex" do
config = <<-CONFIG
key message
multiline_start_regexp /^start/
flush_interval 1s
timeout_label @TIMEOUT
CONFIG
wait = 3
delay_message_4_to_5 = 3
delay_message_5_to_6 = 1

messages = [
[@time, { "container_id" => "1", "message" => "start" }],
[@time, { "container_id" => "1", "message" => " message 1" }],
[@time, { "container_id" => "1", "message" => " message 2" }],
[@time, { "container_id" => "1", "message" => "starting" }],
[@time + delay_message_4_to_5, { "container_id" => "1", "message" => " message 3" }],
[@time + delay_message_4_to_5 + delay_message_5_to_6 , { "container_id" => "1", "message" => " message 4" }],
]

filtered = filter_with_time(config, messages, wait: wait) do |d|
errored = { "container_id" => "1", "message" => "starting" }
event_router = mock(Object.new).emit("test", anything, errored)
mock(Fluent::Test::Driver::TestEventRouter).new(anything) { event_router }
#commented out due to timing inconsistency
#stub.proxy(d.instance).flush_timeout_buffer.times(wait + delay_message_4_to_5 + delay_message_5_to_6)
stub.proxy(d.instance).handle_timeout_error.times(1)

end
expected = [
[@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }],
[@time + 3, { "container_id" => "1", "message" => " message 3" }],
[@time + 4, { "container_id" => "1", "message" => " message 4" }],
]
assert_equal(expected, filtered)
end

test "no timeout" do
messages = [
{ "container_id" => "1", "message" => "message 1" },
Expand Down Expand Up @@ -975,11 +1021,13 @@ def filter_with_time(conf, messages, wait: nil)
[@time + 2, { "container_id" => "1", "message" => " message 4" }],
]
filtered = filter_with_time(config, messages, wait: 3) do |d|
errored = { "container_id" => "1", "message" => "start\n message 3\n message 4" }
errored = { "container_id" => "1", "message" => "start" }
mock(d.instance.router).emit_error_event("test", @time, errored, anything)
end
expected = [
[@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }]
[@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }],
[@time + 1, { "container_id" => "1", "message" => " message 3" }],
[@time + 2, { "container_id" => "1", "message" => " message 4" }],
]
assert_equal(expected, filtered)
end
Expand Down