Skip to content

Commit

Permalink
Fix an unstable test for ack response of out_forward
Browse files Browse the repository at this point in the history
It fixes the following error:

  Error: test: a node supporting responses after stop(ForwardOutputTest): Fluent::Test::Driver::TestTimedOut: Test case timed out with hard limit.
  /home/runner/work/fluentd/fluentd/lib/fluent/test/driver/base.rb:201:in `rescue in run_actual'
  /home/runner/work/fluentd/fluentd/lib/fluent/test/driver/base.rb:196:in `run_actual'
  /home/runner/work/fluentd/fluentd/lib/fluent/test/driver/base.rb:95:in `run'
  /home/runner/work/fluentd/fluentd/lib/fluent/test/driver/base_owner.rb:130:in `run'
  /home/runner/work/fluentd/fluentd/test/plugin/test_out_forward.rb:651:in `block in <class:ForwardOutputTest>'

In addition, add a timeout and an assertion to help investigating on
failure/error cases.

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
  • Loading branch information
ashie committed Apr 1, 2021
1 parent 74180e4 commit 92c2db9
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -587,31 +587,36 @@ def try_write(chunk)
flush_mode immediate
retry_type periodic
retry_wait 30s
flush_at_shutdown false # suppress errors in d.instance_shutdown
flush_at_shutdown true
</buffer>
])

time = event_time("2011-01-02 13:14:15 UTC")

acked_chunk_ids = []
nacked = false
mock.proxy(d.instance.ack_handler).read_ack_from_sock(anything) do |info, success|
if success
acked_chunk_ids << info.chunk_id
else
nacked = true
end
[chunk_id, success]
[info, success]
end

records = [
{"a" => 1},
{"a" => 2}
]
target_input_driver.run(expect_records: 2) do
d.end_if { acked_chunk_ids.size > 0 }
target_input_driver.run(expect_records: 2, timeout: 10) do
d.end_if { acked_chunk_ids.size > 0 || nacked }
d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do
d.feed([[time, records[0]], [time,records[1]]])
end
end

assert(!nacked, d.instance.log.logs.join)

events = target_input_driver.events
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]
Expand All @@ -630,26 +635,29 @@ def try_write(chunk)
flush_mode immediate
retry_type periodic
retry_wait 30s
flush_at_shutdown false # suppress errors in d.instance_shutdown
flush_at_shutdown true
</buffer>
])

time = event_time("2011-01-02 13:14:15 UTC")

acked_chunk_ids = []
nacked = false
mock.proxy(d.instance.ack_handler).read_ack_from_sock(anything) do |info, success|
if success
acked_chunk_ids << info.chunk_id
else
nacked = true
end
[chunk_id, success]
[info, success]
end

records = [
{"a" => 1},
{"a" => 2}
]
target_input_driver.run(expect_records: 2) do
d.end_if { acked_chunk_ids.size > 0 }
target_input_driver.run(expect_records: 2, timeout: 5) do
d.end_if { acked_chunk_ids.size > 0 || nacked }
d.run(default_tag: 'test', wait_flush_completion: false, shutdown: false) do
d.instance.stop
d.feed([[time, records[0]], [time,records[1]]])
Expand All @@ -659,6 +667,8 @@ def try_write(chunk)
end
end

assert(!nacked, d.instance.log.logs.join)

events = target_input_driver.events
assert_equal ['test', time, records[0]], events[0]
assert_equal ['test', time, records[1]], events[1]
Expand Down

0 comments on commit 92c2db9

Please sign in to comment.