diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 5fa73b6eb7..a07902a715 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -81,6 +81,8 @@ def initialize config_param :refresh_interval, :time, default: 60 desc 'The number of reading lines at each IO.' config_param :read_lines_limit, :integer, default: 1000 + desc 'The number of reading bytes per second' + config_param :read_bytes_limit_per_second, :size, default: -1 desc 'The interval of flushing the buffer for multiline format' config_param :multiline_flush_interval, :time, default: nil desc 'Enable the option to emit unmatched lines.' @@ -178,6 +180,16 @@ def configure(conf) # parser is already created by parser helper @parser = parser_create(usage: parser_config['usage'] || @parser_configs.first.usage) @capability = Fluent::Capability.new(:current_process) + if @read_bytes_limit_per_second > 0 + if !@enable_watch_timer + raise Fluent::ConfigError, "Need to enable watch timer when using log throttling feature" + end + min_bytes = TailWatcher::IOHandler::BYTES_TO_READ + if @read_bytes_limit_per_second < min_bytes + log.warn "Should specify greater equal than #{min_bytes}. Use #{min_bytes} for read_bytes_limit_per_second" + @read_bytes_limit_per_second = min_bytes + end + end end def configure_tag @@ -392,36 +404,40 @@ def setup_watcher(target_info, pe) raise e end - def start_watchers(targets_info) - targets_info.each_value { |target_info| - pe = nil - if @pf - pe = @pf[target_info] - if @read_from_head && pe.read_inode.zero? - begin - pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0) - rescue Errno::ENOENT - $log.warn "#{target_info.path} not found. Continuing without tailing it." - end + def construct_watcher(target_info) + pe = nil + if @pf + pe = @pf[target_info] + if @read_from_head && pe.read_inode.zero? + begin + pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0) + rescue Errno::ENOENT + $log.warn "#{target_info.path} not found. 
Continuing without tailing it." end end + end - begin - tw = setup_watcher(target_info, pe) - rescue WatcherSetupError => e - log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}" - next - end + begin + tw = setup_watcher(target_info, pe) + rescue WatcherSetupError => e + log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}" + return + end - begin - target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino) - @tails[target_info] = tw - rescue Errno::ENOENT - $log.warn "stat() for #{target_info.path} failed with ENOENT. Drop tail watcher for now." - # explicitly detach and unwatch watcher `tw`. - tw.unwatched = true - detach_watcher(tw, target_info.ino, false) - end + begin + target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino) + @tails[target_info] = tw + rescue Errno::ENOENT + $log.warn "stat() for #{target_info.path} failed with ENOENT. Drop tail watcher for now." + # explicitly detach and unwatch watcher `tw`. 
+ tw.unwatched = true + detach_watcher(tw, target_info.ino, false) + end + end + + def start_watchers(targets_info) + targets_info.each_value {|target_info| + construct_watcher(target_info) } end @@ -633,6 +649,7 @@ def io_handler(watcher, path) path: path, log: log, read_lines_limit: @read_lines_limit, + read_bytes_limit_per_second: @read_bytes_limit_per_second, open_on_every_update: @open_on_every_update, from_encoding: @from_encoding, encoding: @encoding, @@ -876,10 +893,13 @@ def bytesize end class IOHandler - def initialize(watcher, path:, read_lines_limit:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines) + BYTES_TO_READ = 8192 + + def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines) @watcher = watcher @path = path @read_lines_limit = read_lines_limit + @read_bytes_limit_per_second = read_bytes_limit_per_second @receive_lines = receive_lines @open_on_every_update = open_on_every_update @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT) @@ -909,21 +929,51 @@ def opened? private + def read_bytes_limits_reached?(number_bytes_read) + number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0 + end + + def limit_bytes_per_second_reached?(start_reading, number_bytes_read) + return false unless read_bytes_limits_reached?(number_bytes_read) + + # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion + time_spent_reading = Fluent::Clock.now - start_reading + @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") + if time_spent_reading < 1 + needed_sleeping_time = 1 - time_spent_reading + @log.debug("log ingestion for `#{@path}' is suspended temporary. 
Read it again later.") + return true + end + + false + end + def handle_notify with_io do |io| begin + number_bytes_read = 0 + start_reading = Fluent::Clock.now read_more = false if !io.nil? && @lines.empty? begin while true - @fifo << io.readpartial(8192, @iobuf) + data = io.readpartial(BYTES_TO_READ, @iobuf) + number_bytes_read += data.bytesize + @fifo << data @fifo.read_lines(@lines) + + @log.debug("reading file: #{@path}") if @lines.size >= @read_lines_limit # not to use too much memory in case the file is very large read_more = true break end + if limit_bytes_per_second_reached?(start_reading, number_bytes_read) + # Just get out from tailing loop. + read_more = false + break + end end rescue EOFError end diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index e5cdbbc8b0..2ae47bf995 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -30,7 +30,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase end returned_lines = '' - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, log: $log, open_on_every_update: false) do |lines, _watcher| + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher| returned_lines << lines.join true end @@ -62,7 +62,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase end returned_lines = '' - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, log: $log, open_on_every_update: true) do |lines, _watcher| + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true) do |lines, _watcher| returned_lines << lines.join true end @@ -93,7 +93,7 @@ class IntailIOHandlerTest < 
Test::Unit::TestCase end returned_lines = [] - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, log: $log, open_on_every_update: false) do |lines, _watcher| + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher| returned_lines << lines.dup true end @@ -119,7 +119,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase end returned_lines = [] - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, log: $log, open_on_every_update: false) do |lines, _watcher| + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher| returned_lines << lines.dup true end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index faf737b0bf..a651144bd3 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -156,6 +156,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) assert_equal 2, d.instance.rotate_wait assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file assert_equal 1000, d.instance.read_lines_limit + assert_equal -1, d.instance.read_bytes_limit_per_second assert_equal false, d.instance.ignore_repeated_permission_error assert_nothing_raised do d.instance.have_read_capability? 
@@ -195,6 +196,22 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end end + sub_test_case "log throttling per file" do + test "w/o watcher timer is invalid" do + conf = CONFIG_ENABLE_WATCH_TIMER + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"}) + assert_raise(Fluent::ConfigError) do + create_driver(conf) + end + end + + test "valid" do + conf = config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"}) + assert_nothing_raised do + create_driver(conf) + end + end + end + test "both enable_watch_timer and enable_stat_watcher are false" do assert_raise(Fluent::ConfigError) do create_driver(CONFIG_ENABLE_WATCH_TIMER + CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG) @@ -323,6 +340,61 @@ def test_emit_with_read_lines_limit(data) assert num_events <= d.emit_count end + sub_test_case "log throttling per file" do + teardown do + cleanup_file("#{TMP_DIR}/tail.txt") + end + + data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2], + "flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2], + "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], + "flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20], + "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], + "parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8], + "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], + "parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20], + "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2], + "flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2], + "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], + "flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20], + "parse #{8*4}k bytes with unit, 8 events" 
=> [:parse, 100, "#{8*4}k", 8], + "parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8], + "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20], + "parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20]) + def test_emit_with_read_bytes_limit_per_second(data) + config_style, limit, limit_bytes, num_events = data + case config_style + when :flat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + when :flat_without_stat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse_without_stat + config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + end + d = create_driver(config) + msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + + # We should not do shutdown here due to hard timeout. + d.run(expect_emits: 2, shutdown: false) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + for _x in 0..30 + f.puts msg + end + } + end + + events = d.events + assert_true(events.length <= num_events) + assert_equal({"message" => msg}, events[0][2]) + assert_equal({"message" => msg}, events[1][2]) + + # Teardown in_tail plugin instance here. + d.instance.shutdown + end + end + data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG, parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_read_from_head(data)