Skip to content

Commit

Permalink
Merge pull request #3379 from ashie/fix-read_bytes_limit
Browse files Browse the repository at this point in the history
in_tail: read_bytes_limit_per_second should precede read_lines_limit
  • Loading branch information
ashie committed May 19, 2021
2 parents ad523b2 + ffe6e58 commit b6ba1c2
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 15 deletions.
10 changes: 5 additions & 5 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -965,16 +965,16 @@ def handle_notify
@fifo.read_lines(@lines)

@log.debug("reading file: #{@path}")
if @lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
break
end
if limit_bytes_per_second_reached?
# Just get out from tailing loop.
read_more = false
break
end
if @lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
break
end
end
rescue EOFError
end
Expand Down
36 changes: 26 additions & 10 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,12 @@ def test_emit_with_read_bytes_limit_per_second(data)
when :parse_without_stat
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
start_time = Fluent::Clock.now

# We should not do shutdown here due to hard timeout.
d.run(expect_emits: 2, shutdown: false) do
d = create_driver(config)
d.run(expect_emits: 2) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
100.times do
f.puts msg
Expand All @@ -392,9 +391,29 @@ def test_emit_with_read_bytes_limit_per_second(data)
assert_true(Fluent::Clock.now - start_time > 1)
assert_equal(num_events.times.map { {"message" => msg} },
d.events.collect { |event| event[2] })
end

# Teardown in_tail plugin instance here.
d.instance.shutdown
def test_read_bytes_limit_precede_read_lines_limit
config = CONFIG_READ_FROM_HEAD +
SINGLE_LINE_CONFIG +
config_element("", "", {
"read_lines_limit" => 1000,
"read_bytes_limit_per_second" => 8192
})
msg = 'abc'
start_time = Fluent::Clock.now
d = create_driver(config)
d.run(expect_emits: 2) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
8000.times do
f.puts msg
end
}
end

assert_true(Fluent::Clock.now - start_time > 1)
assert_equal(4096.times.map { {"message" => msg} },
d.events.collect { |event| event[2] })
end
end

Expand Down Expand Up @@ -437,7 +456,7 @@ def test_emit_with_read_bytes_limit_per_second(data)
end

# We should not do shutdown here due to hard timeout.
d.run(shutdown: false) do
d.run do
start_time = Fluent::Clock.now
while Fluent::Clock.now - start_time < 0.8 do
File.open("#{TMP_DIR}/tail.txt", "ab") do |f|
Expand All @@ -449,9 +468,6 @@ def test_emit_with_read_bytes_limit_per_second(data)
end

assert_equal([], d.events)

# Teardown in_tail plugin instance here.
d.instance.shutdown
end
end
end
Expand Down

0 comments on commit b6ba1c2

Please sign in to comment.