Commit

Merge pull request #3185 from fluent/log-throttling-per-file-revised

Add log throttling per file (revised)

ashie committed May 11, 2021
2 parents 22c2b4a + 98616e2 commit b64ffca

Showing 3 changed files with 154 additions and 32 deletions.
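For reference, the new parameter is set in an in_tail source block. A minimal configuration sketch (the path, pos_file, tag, and parse section are illustrative placeholders, not part of this commit):

    <source>
      @type tail
      path /var/log/app/app.log
      pos_file /var/lib/fluentd/app.log.pos
      tag app.log
      read_bytes_limit_per_second 8192
      <parse>
        @type none
      </parse>
    </source>

Per the diff below, -1 (the default) disables throttling, values smaller than IOHandler::BYTES_TO_READ (8192) are raised to 8192 with a warning, and the option requires enable_watch_timer to stay enabled.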
106 changes: 78 additions & 28 deletions lib/fluent/plugin/in_tail.rb
@@ -81,6 +81,8 @@ def initialize
     config_param :refresh_interval, :time, default: 60
     desc 'The number of reading lines at each IO.'
     config_param :read_lines_limit, :integer, default: 1000
+    desc 'The number of bytes to read per second (-1 means unlimited)'
+    config_param :read_bytes_limit_per_second, :size, default: -1
     desc 'The interval of flushing the buffer for multiline format'
     config_param :multiline_flush_interval, :time, default: nil
     desc 'Enable the option to emit unmatched lines.'
@@ -178,6 +180,16 @@ def configure(conf)
       # parser is already created by parser helper
       @parser = parser_create(usage: parser_config['usage'] || @parser_configs.first.usage)
       @capability = Fluent::Capability.new(:current_process)
+      if @read_bytes_limit_per_second > 0
+        if !@enable_watch_timer
+          raise Fluent::ConfigError, "Need to enable watch timer when using log throttling feature"
+        end
+        min_bytes = TailWatcher::IOHandler::BYTES_TO_READ
+        if @read_bytes_limit_per_second < min_bytes
+          log.warn "Should specify a value greater than or equal to #{min_bytes}. Using #{min_bytes} for read_bytes_limit_per_second."
+          @read_bytes_limit_per_second = min_bytes
+        end
+      end
     end

     def configure_tag
@@ -392,36 +404,40 @@ def setup_watcher(target_info, pe)
       raise e
     end

-    def start_watchers(targets_info)
-      targets_info.each_value { |target_info|
-        pe = nil
-        if @pf
-          pe = @pf[target_info]
-          if @read_from_head && pe.read_inode.zero?
-            begin
-              pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0)
-            rescue Errno::ENOENT
-              $log.warn "#{target_info.path} not found. Continuing without tailing it."
-            end
-          end
-        end
-
-        begin
-          tw = setup_watcher(target_info, pe)
-        rescue WatcherSetupError => e
-          log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
-          next
-        end
-
-        begin
-          target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
-          @tails[target_info] = tw
-        rescue Errno::ENOENT
-          $log.warn "stat() for #{target_info.path} failed with ENOENT. Drop tail watcher for now."
-          # explicitly detach and unwatch watcher `tw`.
-          tw.unwatched = true
-          detach_watcher(tw, target_info.ino, false)
-        end
-      }
-    end
+    def construct_watcher(target_info)
+      pe = nil
+      if @pf
+        pe = @pf[target_info]
+        if @read_from_head && pe.read_inode.zero?
+          begin
+            pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0)
+          rescue Errno::ENOENT
+            $log.warn "#{target_info.path} not found. Continuing without tailing it."
+          end
+        end
+      end
+
+      begin
+        tw = setup_watcher(target_info, pe)
+      rescue WatcherSetupError => e
+        log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
+        return
+      end
+
+      begin
+        target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
+        @tails[target_info] = tw
+      rescue Errno::ENOENT
+        $log.warn "stat() for #{target_info.path} failed with ENOENT. Drop tail watcher for now."
+        # explicitly detach and unwatch watcher `tw`.
+        tw.unwatched = true
+        detach_watcher(tw, target_info.ino, false)
+      end
+    end
+
+    def start_watchers(targets_info)
+      targets_info.each_value {|target_info|
+        construct_watcher(target_info)
+      }
+    end

@@ -633,6 +649,7 @@ def io_handler(watcher, path)
         path: path,
         log: log,
         read_lines_limit: @read_lines_limit,
+        read_bytes_limit_per_second: @read_bytes_limit_per_second,
         open_on_every_update: @open_on_every_update,
         from_encoding: @from_encoding,
         encoding: @encoding,
@@ -876,10 +893,13 @@ def bytesize
       end

       class IOHandler
-        def initialize(watcher, path:, read_lines_limit:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
+        BYTES_TO_READ = 8192
+
+        def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
           @watcher = watcher
           @path = path
           @read_lines_limit = read_lines_limit
+          @read_bytes_limit_per_second = read_bytes_limit_per_second
           @receive_lines = receive_lines
           @open_on_every_update = open_on_every_update
           @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT)
@@ -909,21 +929,51 @@ def opened?

         private

+        def read_bytes_limits_reached?(number_bytes_read)
+          number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0
+        end
+
+        def limit_bytes_per_second_reached?(start_reading, number_bytes_read)
+          return false unless read_bytes_limits_reached?(number_bytes_read)
+
+          # Stop reading the file once the per-second byte budget is spent,
+          # to throttle log ingestion.
+          time_spent_reading = Fluent::Clock.now - start_reading
+          @log.debug("time_spent_reading: #{time_spent_reading} #{@watcher.path}")
+          if time_spent_reading < 1
+            @log.debug("log ingestion for `#{@path}' is suspended temporarily. Read it again later.")
+            return true
+          end
+
+          false
+        end
+
         def handle_notify
           with_io do |io|
             begin
+              number_bytes_read = 0
+              start_reading = Fluent::Clock.now
               read_more = false

               if !io.nil? && @lines.empty?
                 begin
                   while true
-                    @fifo << io.readpartial(8192, @iobuf)
+                    data = io.readpartial(BYTES_TO_READ, @iobuf)
+                    number_bytes_read += data.bytesize
+                    @fifo << data
                     @fifo.read_lines(@lines)
+
+                    @log.debug("reading file: #{@path}")
                     if @lines.size >= @read_lines_limit
                       # not to use too much memory in case the file is very large
                       read_more = true
                       break
                     end
+                    if limit_bytes_per_second_reached?(start_reading, number_bytes_read)
+                      # Just get out of the tailing loop.
+                      read_more = false
+                      break
+                    end
                   end
                 rescue EOFError
                 end
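Taken together, handle_notify now counts how many bytes each notification pass consumes and bails out early once the per-second budget is spent. A minimal standalone sketch of that control flow (simplified: Fluent::Clock, the FIFO, and the watcher plumbing are omitted, and throttled_read is an illustrative name, not part of the commit):

    # Read fixed-size chunks, count bytes, and suspend once this second's
    # byte budget is exhausted; in the plugin, a later watch-timer tick
    # triggers handle_notify again and reading resumes.
    BYTES_TO_READ = 8192

    def throttled_read(io, limit_per_second)
      bytes_read = 0
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      loop do
        chunk = io.readpartial(BYTES_TO_READ)
        bytes_read += chunk.bytesize
        yield chunk # stand-in for pushing into the FIFO and emitting lines

        # Mirrors limit_bytes_per_second_reached?: only throttle when a
        # positive limit is configured and the budget is already spent.
        next unless limit_per_second > 0 && bytes_read >= limit_per_second

        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
        # Budget spent in under a second: stop reading for now.
        break if elapsed < 1
      end
    rescue EOFError
      # End of file reached; nothing more to read on this pass.
    end

    # Example: cap ingestion of a local file at roughly 8 KiB per second.
    File.open("example.log") { |f| throttled_read(f, 8192) { |chunk| print chunk } }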
8 changes: 4 additions & 4 deletions test/plugin/in_tail/test_io_handler.rb
@@ -30,7 +30,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
       end

       returned_lines = ''
-      r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, log: $log, open_on_every_update: false) do |lines, _watcher|
+      r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
         returned_lines << lines.join
         true
       end
@@ -62,7 +62,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
       end

       returned_lines = ''
-      r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, log: $log, open_on_every_update: true) do |lines, _watcher|
+      r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true) do |lines, _watcher|
         returned_lines << lines.join
         true
       end
@@ -93,7 +93,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
       end

       returned_lines = []
-      r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, log: $log, open_on_every_update: false) do |lines, _watcher|
+      r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
         returned_lines << lines.dup
         true
       end
@@ -119,7 +119,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
       end

       returned_lines = []
-      r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, log: $log, open_on_every_update: false) do |lines, _watcher|
+      r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
         returned_lines << lines.dup
         true
       end
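Because the keyword is required, every construction site must now pass it; the tests above pass -1 to keep throttling off. A hedged sketch of the updated call shape (the watcher, path, and callback are placeholders, not code from this commit):

    handler = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(
      watcher,                           # the owning TailWatcher
      path: "/var/log/app/app.log",      # placeholder path
      read_lines_limit: 1000,
      read_bytes_limit_per_second: 8192, # -1 (the default) disables throttling
      log: $log,
      open_on_every_update: false
    ) do |lines, _watcher|
      lines.each { |line| puts line }    # receive_lines callback
      true
    end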
72 changes: 72 additions & 0 deletions test/plugin/test_in_tail.rb
@@ -156,6 +156,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
     assert_equal 2, d.instance.rotate_wait
     assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file
     assert_equal 1000, d.instance.read_lines_limit
+    assert_equal(-1, d.instance.read_bytes_limit_per_second)
     assert_equal false, d.instance.ignore_repeated_permission_error
     assert_nothing_raised do
       d.instance.have_read_capability?
@@ -195,6 +196,22 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
       end
     end

+    sub_test_case "log throttling per file" do
+      test "w/o watcher timer is invalid" do
+        conf = CONFIG_ENABLE_WATCH_TIMER + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
+        assert_raise(Fluent::ConfigError) do
+          create_driver(conf)
+        end
+      end
+
+      test "valid" do
+        conf = config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
+        assert_nothing_raised do
+          create_driver(conf)
+        end
+      end
+    end
+
     test "both enable_watch_timer and enable_stat_watcher are false" do
       assert_raise(Fluent::ConfigError) do
         create_driver(CONFIG_ENABLE_WATCH_TIMER + CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG)
@@ -323,6 +340,61 @@ def test_emit_with_read_lines_limit(data)
     assert num_events <= d.emit_count
   end

+  sub_test_case "log throttling per file" do
+    teardown do
+      cleanup_file("#{TMP_DIR}/tail.txt")
+    end
+
+    data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
+         "flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2],
+         "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
+         "flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20],
+         "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
+         "parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8],
+         "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
+         "parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20],
+         "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
+         "flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2],
+         "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
+         "flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20],
+         "parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
+         "parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8],
+         "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20],
+         "parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20])
+    def test_emit_with_read_bytes_limit_per_second(data)
+      config_style, limit, limit_bytes, num_events = data
+      case config_style
+      when :flat
+        config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
+      when :parse
+        config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
+      when :flat_without_stat
+        config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
+      when :parse_without_stat
+        config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
+      end
+      d = create_driver(config)
+      msg = 'test' * 2000 # in_tail reads 8192 bytes at once.
+
+      # We should not shut down here because of the hard timeout.
+      d.run(expect_emits: 2, shutdown: false) do
+        File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
+          for _x in 0..30
+            f.puts msg
+          end
+        }
+      end
+
+      events = d.events
+      assert_true(events.length <= num_events)
+      assert_equal({"message" => msg}, events[0][2])
+      assert_equal({"message" => msg}, events[1][2])
+
+      # Tear down the in_tail plugin instance here.
+      d.instance.shutdown
+    end
+  end
+
   data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG,
        parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG)
   def test_emit_with_read_from_head(data)
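As a back-of-envelope check on the num_events column (my reading, not an assertion from the commit): each message is 'test' * 2000 plus a newline, i.e. 8001 bytes, so one 8192-byte readpartial yields roughly one event, and a budget of limit_bytes per second allows about limit_bytes / 8192 chunks per second; across the two emit cycles the driver waits for, that caps events at roughly twice that:

    msg_bytes      = ('test' * 2000).bytesize + 1 # 8001, including the newline
    chunk          = 8192                         # IOHandler::BYTES_TO_READ
    msgs_per_chunk = chunk / msg_bytes            # => 1
    [8192, 8192 * 4, 8192 * 10].each do |limit|
      bound = 2 * (limit / chunk) * msgs_per_chunk
      puts "limit=#{limit} B/s -> at most ~#{bound} events over two emit cycles"
    end
    # => 2, 8, and 20, matching the expected num_events in the data table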
