Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle both of stat and timer watchers are enabled and steady growing file case #3364

Merged
merged 7 commits into from
May 14, 2021
35 changes: 18 additions & 17 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,8 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@io = nil
@notify_mutex = Mutex.new
@log = log
@start_reading_time = nil
@number_bytes_read = 0

@log.info "following tail of #{@path}"
end
Expand All @@ -929,37 +931,36 @@ def opened?

private

def read_bytes_limits_reached?(number_bytes_read)
number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0
end

def limit_bytes_per_second_reached?(start_reading, number_bytes_read)
return false unless read_bytes_limits_reached?(number_bytes_read)
def limit_bytes_per_second_reached?
return false if @read_bytes_limit_per_second < 0 # not enabled by conf
return false if @number_bytes_read < @read_bytes_limit_per_second

# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Fluent::Clock.now - start_reading
@start_reading_time ||= Fluent::Clock.now
time_spent_reading = Fluent::Clock.now - @start_reading_time
@log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")

if time_spent_reading < 1
needed_sleeping_time = 1 - time_spent_reading
@log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again later.")
return true
true
else
@start_reading_time = nil
@number_bytes_read = 0
false
end

false
end

def handle_notify
return if limit_bytes_per_second_reached?

with_io do |io|
cosmo0920 marked this conversation as resolved.
Show resolved Hide resolved
begin
number_bytes_read = 0
start_reading = Fluent::Clock.now
read_more = false

if !io.nil? && @lines.empty?
begin
while true
@start_reading_time ||= Fluent::Clock.now
data = io.readpartial(BYTES_TO_READ, @iobuf)
number_bytes_read += data.bytesize
@number_bytes_read += data.bytesize
@fifo << data
@fifo.read_lines(@lines)

Expand All @@ -969,7 +970,7 @@ def handle_notify
read_more = true
break
end
if limit_bytes_per_second_reached?(start_reading, number_bytes_read)
if limit_bytes_per_second_reached?
# Just get out from tailing loop.
read_more = false
break
Expand Down
143 changes: 102 additions & 41 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -345,53 +345,114 @@ def test_emit_with_read_lines_limit(data)
cleanup_file("#{TMP_DIR}/tail.txt")
end

data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20],
"parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
when :flat_without_stat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse_without_stat
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
sub_test_case "reads_bytes_per_second w/o throttled" do
data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2, false],
"flat 8192 bytes, 2 events already read limit reached" => [:flat, 100, 8192, 2, true],
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2, false],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20],
"parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
when :flat_without_stat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse_without_stat
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

start_time = Fluent::Clock.now

# We should not do shutdown here due to hard timeout.
d.run(expect_emits: 2, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
100.times do
f.puts msg
end
}
end

assert_true(Fluent::Clock.now - start_time > 1)
assert_equal(num_events.times.map { {"message" => msg} },
d.events.collect { |event| event[2] })

# Teardown in_tail plugin instance here.
d.instance.shutdown
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
end

sub_test_case "reads_bytes_per_second w/ throttled already" do
data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

mock.proxy(d.instance).io_handler(anything, anything) do |io_handler|
require 'fluent/config/types'
limit_bytes_value = Fluent::Config.size_value(limit_bytes)
io_handler.instance_variable_set(:@number_bytes_read, limit_bytes_value)
if Fluent.linux?
mock.proxy(io_handler).handle_notify.at_least(5)
else
mock.proxy(io_handler).handle_notify.twice
end
io_handler
end

# We should not do shutdown here due to hard timeout.
d.run(expect_emits: 2, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for _x in 0..30
File.open("#{TMP_DIR}/tail.txt", "ab") do |f|
100.times do
f.puts msg
end
}
end
end

events = d.events
assert_true(events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
assert_equal({"message" => msg}, events[1][2])
# We should not do shutdown here due to hard timeout.
d.run(shutdown: false) do
start_time = Fluent::Clock.now
while Fluent::Clock.now - start_time < 0.8 do
File.open("#{TMP_DIR}/tail.txt", "ab") do |f|
f.puts msg
f.flush
end
sleep 0.05
end
end

# Teardown in_tail plugin instance here.
d.instance.shutdown
assert_equal([], d.events)

# Teardown in_tail plugin instance here.
d.instance.shutdown
end
end
end

Expand Down