Skip to content

Commit

Permalink
Merge pull request #3364 from fluent/follow-up-log-throttling-per-fil…
Browse files Browse the repository at this point in the history
…e-feature

Handle both of stat and timer watchers are enabled and steady growing file case
  • Loading branch information
ashie committed May 14, 2021
2 parents 5c33add + d4d0118 commit e2e51d1
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 58 deletions.
35 changes: 18 additions & 17 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,8 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@io = nil
@notify_mutex = Mutex.new
@log = log
@start_reading_time = nil
@number_bytes_read = 0

@log.info "following tail of #{@path}"
end
Expand All @@ -929,37 +931,36 @@ def opened?

private

def read_bytes_limits_reached?(number_bytes_read)
number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0
end

def limit_bytes_per_second_reached?(start_reading, number_bytes_read)
return false unless read_bytes_limits_reached?(number_bytes_read)
def limit_bytes_per_second_reached?
return false if @read_bytes_limit_per_second < 0 # not enabled by conf
return false if @number_bytes_read < @read_bytes_limit_per_second

# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Fluent::Clock.now - start_reading
@start_reading_time ||= Fluent::Clock.now
time_spent_reading = Fluent::Clock.now - @start_reading_time
@log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")

if time_spent_reading < 1
needed_sleeping_time = 1 - time_spent_reading
@log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again later.")
return true
true
else
@start_reading_time = nil
@number_bytes_read = 0
false
end

false
end

def handle_notify
return if limit_bytes_per_second_reached?

with_io do |io|
begin
number_bytes_read = 0
start_reading = Fluent::Clock.now
read_more = false

if !io.nil? && @lines.empty?
begin
while true
@start_reading_time ||= Fluent::Clock.now
data = io.readpartial(BYTES_TO_READ, @iobuf)
number_bytes_read += data.bytesize
@number_bytes_read += data.bytesize
@fifo << data
@fifo.read_lines(@lines)

Expand All @@ -969,7 +970,7 @@ def handle_notify
read_more = true
break
end
if limit_bytes_per_second_reached?(start_reading, number_bytes_read)
if limit_bytes_per_second_reached?
# Just get out from tailing loop.
read_more = false
break
Expand Down
143 changes: 102 additions & 41 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -345,53 +345,114 @@ def test_emit_with_read_lines_limit(data)
cleanup_file("#{TMP_DIR}/tail.txt")
end

data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20],
"parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
when :flat_without_stat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse_without_stat
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
sub_test_case "reads_bytes_per_second w/o throttled" do
data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2, false],
"flat 8192 bytes, 2 events already read limit reached" => [:flat, 100, 8192, 2, true],
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2, false],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20],
"parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
when :flat_without_stat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse_without_stat
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

start_time = Fluent::Clock.now

# We should not do shutdown here due to hard timeout.
d.run(expect_emits: 2, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
100.times do
f.puts msg
end
}
end

assert_true(Fluent::Clock.now - start_time > 1)
assert_equal(num_events.times.map { {"message" => msg} },
d.events.collect { |event| event[2] })

# Teardown in_tail plugin instance here.
d.instance.shutdown
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
end

sub_test_case "reads_bytes_per_second w/ throttled already" do
data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

mock.proxy(d.instance).io_handler(anything, anything) do |io_handler|
require 'fluent/config/types'
limit_bytes_value = Fluent::Config.size_value(limit_bytes)
io_handler.instance_variable_set(:@number_bytes_read, limit_bytes_value)
if Fluent.linux?
mock.proxy(io_handler).handle_notify.at_least(5)
else
mock.proxy(io_handler).handle_notify.twice
end
io_handler
end

# We should not do shutdown here due to hard timeout.
d.run(expect_emits: 2, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for _x in 0..30
File.open("#{TMP_DIR}/tail.txt", "ab") do |f|
100.times do
f.puts msg
end
}
end
end

events = d.events
assert_true(events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
assert_equal({"message" => msg}, events[1][2])
# We should not do shutdown here due to hard timeout.
d.run(shutdown: false) do
start_time = Fluent::Clock.now
while Fluent::Clock.now - start_time < 0.8 do
File.open("#{TMP_DIR}/tail.txt", "ab") do |f|
f.puts msg
f.flush
end
sleep 0.05
end
end

# Teardown in_tail plugin instance here.
d.instance.shutdown
assert_equal([], d.events)

# Teardown in_tail plugin instance here.
d.instance.shutdown
end
end
end

Expand Down

0 comments on commit e2e51d1

Please sign in to comment.