From d6755edf76794be934379f4c91c8ce84e6c0c04f Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 30 Nov 2020 14:04:28 +0900 Subject: [PATCH 01/38] in_tail: Handle log throttling per file feature Signed-off-by: Hiroshi Hatake Co-authored-by: Anthony Comtois --- lib/fluent/plugin/in_tail.rb | 30 ++++++++++++++++++--- test/plugin/in_tail/test_io_handler.rb | 8 +++--- test/plugin/test_in_tail.rb | 36 ++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 5fa73b6eb7..06a3357ef4 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -81,6 +81,8 @@ def initialize config_param :refresh_interval, :time, default: 60 desc 'The number of reading lines at each IO.' config_param :read_lines_limit, :integer, default: 1000 + desc 'The number of reading bytes per second' + config_param :read_bytes_limit_per_second, :integer, default: -1 desc 'The interval of flushing the buffer for multiline format' config_param :multiline_flush_interval, :time, default: nil desc 'Enable the option to emit unmatched lines.' 
@@ -633,6 +635,7 @@ def io_handler(watcher, path) path: path, log: log, read_lines_limit: @read_lines_limit, + read_bytes_limit_per_second: @read_bytes_limit_per_second, open_on_every_update: @open_on_every_update, from_encoding: @from_encoding, encoding: @encoding, @@ -876,10 +879,11 @@ def bytesize end class IOHandler - def initialize(watcher, path:, read_lines_limit:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines) + def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines) @watcher = watcher @path = path @read_lines_limit = read_lines_limit + @read_bytes_limit_per_second = read_bytes_limit_per_second @receive_lines = receive_lines @open_on_every_update = open_on_every_update @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT) @@ -912,16 +916,36 @@ def opened? def handle_notify with_io do |io| begin + bytes_to_read = 8192 + number_bytes_read = 0 + start_reading = Fluent::EventTime.now read_more = false if !io.nil? && @lines.empty? 
begin while true - @fifo << io.readpartial(8192, @iobuf) + @fifo << io.readpartial(bytes_to_read, @iobuf) @fifo.read_lines(@lines) - if @lines.size >= @read_lines_limit + + number_bytes_read += bytes_to_read + limit_bytes_per_second_reached = (number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0) + @log.debug("reading file: #{@path}") + if @lines.size >= @read_lines_limit || limit_bytes_per_second_reached # not to use too much memory in case the file is very large read_more = true + + if limit_bytes_per_second_reached + # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion + time_spent_reading = Time.now - start_reading + @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") + if time_spent_reading < 1 + debug_time = 1 - time_spent_reading + @log.debug("sleep: #{debug_time}") + sleep(1 - time_spent_reading) + end + start_reading = Fluent::EventTime.now + end + break end end diff --git a/test/plugin/in_tail/test_io_handler.rb b/test/plugin/in_tail/test_io_handler.rb index e5cdbbc8b0..2ae47bf995 100644 --- a/test/plugin/in_tail/test_io_handler.rb +++ b/test/plugin/in_tail/test_io_handler.rb @@ -30,7 +30,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase end returned_lines = '' - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, log: $log, open_on_every_update: false) do |lines, _watcher| + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher| returned_lines << lines.join true end @@ -62,7 +62,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase end returned_lines = '' - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, log: $log, open_on_every_update: true) do |lines, _watcher| + r = 
Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true) do |lines, _watcher| returned_lines << lines.join true end @@ -93,7 +93,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase end returned_lines = [] - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, log: $log, open_on_every_update: false) do |lines, _watcher| + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher| returned_lines << lines.dup true end @@ -119,7 +119,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase end returned_lines = [] - r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, log: $log, open_on_every_update: false) do |lines, _watcher| + r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher| returned_lines << lines.dup true end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index faf737b0bf..903701b0f3 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -156,6 +156,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) assert_equal 2, d.instance.rotate_wait assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file assert_equal 1000, d.instance.read_lines_limit + assert_equal -1, d.instance.read_bytes_limit_per_second assert_equal false, d.instance.ignore_repeated_permission_error assert_nothing_raised do d.instance.have_read_capability? 
@@ -323,6 +324,41 @@ def test_emit_with_read_lines_limit(data) assert num_events <= d.emit_count end + sub_test_case "log throttoling per file" do + teardown do + FileUtils.rm_f("#{TMP_DIR}/tail.txt") + end + + data("flat 8192 bytes, 10 events" => [:flat, 100, 8192, 10], + "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], + "parse #{8192*3} bytes, 10 events" => [:parse, 100, (8192 * 3), 10], + "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20]) + def test_emit_with_read_bytes_limit_per_second(data) + config_style, limit, limit_bytes, num_events = data + case config_style + when :flat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + end + d = create_driver(config) + msg = 'test' * 2000 # in_tail reads 8192 bytes at once. + + d.run(expect_emits: 2) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + for _x in 0..20 + f.puts msg + end + } + end + + events = d.events + assert_equal(true, events.length <= num_events) + assert_equal({"message" => msg}, events[0][2]) + assert_equal({"message" => msg}, events[1][2]) + end + end + data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG, parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_read_from_head(data) From a5d3a5a4d1d184085bd185ef80b68071dea4b779 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 1 Dec 2020 17:34:54 +0900 Subject: [PATCH 02/38] in_tail: Use size config type for read_bytes_limit_per_second Because file size should be specified as numbers with SI prefix instead of raw numbers. 
Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 2 +- test/plugin/test_in_tail.rb | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 06a3357ef4..99506f4178 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -82,7 +82,7 @@ def initialize desc 'The number of reading lines at each IO.' config_param :read_lines_limit, :integer, default: 1000 desc 'The number of reading bytes per second' - config_param :read_bytes_limit_per_second, :integer, default: -1 + config_param :read_bytes_limit_per_second, :size, default: -1 desc 'The interval of flushing the buffer for multiline format' config_param :multiline_flush_interval, :time, default: nil desc 'Enable the option to emit unmatched lines.' diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 903701b0f3..f15418f15e 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -329,10 +329,14 @@ def test_emit_with_read_lines_limit(data) FileUtils.rm_f("#{TMP_DIR}/tail.txt") end - data("flat 8192 bytes, 10 events" => [:flat, 100, 8192, 10], - "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], - "parse #{8192*3} bytes, 10 events" => [:parse, 100, (8192 * 3), 10], - "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20]) + data("flat 8192 bytes, 10 events" => [:flat, 100, 8192, 10], + "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], + "parse #{8192*3} bytes, 10 events" => [:parse, 100, (8192 * 3), 10], + "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], + "flat 8k bytes with unit, 10 events" => [:flat, 100, "8k", 10], + "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], + "parse #{8*3}k bytes with unit, 10 events" => [:parse, 100, "#{8*3}k", 10], + "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20]) def 
test_emit_with_read_bytes_limit_per_second(data) config_style, limit, limit_bytes, num_events = data case config_style From d3d711944a88257c1fe10cbf84870acddb21de51 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 21 Dec 2020 17:02:47 +0900 Subject: [PATCH 03/38] in_tail: Use actual read bytes to calculate read bytes number Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 2 +- test/plugin/test_in_tail.rb | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 99506f4178..51a5971429 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -927,7 +927,7 @@ def handle_notify @fifo << io.readpartial(bytes_to_read, @iobuf) @fifo.read_lines(@lines) - number_bytes_read += bytes_to_read + number_bytes_read += @lines.last.size limit_bytes_per_second_reached = (number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0) @log.debug("reading file: #{@path}") if @lines.size >= @read_lines_limit || limit_bytes_per_second_reached diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index f15418f15e..60d1e951c0 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -330,13 +330,13 @@ def test_emit_with_read_lines_limit(data) end data("flat 8192 bytes, 10 events" => [:flat, 100, 8192, 10], - "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], + "flat #{8192*10} bytes, 22 events" => [:flat, 100, (8192 * 10), 22], "parse #{8192*3} bytes, 10 events" => [:parse, 100, (8192 * 3), 10], - "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], + "parse #{8192*10} bytes, 22 events" => [:parse, 100, (8192 * 10), 22], "flat 8k bytes with unit, 10 events" => [:flat, 100, "8k", 10], - "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], + "flat #{8*10}k bytes with unit, 22 events" => [:flat, 100, "#{8*10}k", 22], "parse #{8*3}k bytes with 
unit, 10 events" => [:parse, 100, "#{8*3}k", 10], - "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20]) + "parse #{8*10}k bytes with unit, 22 events" => [:parse, 100, "#{8*10}k", 22]) def test_emit_with_read_bytes_limit_per_second(data) config_style, limit, limit_bytes, num_events = data case config_style From 30a63ca4d3bb794968eb1afb7cf2b9f7fddc60d2 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 21 Dec 2020 17:06:15 +0900 Subject: [PATCH 04/38] in_tail: Use more meaningful variable name Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 51a5971429..fa5e979c7c 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -939,9 +939,9 @@ def handle_notify time_spent_reading = Time.now - start_reading @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") if time_spent_reading < 1 - debug_time = 1 - time_spent_reading - @log.debug("sleep: #{debug_time}") - sleep(1 - time_spent_reading) + needed_sleeping_time = 1 - time_spent_reading + @log.debug("sleep: #{needed_sleeping_time}") + sleep(needed_sleeping_time) end start_reading = Fluent::EventTime.now end From 61554dc869db28025868615e8e29fed8bcde953b Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 21 Dec 2020 17:24:25 +0900 Subject: [PATCH 05/38] in_tail: Use fixed value instead of variable Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index fa5e979c7c..e44d4d6c3c 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -916,7 +916,6 @@ def opened? 
def handle_notify with_io do |io| begin - bytes_to_read = 8192 number_bytes_read = 0 start_reading = Fluent::EventTime.now read_more = false @@ -924,7 +923,7 @@ def handle_notify if !io.nil? && @lines.empty? begin while true - @fifo << io.readpartial(bytes_to_read, @iobuf) + @fifo << io.readpartial(8192, @iobuf) @fifo.read_lines(@lines) number_bytes_read += @lines.last.size From a54f25b678efcaaf0d698a923aa7ab3c99091f90 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 22 Dec 2020 17:16:17 +0900 Subject: [PATCH 06/38] in_tail: test: Add log throttling testcases for w/o stat watcher situations Signed-off-by: Hiroshi Hatake --- test/plugin/test_in_tail.rb | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 60d1e951c0..c4839599cf 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -330,13 +330,21 @@ def test_emit_with_read_lines_limit(data) end data("flat 8192 bytes, 10 events" => [:flat, 100, 8192, 10], + "flat 8192 bytes, 10 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 10], "flat #{8192*10} bytes, 22 events" => [:flat, 100, (8192 * 10), 22], + "flat #{8192*10} bytes, 22 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 22], "parse #{8192*3} bytes, 10 events" => [:parse, 100, (8192 * 3), 10], + "parse #{8192*3} bytes, 10 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 3), 10], "parse #{8192*10} bytes, 22 events" => [:parse, 100, (8192 * 10), 22], + "parse #{8192*10} bytes, 22 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 22], "flat 8k bytes with unit, 10 events" => [:flat, 100, "8k", 10], + "flat 8k bytes with unit, 10 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 10], "flat #{8*10}k bytes with unit, 22 events" => [:flat, 100, "#{8*10}k", 22], + "flat #{8*10}k bytes with unit, 22 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 22], "parse 
#{8*3}k bytes with unit, 10 events" => [:parse, 100, "#{8*3}k", 10], - "parse #{8*10}k bytes with unit, 22 events" => [:parse, 100, "#{8*10}k", 22]) + "parse #{8*3}k bytes with unit, 10 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*3}k", 10], + "parse #{8*10}k bytes with unit, 22 events" => [:parse, 100, "#{8*10}k", 22], + "parse #{8*10}k bytes with unit, 22 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 22]) def test_emit_with_read_bytes_limit_per_second(data) config_style, limit, limit_bytes, num_events = data case config_style @@ -344,6 +352,10 @@ def test_emit_with_read_bytes_limit_per_second(data) config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) when :parse config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + when :flat_without_stat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + when :parse_without_stat + config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG end d = create_driver(config) msg = 'test' * 2000 # in_tail reads 8192 bytes at once. 
From 2b2260e17fe4dcf5c31bded4faf2e46ee36dfb79 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 23 Dec 2020 18:25:46 +0900 Subject: [PATCH 07/38] in_tail: Guard for read empty lines Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index e44d4d6c3c..e5c5827567 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -926,7 +926,7 @@ def handle_notify @fifo << io.readpartial(8192, @iobuf) @fifo.read_lines(@lines) - number_bytes_read += @lines.last.size + number_bytes_read += @lines.last ? @lines.last.size : 0 limit_bytes_per_second_reached = (number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0) @log.debug("reading file: #{@path}") if @lines.size >= @read_lines_limit || limit_bytes_per_second_reached From 8ebc227bcaa4a427c4a7fe36ed3e9abe9249d1e8 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 23 Dec 2020 18:12:22 +0900 Subject: [PATCH 08/38] Use Thread Pool on tailing to prevent Kernel#sleep for entire sleep Because the previous in_tail implementation is single threaded. Thus, calling Kernel#sleep pauses entire tailing thread and in_tail plugin. Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 110 ++++++++++++++++++++++++++--------- test/plugin/test_in_tail.rb | 1 + 2 files changed, 84 insertions(+), 27 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index e5c5827567..534d83b269 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -109,6 +109,8 @@ def initialize config_param :path_timezone, :string, default: nil desc 'Follow inodes instead of following file names. 
Guarantees more stable delivery and allows to use * in path pattern with rotating files' config_param :follow_inodes, :bool, default: false + desc 'Specify max size of thread pool' + config_param :max_thread_pool_size, :integer, default: 10 config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do config_argument :usage, :string, default: 'in_tail_parser' @@ -180,6 +182,8 @@ def configure(conf) # parser is already created by parser helper @parser = parser_create(usage: parser_config['usage'] || @parser_configs.first.usage) @capability = Fluent::Capability.new(:current_process) + # Need to create thread pool because #sleep should pause entire thread on enabling log throttling feature. + @thread_pool = nil end def configure_tag @@ -395,36 +399,40 @@ def setup_watcher(target_info, pe) end def start_watchers(targets_info) - targets_info.each_value { |target_info| - pe = nil - if @pf - pe = @pf[target_info] - if @read_from_head && pe.read_inode.zero? - begin - pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0) - rescue Errno::ENOENT - $log.warn "#{target_info.path} not found. Continuing without tailing it." + @thread_pool = TailThread::Pool.new(@max_thread_pool_size) do |pool| + targets_info.each_value { |target_info| + pool.run { + pe = nil + if @pf + pe = @pf[target_info] + if @read_from_head && pe.read_inode.zero? + begin + pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0) + rescue Errno::ENOENT + $log.warn "#{target_info.path} not found. Continuing without tailing it." 
+ end + end end - end - end - begin - tw = setup_watcher(target_info, pe) - rescue WatcherSetupError => e - log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}" - next - end + begin + tw = setup_watcher(target_info, pe) + rescue WatcherSetupError => e + log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}" + next + end - begin - target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino) - @tails[target_info] = tw - rescue Errno::ENOENT - $log.warn "stat() for #{target_info.path} failed with ENOENT. Drop tail watcher for now." - # explicitly detach and unwatch watcher `tw`. - tw.unwatched = true - detach_watcher(tw, target_info.ino, false) - end - } + begin + target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino) + @tails[target_info] = tw + rescue Errno::ENOENT + $log.warn "stat() for #{target_info.path} failed with ENOENT. Drop tail watcher for now." + # explicitly detach and unwatch watcher `tw`. 
+ tw.unwatched = true + detach_watcher(tw, target_info.ino, false) + end + } + } + end end def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) @@ -443,6 +451,9 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch end end } + if @thread_pool + @thread_pool.stop + end end def close_watcher_handles @@ -673,6 +684,51 @@ def on_timer end end + class TailThread + class Pool + def initialize(max_size, &session) + @max_size = max_size + @queue = Queue.new + @threads = [] + @running = false + session.call(self) + @running = true + ensure + stop + end + + def run(&task) + @queue.push(task) + @threads << create_thread if @threads.size < @max_size + end + + def stop + return unless @running + @running = false + + until @queue.num_waiting == @threads.size + sleep 0.01 + end + begin + Timeout.timeout(1) do + @threads.join + end + rescue Timeout::Error + @threads.each {|th| th.kill } + end + end + + protected def create_thread + Thread.start(@queue) do |q| + loop do + task = q.pop + task.call + end + end + end + end + end + class TailWatcher def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build) @path = target_info.path diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index c4839599cf..f8cc532bd2 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -157,6 +157,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file assert_equal 1000, d.instance.read_lines_limit assert_equal -1, d.instance.read_bytes_limit_per_second + assert_equal 10, d.instance.max_thread_pool_size assert_equal false, d.instance.ignore_repeated_permission_error assert_nothing_raised do d.instance.have_read_capability? 
From 6e8a13c6ea84ec36560ad99d86d83f8951a0ecc5 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 24 Dec 2020 17:12:21 +0900 Subject: [PATCH 09/38] in_tail: Use Fluent::EventTime instead of Time Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 534d83b269..ac5207be01 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -991,7 +991,7 @@ def handle_notify if limit_bytes_per_second_reached # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion - time_spent_reading = Time.now - start_reading + time_spent_reading = Fluent::EventTime.now - start_reading @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") if time_spent_reading < 1 needed_sleeping_time = 1 - time_spent_reading From 5b19e400966cfed6bc994d4c0b68a4f60be011ef Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 24 Dec 2020 18:04:44 +0900 Subject: [PATCH 10/38] in_tail: Check invalid max_thread_pool_size on #configure Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index ac5207be01..98be79bb25 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -184,6 +184,11 @@ def configure(conf) @capability = Fluent::Capability.new(:current_process) # Need to create thread pool because #sleep should pause entire thread on enabling log throttling feature. 
@thread_pool = nil + if @read_bytes_limit_per_second > 0 + if @max_thread_pool_size < 0 + raise Fluent::ConfigError, "Specify positive number" + end + end end def configure_tag From 854cca809b17594629240d0783e70566a0171adf Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 24 Dec 2020 18:05:31 +0900 Subject: [PATCH 11/38] in_tail: Set default value of max_thread_pool_size as 1 Because when not using log throttling, it is enough to run on a single thread. Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 2 +- test/plugin/test_in_tail.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 98be79bb25..b28947f71a 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -110,7 +110,7 @@ def initialize desc 'Follow inodes instead of following file names. Guarantees more stable delivery and allows to use * in path pattern with rotating files' config_param :follow_inodes, :bool, default: false desc 'Specify max size of thread pool' - config_param :max_thread_pool_size, :integer, default: 10 + config_param :max_thread_pool_size, :integer, default: 1 config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do config_argument :usage, :string, default: 'in_tail_parser' diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index f8cc532bd2..01f33c827a 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -157,7 +157,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file assert_equal 1000, d.instance.read_lines_limit assert_equal -1, d.instance.read_bytes_limit_per_second - assert_equal 10, d.instance.max_thread_pool_size + assert_equal 1, d.instance.max_thread_pool_size assert_equal false, d.instance.ignore_repeated_permission_error assert_nothing_raised do d.instance.have_read_capability? 
From f76f45a2fb855e046ac5cb8af21c24a7a0edd50e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 24 Dec 2020 18:06:59 +0900 Subject: [PATCH 12/38] in_tail: Use 2 or more thread pool size on log throttling testcases Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 4 ++++ test/plugin/test_in_tail.rb | 25 +++++++++++++------------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index b28947f71a..3622719439 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -188,6 +188,10 @@ def configure(conf) if @max_thread_pool_size < 0 raise Fluent::ConfigError, "Specify positive number" end + require 'etc' + if @max_thread_pool_size <= (Etc.nprocessors / 2) + raise Fluent::ConfigError, "Specify #{Etc.nprocessors / 2} or more on max_thread_pool_size" + end end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 01f33c827a..5c85677e7d 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -102,6 +102,7 @@ def create_target_info(path) CONFIG_ENABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false }) CONFIG_DISABLE_STAT_WATCHER = config_element("", "", { "enable_stat_watcher" => false }) CONFIG_OPEN_ON_EVERY_UPDATE = config_element("", "", { "open_on_every_update" => true }) + CONFIG_MAX_THREAD_POOL_SIZE = config_element("", "", { "max_thread_pool_size" => (Etc.nprocessors / 1.5).ceil }) COMMON_FOLLOW_INODE_CONFIG = config_element("ROOT", "", { "path" => "#{TMP_DIR}/tail.txt*", "pos_file" => "#{TMP_DIR}/tail.pos", @@ -330,33 +331,33 @@ def test_emit_with_read_lines_limit(data) FileUtils.rm_f("#{TMP_DIR}/tail.txt") end - data("flat 8192 bytes, 10 events" => [:flat, 100, 8192, 10], - "flat 8192 bytes, 10 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 10], + data("flat 8192 bytes, 24 events" => [:flat, 100, 8192, 24], + "flat 8192 bytes, 24 events w/o stat watcher" => 
[:flat_without_stat, 100, 8192, 24], "flat #{8192*10} bytes, 22 events" => [:flat, 100, (8192 * 10), 22], "flat #{8192*10} bytes, 22 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 22], - "parse #{8192*3} bytes, 10 events" => [:parse, 100, (8192 * 3), 10], - "parse #{8192*3} bytes, 10 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 3), 10], + "parse #{8192*3} bytes, 48 events" => [:parse, 100, (8192 * 3), 48], + "parse #{8192*3} bytes, 48 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 3), 48], "parse #{8192*10} bytes, 22 events" => [:parse, 100, (8192 * 10), 22], "parse #{8192*10} bytes, 22 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 22], - "flat 8k bytes with unit, 10 events" => [:flat, 100, "8k", 10], - "flat 8k bytes with unit, 10 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 10], + "flat 8k bytes with unit, 24 events" => [:flat, 100, "8k", 24], + "flat 8k bytes with unit, 24 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 24], "flat #{8*10}k bytes with unit, 22 events" => [:flat, 100, "#{8*10}k", 22], "flat #{8*10}k bytes with unit, 22 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 22], - "parse #{8*3}k bytes with unit, 10 events" => [:parse, 100, "#{8*3}k", 10], - "parse #{8*3}k bytes with unit, 10 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*3}k", 10], + "parse #{8*3}k bytes with unit, 48 events" => [:parse, 100, "#{8*3}k", 48], + "parse #{8*3}k bytes with unit, 48 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*3}k", 48], "parse #{8*10}k bytes with unit, 22 events" => [:parse, 100, "#{8*10}k", 22], "parse #{8*10}k bytes with unit, 22 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 22]) def test_emit_with_read_bytes_limit_per_second(data) config_style, limit, limit_bytes, num_events = data case config_style when :flat - config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { 
"read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + CONFIG_MAX_THREAD_POOL_SIZE when :parse - config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + CONFIG_MAX_THREAD_POOL_SIZE when :flat_without_stat - config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + CONFIG_MAX_THREAD_POOL_SIZE when :parse_without_stat - config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + CONFIG_MAX_THREAD_POOL_SIZE end d = create_driver(config) msg = 'test' * 2000 # in_tail reads 8192 bytes at once. 
From 4b2526b4e8375be9dd427844b0a3bf8830eed150 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 24 Dec 2020 18:39:53 +0900 Subject: [PATCH 13/38] in_tail: Add warning for specifying less equal than 8192 on read_bytes_limit_per_second Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 3622719439..a14512acdc 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -188,6 +188,10 @@ def configure(conf) if @max_thread_pool_size < 0 raise Fluent::ConfigError, "Specify positive number" end + if @read_bytes_limit_per_second < 8192 + log.warn "Should specify greater equal than 8192. Use 8192 for read_bytes_limit_per_second" + @read_bytes_limit_per_second = 8192 + end require 'etc' if @max_thread_pool_size <= (Etc.nprocessors / 2) raise Fluent::ConfigError, "Specify #{Etc.nprocessors / 2} or more on max_thread_pool_size" From f88c4aec5adb49a7e138153ccb83d85cba133976 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 24 Dec 2020 18:56:51 +0900 Subject: [PATCH 14/38] in_tail: Check thread pool objects existences Signed-off-by: Hiroshi Hatake --- test/plugin/test_in_tail.rb | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 5c85677e7d..b2a0dd1bce 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -331,6 +331,14 @@ def test_emit_with_read_lines_limit(data) FileUtils.rm_f("#{TMP_DIR}/tail.txt") end + def count_thread_pool_object + num = 0 + ObjectSpace.each_object(Fluent::Plugin::TailInput::TailThread::Pool) { |obj| + num += 1 + } + num + end + data("flat 8192 bytes, 24 events" => [:flat, 100, 8192, 24], "flat 8192 bytes, 24 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 24], "flat #{8192*10} bytes, 22 events" => [:flat, 100, (8192 * 10), 22], @@ -370,6 +378,10 @@ def 
test_emit_with_read_bytes_limit_per_second(data) } end + assert do + count_thread_pool_object >= 1 + end + events = d.events assert_equal(true, events.length <= num_events) assert_equal({"message" => msg}, events[0][2]) From 61ae7ca0c94d99a0bcd4a3bfa24f79a4150f8f73 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 25 Dec 2020 11:43:11 +0900 Subject: [PATCH 15/38] in_tail: Use more strict parameter checking for log throttling feature Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 8 +++++--- test/plugin/test_in_tail.rb | 24 ++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index a14512acdc..72b0936a54 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -188,13 +188,15 @@ def configure(conf) if @max_thread_pool_size < 0 raise Fluent::ConfigError, "Specify positive number" end + if !@enable_watch_timer + raise Fluent::ConfigError, "Need to enable watch timer when using log throttling feature" + end if @read_bytes_limit_per_second < 8192 log.warn "Should specify greater equal than 8192. 
Use 8192 for read_bytes_limit_per_second" @read_bytes_limit_per_second = 8192 end - require 'etc' - if @max_thread_pool_size <= (Etc.nprocessors / 2) - raise Fluent::ConfigError, "Specify #{Etc.nprocessors / 2} or more on max_thread_pool_size" + if @max_thread_pool_size < 2 + raise Fluent::ConfigError, "Specify 2 or more on max_thread_pool_size" end end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index b2a0dd1bce..a97b42edf9 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -8,6 +8,7 @@ require 'timecop' require 'tmpdir' require 'securerandom' +require 'etc' class TailInputTest < Test::Unit::TestCase include FlexMock::TestCase @@ -198,6 +199,29 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) end end + sub_test_case "log throttling per file" do + test "w/o watcher timer is invalid" do + conf = CONFIG_ENABLE_WATCH_TIMER + CONFIG_MAX_THREAD_POOL_SIZE + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"}) + assert_raise(Fluent::ConfigError) do + create_driver(conf) + end + end + + test "w/o 2 or more thread pool size is invalid" do + conf = config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"}) + assert_raise(Fluent::ConfigError) do + create_driver(conf) + end + end + + test "valid" do + conf = CONFIG_MAX_THREAD_POOL_SIZE + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"}) + assert_raise(Fluent::ConfigError) do + create_driver(conf) + end + end + end + test "both enable_watch_timer and enable_stat_watcher are false" do assert_raise(Fluent::ConfigError) do create_driver(CONFIG_ENABLE_WATCH_TIMER + CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG) From de6bef3bfbb2e83e3ec5a7ce989d0c1b6701147b Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 25 Dec 2020 12:44:01 +0900 Subject: [PATCH 16/38] in_tail: thread_pool: Add tailing thread pool testcases Signed-off-by: Hiroshi Hatake --- test/plugin/in_tail/test_tailthread_pool.rb | 26
+++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 test/plugin/in_tail/test_tailthread_pool.rb diff --git a/test/plugin/in_tail/test_tailthread_pool.rb b/test/plugin/in_tail/test_tailthread_pool.rb new file mode 100644 index 0000000000..412b9de445 --- /dev/null +++ b/test/plugin/in_tail/test_tailthread_pool.rb @@ -0,0 +1,26 @@ +require_relative '../../helper' +require 'fluent/plugin/in_tail' + +class TailThreadPoolTest < Test::Unit::TestCase + data("single thread" => 1, + "max thread pool size (10)" => 10, + "max thread pool size (20)" => 20, + ) + test "thread pool creation" do |data| + max_threads_pool_size = data + thread_pool = Fluent::Plugin::TailInput::TailThread::Pool.new(max_threads_pool_size) do |pool| + 100.times {|n| + pool.run { + nop_task(0.01) + assert do + pool.instance_variable_get(:@threads).size <= max_threads_pool_size + end + } + } + end + end + + def nop_task(sleep_time) + sleep sleep_time + end +end From 48d9be3c2a89a24b34f345810d04bfe6b4aec273 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 6 Jan 2021 11:24:42 +0900 Subject: [PATCH 17/38] Use 8192 bytes on number_bytes_read calculation Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 2 +- test/plugin/test_in_tail.rb | 22 +++++++++++----------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 72b0936a54..2a21ce6c19 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -997,7 +997,7 @@ def handle_notify @fifo << io.readpartial(8192, @iobuf) @fifo.read_lines(@lines) - number_bytes_read += @lines.last ? 
@lines.last.size : 0 + number_bytes_read += 8192 limit_bytes_per_second_reached = (number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0) @log.debug("reading file: #{@path}") if @lines.size >= @read_lines_limit || limit_bytes_per_second_reached diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index a97b42edf9..0fe160f112 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -363,22 +363,22 @@ def count_thread_pool_object num end - data("flat 8192 bytes, 24 events" => [:flat, 100, 8192, 24], - "flat 8192 bytes, 24 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 24], + data("flat 8192 bytes, 12 events" => [:flat, 100, 8192, 12], + "flat 8192 bytes, 12 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 12], "flat #{8192*10} bytes, 22 events" => [:flat, 100, (8192 * 10), 22], "flat #{8192*10} bytes, 22 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 22], "parse #{8192*3} bytes, 48 events" => [:parse, 100, (8192 * 3), 48], "parse #{8192*3} bytes, 48 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 3), 48], - "parse #{8192*10} bytes, 22 events" => [:parse, 100, (8192 * 10), 22], - "parse #{8192*10} bytes, 22 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 22], - "flat 8k bytes with unit, 24 events" => [:flat, 100, "8k", 24], - "flat 8k bytes with unit, 24 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 24], - "flat #{8*10}k bytes with unit, 22 events" => [:flat, 100, "#{8*10}k", 22], - "flat #{8*10}k bytes with unit, 22 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 22], + "parse #{8192*10} bytes, 32 events" => [:parse, 100, (8192 * 10), 32], + "parse #{8192*10} bytes, 32 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 32], + "flat 8k bytes with unit, 12 events" => [:flat, 100, "8k", 12], + "flat 8k bytes with unit, 12 events w/o stat watcher" => [:flat_without_stat, 100, 
"8k", 12], + "flat #{8*10}k bytes with unit, 32 events" => [:flat, 100, "#{8*10}k", 32], + "flat #{8*10}k bytes with unit, 32 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 32], "parse #{8*3}k bytes with unit, 48 events" => [:parse, 100, "#{8*3}k", 48], "parse #{8*3}k bytes with unit, 48 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*3}k", 48], - "parse #{8*10}k bytes with unit, 22 events" => [:parse, 100, "#{8*10}k", 22], - "parse #{8*10}k bytes with unit, 22 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 22]) + "parse #{8*10}k bytes with unit, 32 events" => [:parse, 100, "#{8*10}k", 32], + "parse #{8*10}k bytes with unit, 32 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 32]) def test_emit_with_read_bytes_limit_per_second(data) config_style, limit, limit_bytes, num_events = data case config_style @@ -407,7 +407,7 @@ def test_emit_with_read_bytes_limit_per_second(data) end events = d.events - assert_equal(true, events.length <= num_events) + assert_true(events.length <= num_events) assert_equal({"message" => msg}, events[0][2]) assert_equal({"message" => msg}, events[1][2]) end From 4d9a755147f3ebd96c441808a4b911cb95546957 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 7 Jan 2021 13:31:36 +0900 Subject: [PATCH 18/38] Fix a typo Signed-off-by: Hiroshi Hatake --- test/plugin/test_in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 0fe160f112..d67d481e3c 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -350,7 +350,7 @@ def test_emit_with_read_lines_limit(data) assert num_events <= d.emit_count end - sub_test_case "log throttoling per file" do + sub_test_case "log throttling per file" do teardown do FileUtils.rm_f("#{TMP_DIR}/tail.txt") end From a4459aeb8ad23d9531bf8103e13b2b5f72eea313 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 7 Jan 2021 13:34:53 +0900 
Subject: [PATCH 19/38] Use constant instead of integer values Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 2a21ce6c19..81ba44e722 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -950,6 +950,8 @@ def bytesize end class IOHandler + BYTES_TO_READ = 8192 + def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines) @watcher = watcher @path = path @@ -994,10 +996,10 @@ def handle_notify if !io.nil? && @lines.empty? begin while true - @fifo << io.readpartial(8192, @iobuf) + @fifo << io.readpartial(BYTES_TO_READ, @iobuf) @fifo.read_lines(@lines) - number_bytes_read += 8192 + number_bytes_read += BYTES_TO_READ limit_bytes_per_second_reached = (number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0) @log.debug("reading file: #{@path}") if @lines.size >= @read_lines_limit || limit_bytes_per_second_reached From 4da4ae32a040df655d9ac0a405d858d749068033 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 4 Mar 2021 10:29:34 +0900 Subject: [PATCH 20/38] in_tail: Fix unstable testcases Signed-off-by: Hiroshi Hatake --- test/plugin/test_in_tail.rb | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index d67d481e3c..602b426085 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -363,16 +363,16 @@ def count_thread_pool_object num end - data("flat 8192 bytes, 12 events" => [:flat, 100, 8192, 12], - "flat 8192 bytes, 12 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 12], - "flat #{8192*10} bytes, 22 events" => [:flat, 100, (8192 * 10), 22], - "flat #{8192*10} bytes, 22 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 22], +
data("flat 8192 bytes, 21 events" => [:flat, 100, 8192, 21], + "flat 8192 bytes, 21 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 21], + "flat #{8192*10} bytes, 32 events" => [:flat, 100, (8192 * 10), 32], + "flat #{8192*10} bytes, 32 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 32], "parse #{8192*3} bytes, 48 events" => [:parse, 100, (8192 * 3), 48], "parse #{8192*3} bytes, 48 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 3), 48], "parse #{8192*10} bytes, 32 events" => [:parse, 100, (8192 * 10), 32], "parse #{8192*10} bytes, 32 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 32], - "flat 8k bytes with unit, 12 events" => [:flat, 100, "8k", 12], - "flat 8k bytes with unit, 12 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 12], + "flat 8k bytes with unit, 21 events" => [:flat, 100, "8k", 21], + "flat 8k bytes with unit, 21 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 21], "flat #{8*10}k bytes with unit, 32 events" => [:flat, 100, "#{8*10}k", 32], "flat #{8*10}k bytes with unit, 32 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 32], "parse #{8*3}k bytes with unit, 48 events" => [:parse, 100, "#{8*3}k", 48], From f29309bb3ccde45b79b08293d54924309c6448db Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 16 Mar 2021 16:45:39 +0900 Subject: [PATCH 21/38] in_tail: Use thread pool only when log throttling feature is enabled Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 72 +++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 81ba44e722..1b3505a36a 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -184,6 +184,7 @@ def configure(conf) @capability = Fluent::Capability.new(:current_process) # Need to create thread pool because #sleep should pause entire thread on enabling log throttling 
feature. @thread_pool = nil + @enable_read_bytes_limit_thread_pool = false if @read_bytes_limit_per_second > 0 if @max_thread_pool_size < 0 raise Fluent::ConfigError, "Specify positive number" @@ -198,6 +199,7 @@ def configure(conf) if @max_thread_pool_size < 2 raise Fluent::ConfigError, "Specify 2 or more on max_thread_pool_size" end + @enable_read_bytes_limit_thread_pool = true end end @@ -413,39 +415,49 @@ def setup_watcher(target_info, pe) raise e end - def start_watchers(targets_info) - @thread_pool = TailThread::Pool.new(@max_thread_pool_size) do |pool| - targets_info.each_value { |target_info| - pool.run { - pe = nil - if @pf - pe = @pf[target_info] - if @read_from_head && pe.read_inode.zero? - begin - pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0) - rescue Errno::ENOENT - $log.warn "#{target_info.path} not found. Continuing without tailing it." - end - end - end + def construct_watcher(target_info) + pe = nil + if @pf + pe = @pf[target_info] + if @read_from_head && pe.read_inode.zero? + begin + pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0) + rescue Errno::ENOENT + $log.warn "#{target_info.path} not found. Continuing without tailing it." + end + end + end - begin - tw = setup_watcher(target_info, pe) - rescue WatcherSetupError => e - log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}" - next - end + begin + tw = setup_watcher(target_info, pe) + rescue WatcherSetupError => e + log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}" + return + end - begin - target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino) - @tails[target_info] = tw - rescue Errno::ENOENT - $log.warn "stat() for #{target_info.path} failed with ENOENT. Drop tail watcher for now." - # explicitly detach and unwatch watcher `tw`. 
- tw.unwatched = true - detach_watcher(tw, target_info.ino, false) - end + begin + target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino) + @tails[target_info] = tw + rescue Errno::ENOENT + $log.warn "stat() for #{target_info.path} failed with ENOENT. Drop tail watcher for now." + # explicitly detach and unwatch watcher `tw`. + tw.unwatched = true + detach_watcher(tw, target_info.ino, false) + end + end + + def start_watchers(targets_info) + if @enable_read_bytes_limit_thread_pool + @thread_pool = TailThread::Pool.new(@max_thread_pool_size) do |pool| + targets_info.each_value {|target_info| + pool.run { + construct_watcher(target_info) + } } + end + else + targets_info.each_value {|target_info| + construct_watcher(target_info) } end end From 490a329bc268cdd2449efe150882065db700fccb Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 31 Mar 2021 09:28:55 +0900 Subject: [PATCH 22/38] in_tail: Fix incorrect #join usage Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 1b3505a36a..ef6e9c2d6b 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -738,7 +738,7 @@ def stop end begin Timeout.timeout(1) do - @threads.join + @threads.each{|th| th.join } end rescue Timeout::Error @threads.each {|th| th.kill } From f1807c9b9040f3f514c81aebeefb7f5892fedf3e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 31 Mar 2021 09:29:59 +0900 Subject: [PATCH 23/38] in_tail: Use Fluent::Clock.new to obtain starting time Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index ef6e9c2d6b..3255d22b32 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1002,7 +1002,7 @@ def handle_notify with_io do |io| begin 
number_bytes_read = 0 - start_reading = Fluent::EventTime.now + start_reading = Fluent::Clock.now read_more = false if !io.nil? && @lines.empty? From 7023b3f22a87483ae363e3af8c5b413a73de20d9 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 31 Mar 2021 16:05:30 +0900 Subject: [PATCH 24/38] in_tail: Use Fluent::Clock.now instead of Fluent::EventTime.now again Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 3255d22b32..f2e2074049 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1020,14 +1020,14 @@ def handle_notify if limit_bytes_per_second_reached # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion - time_spent_reading = Fluent::EventTime.now - start_reading + time_spent_reading = Fluent::Clock.now - start_reading @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") if time_spent_reading < 1 needed_sleeping_time = 1 - time_spent_reading @log.debug("sleep: #{needed_sleeping_time}") sleep(needed_sleeping_time) end - start_reading = Fluent::EventTime.now + start_reading = Fluent::Clock.now end break From ae9deef906dd7cfc2ecbe726cfd5192f952ec7f4 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 5 Apr 2021 17:21:27 +0900 Subject: [PATCH 25/38] in_tail: Prevent to call for nil instance Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index f2e2074049..b620605cf9 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -747,8 +747,7 @@ def stop protected def create_thread Thread.start(@queue) do |q| - loop do - task = q.pop + while task = q.pop task.call end end From 8c63cb17beef828336119f7f1ad30abe8875a6df Mon Sep 17 00:00:00 2001 From: 
Hiroshi Hatake Date: Mon, 5 Apr 2021 17:21:54 +0900 Subject: [PATCH 26/38] in_tail: Prevent to reuse `@thread_pool` Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index b620605cf9..df10d37560 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -480,6 +480,7 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch } if @thread_pool @thread_pool.stop + @thread_pool = nil end end From 43e8f4f0e0d12cbc97b136eedbca96b483edf17b Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 5 Apr 2021 17:46:01 +0900 Subject: [PATCH 27/38] in_tail: test: Use safe file removing method Signed-off-by: Hiroshi Hatake --- test/plugin/test_in_tail.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 602b426085..b72cc2bddc 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -352,7 +352,7 @@ def test_emit_with_read_lines_limit(data) sub_test_case "log throttling per file" do teardown do - FileUtils.rm_f("#{TMP_DIR}/tail.txt") + cleanup_file("#{TMP_DIR}/tail.txt") end def count_thread_pool_object From 808e27e428f69af7b6872648d9e606cdc0b2ade0 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 5 Apr 2021 18:01:53 +0900 Subject: [PATCH 28/38] in_tail: Remove needless start_reading variable reset Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index df10d37560..42f4e741e4 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1027,7 +1027,6 @@ def handle_notify @log.debug("sleep: #{needed_sleeping_time}") sleep(needed_sleeping_time) end - start_reading = Fluent::Clock.now end break From 1c88b0caa0f9a94104a38bb652fa4d2985ba8420 Mon Sep 17 00:00:00 2001 From: 
Hiroshi Hatake Date: Tue, 6 Apr 2021 18:10:41 +0900 Subject: [PATCH 29/38] Use #bytesize for calculating read bytes Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 42f4e741e4..85b0065811 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1008,10 +1008,11 @@ def handle_notify if !io.nil? && @lines.empty? begin while true - @fifo << io.readpartial(BYTES_TO_READ, @iobuf) + data = io.readpartial(BYTES_TO_READ, @iobuf) + number_bytes_read += data.bytesize + @fifo << data @fifo.read_lines(@lines) - number_bytes_read += BYTES_TO_READ limit_bytes_per_second_reached = (number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0) @log.debug("reading file: #{@path}") if @lines.size >= @read_lines_limit || limit_bytes_per_second_reached From e3e93459f9571d7b2c31177c9bbd9ea04c160902 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 7 Apr 2021 18:39:00 +0900 Subject: [PATCH 30/38] in_tail: thread_pool: Check running status properly Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 85b0065811..31a927d8aa 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -722,7 +722,7 @@ def initialize(max_size, &session) session.call(self) @running = true ensure - stop + terminate end def run(&task) @@ -730,13 +730,11 @@ def run(&task) @threads << create_thread if @threads.size < @max_size end - def stop - return unless @running - @running = false - + def terminate until @queue.num_waiting == @threads.size sleep 0.01 end + @queue.close begin Timeout.timeout(1) do @threads.each{|th| th.join } @@ -746,6 +744,13 @@ def stop end end + def stop + return unless running? 
+ @running = false + + terminate + end + protected def create_thread Thread.start(@queue) do |q| while task = q.pop @@ -753,6 +758,10 @@ def stop end end end + + protected def running? + !@queue.closed? && @running + end end end From 671c666c3e8fbff7a5ac646f776e59a405b10cd3 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Sat, 10 Apr 2021 23:38:12 +0900 Subject: [PATCH 31/38] in_tail: thread_pool: Detect running or not with only queue is closed This is because checking `@queue` status is enough to detect whether thread pool is alive or not. Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 31a927d8aa..04ef349f5d 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -718,9 +718,7 @@ def initialize(max_size, &session) @max_size = max_size @queue = Queue.new @threads = [] - @running = false session.call(self) - @running = true ensure terminate end @@ -746,7 +744,6 @@ def terminate def stop return unless running? - @running = false terminate end @@ -760,7 +757,7 @@ def stop end protected def running? - !@queue.closed? && @running + !@queue.closed? end end end From e4747eab411fdff730c791a7f16ab192ebe8246e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Sat, 10 Apr 2021 23:39:42 +0900 Subject: [PATCH 32/38] in_tail: test: Reconsider testing clauses and assertion conditions The test fixture should contain 20 lines or more, so we create a file containing 30 lines of text. num_events should specify more accurate values. In the 8192 bytes case, num_events should be 2 to make the circumstances more concrete. To prevent a hard timeout on driver#run, we should specify the `shutdown: false` keyword argument. This is the culprit that generates plugin hard timeout error messages on testing.
Signed-off-by: Hiroshi Hatake --- test/plugin/test_in_tail.rb | 40 ++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index b72cc2bddc..94bedb3cd4 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -363,22 +363,22 @@ def count_thread_pool_object num end - data("flat 8192 bytes, 21 events" => [:flat, 100, 8192, 21], - "flat 8192 bytes, 21 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 21], - "flat #{8192*10} bytes, 32 events" => [:flat, 100, (8192 * 10), 32], - "flat #{8192*10} bytes, 32 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 32], - "parse #{8192*3} bytes, 48 events" => [:parse, 100, (8192 * 3), 48], - "parse #{8192*3} bytes, 48 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 3), 48], - "parse #{8192*10} bytes, 32 events" => [:parse, 100, (8192 * 10), 32], - "parse #{8192*10} bytes, 32 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 32], - "flat 8k bytes with unit, 21 events" => [:flat, 100, "8k", 21], - "flat 8k bytes with unit, 21 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 21], - "flat #{8*10}k bytes with unit, 32 events" => [:flat, 100, "#{8*10}k", 32], - "flat #{8*10}k bytes with unit, 32 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 32], - "parse #{8*3}k bytes with unit, 48 events" => [:parse, 100, "#{8*3}k", 48], - "parse #{8*3}k bytes with unit, 48 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*3}k", 48], - "parse #{8*10}k bytes with unit, 32 events" => [:parse, 100, "#{8*10}k", 32], - "parse #{8*10}k bytes with unit, 32 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 32]) + data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2], + "flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2], + "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 
20], + "flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20], + "parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8], + "parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8], + "parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20], + "parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20], + "flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2], + "flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2], + "flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20], + "flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20], + "parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8], + "parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8], + "parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20], + "parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20]) def test_emit_with_read_bytes_limit_per_second(data) config_style, limit, limit_bytes, num_events = data case config_style @@ -394,9 +394,10 @@ def test_emit_with_read_bytes_limit_per_second(data) d = create_driver(config) msg = 'test' * 2000 # in_tail reads 8192 bytes at once. - d.run(expect_emits: 2) do + # We should not do shutdown here due to hard timeout. + d.run(expect_emits: 2, shutdown: false) do File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - for _x in 0..20 + for _x in 0..30 f.puts msg end } @@ -410,6 +411,9 @@ def test_emit_with_read_bytes_limit_per_second(data) assert_true(events.length <= num_events) assert_equal({"message" => msg}, events[0][2]) assert_equal({"message" => msg}, events[1][2]) + + # Teardown in_tail plugin instance here. 
+ d.instance.shutdown end end From af2ea9bb3af594bf0a1fbb18eaf701c26ebcf291 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 12 Apr 2021 09:31:41 +0900 Subject: [PATCH 33/38] in_tail: Extract as a method for sleep for log ingestion code Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 04ef349f5d..011c947c7e 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1004,6 +1004,17 @@ def opened? private + def sleep_for_log_ingestion(start_reading) + # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion + time_spent_reading = Fluent::Clock.now - start_reading + @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") + if time_spent_reading < 1 + needed_sleeping_time = 1 - time_spent_reading + @log.debug("sleep: #{needed_sleeping_time}") + sleep(needed_sleeping_time) + end + end + def handle_notify with_io do |io| begin @@ -1025,16 +1036,7 @@ def handle_notify # not to use too much memory in case the file is very large read_more = true - if limit_bytes_per_second_reached - # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion - time_spent_reading = Fluent::Clock.now - start_reading - @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") - if time_spent_reading < 1 - needed_sleeping_time = 1 - time_spent_reading - @log.debug("sleep: #{needed_sleeping_time}") - sleep(needed_sleeping_time) - end - end + sleep_for_log_ingestion(start_reading) if limit_bytes_per_second_reached break end From d3a3023fc8e3d6b06fc37c7f432fc91b5694471f Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 11 May 2021 09:35:06 +0900 Subject: [PATCH 34/38] Avoid to use `#sleep` for stop reading files Specify `read_more` as `false` is enough to prevent 
reading beyond bytes limits. Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 011c947c7e..3b9adc3994 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -1004,15 +1004,16 @@ def opened? private - def sleep_for_log_ingestion(start_reading) + def need_to_get_out_for_log_ingestion_loop?(start_reading) # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion time_spent_reading = Fluent::Clock.now - start_reading @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") if time_spent_reading < 1 needed_sleeping_time = 1 - time_spent_reading - @log.debug("sleep: #{needed_sleeping_time}") - sleep(needed_sleeping_time) + @log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again later.") + return true end + false end def handle_notify @@ -1036,7 +1037,7 @@ def handle_notify # not to use too much memory in case the file is very large read_more = true - sleep_for_log_ingestion(start_reading) if limit_bytes_per_second_reached + read_more = false if limit_bytes_per_second_reached && need_to_get_out_for_log_ingestion_loop?(start_reading) break end From 478b487383e1dec1af86cb0def2000b515eb6577 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 11 May 2021 10:24:20 +0900 Subject: [PATCH 35/38] Remove needless thread pool implementation If we don't use `#sleep` to prevent reading bytes beyond limits, there is no need to implement a thread pool mechanism. We just need to get out of the file-reading loop when the bytes limit is reached.
Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 82 +-------------------- test/plugin/in_tail/test_tailthread_pool.rb | 26 ------- test/plugin/test_in_tail.rb | 34 ++------- 3 files changed, 9 insertions(+), 133 deletions(-) delete mode 100644 test/plugin/in_tail/test_tailthread_pool.rb diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 3b9adc3994..9f8ca451f7 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -109,8 +109,6 @@ def initialize config_param :path_timezone, :string, default: nil desc 'Follow inodes instead of following file names. Guarantees more stable delivery and allows to use * in path pattern with rotating files' config_param :follow_inodes, :bool, default: false - desc 'Specify max size of thread pool' - config_param :max_thread_pool_size, :integer, default: 1 config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do config_argument :usage, :string, default: 'in_tail_parser' @@ -182,13 +180,7 @@ def configure(conf) # parser is already created by parser helper @parser = parser_create(usage: parser_config['usage'] || @parser_configs.first.usage) @capability = Fluent::Capability.new(:current_process) - # Need to create thread pool because #sleep should pause entire thread on enabling log throttling feature. - @thread_pool = nil - @enable_read_bytes_limit_thread_pool = false if @read_bytes_limit_per_second > 0 - if @max_thread_pool_size < 0 - raise Fluent::ConfigError, "Specify positive number" - end if !@enable_watch_timer raise Fluent::ConfigError, "Need to enable watch timer when using log throttling feature" end @@ -196,10 +188,6 @@ def configure(conf) log.warn "Should specify greater equal than 8192. 
Use 8192 for read_bytes_limit_per_second" @read_bytes_limit_per_second = 8192 end - if @max_thread_pool_size < 2 - raise Fluent::ConfigError, "Specify 2 or more on max_thread_pool_size" - end - @enable_read_bytes_limit_thread_pool = true end end @@ -447,19 +435,9 @@ def construct_watcher(target_info) end def start_watchers(targets_info) - if @enable_read_bytes_limit_thread_pool - @thread_pool = TailThread::Pool.new(@max_thread_pool_size) do |pool| - targets_info.each_value {|target_info| - pool.run { - construct_watcher(target_info) - } - } - end - else - targets_info.each_value {|target_info| - construct_watcher(target_info) - } - end + targets_info.each_value {|target_info| + construct_watcher(target_info) + } end def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true) @@ -478,10 +456,6 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch end end } - if @thread_pool - @thread_pool.stop - @thread_pool = nil - end end def close_watcher_handles @@ -712,56 +686,6 @@ def on_timer end end - class TailThread - class Pool - def initialize(max_size, &session) - @max_size = max_size - @queue = Queue.new - @threads = [] - session.call(self) - ensure - terminate - end - - def run(&task) - @queue.push(task) - @threads << create_thread if @threads.size < @max_size - end - - def terminate - until @queue.num_waiting == @threads.size - sleep 0.01 - end - @queue.close - begin - Timeout.timeout(1) do - @threads.each{|th| th.join } - end - rescue Timeout::Error - @threads.each {|th| th.kill } - end - end - - def stop - return unless running? - - terminate - end - - protected def create_thread - Thread.start(@queue) do |q| - while task = q.pop - task.call - end - end - end - - protected def running? - !@queue.closed? 
- end - end - end - class TailWatcher def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build) @path = target_info.path diff --git a/test/plugin/in_tail/test_tailthread_pool.rb b/test/plugin/in_tail/test_tailthread_pool.rb deleted file mode 100644 index 412b9de445..0000000000 --- a/test/plugin/in_tail/test_tailthread_pool.rb +++ /dev/null @@ -1,26 +0,0 @@ -require_relative '../../helper' -require 'fluent/plugin/in_tail' - -class TailThreadPoolTest < Test::Unit::TestCase - data("single thread" => 1, - "max thread pool size (10)" => 10, - "max thread pool size (20)" => 20, - ) - test "thread pool creation" do |data| - max_threads_pool_size = data - thread_pool = Fluent::Plugin::TailInput::TailThread::Pool.new(max_threads_pool_size) do |pool| - 100.times {|n| - pool.run { - nop_task(0.01) - assert do - pool.instance_variable_get(:@threads).size <= max_threads_pool_size - end - } - } - end - end - - def nop_task(sleep_time) - sleep sleep_time - end -end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 94bedb3cd4..a651144bd3 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -8,7 +8,6 @@ require 'timecop' require 'tmpdir' require 'securerandom' -require 'etc' class TailInputTest < Test::Unit::TestCase include FlexMock::TestCase @@ -103,7 +102,6 @@ def create_target_info(path) CONFIG_ENABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false }) CONFIG_DISABLE_STAT_WATCHER = config_element("", "", { "enable_stat_watcher" => false }) CONFIG_OPEN_ON_EVERY_UPDATE = config_element("", "", { "open_on_every_update" => true }) - CONFIG_MAX_THREAD_POOL_SIZE = config_element("", "", { "max_thread_pool_size" => (Etc.nprocessors / 1.5).ceil }) COMMON_FOLLOW_INODE_CONFIG = config_element("ROOT", "", { "path" => "#{TMP_DIR}/tail.txt*", "pos_file" => "#{TMP_DIR}/tail.pos", @@ -159,7 +157,6 @@ def create_driver(conf = SINGLE_LINE_CONFIG, 
use_common_conf = true) assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file assert_equal 1000, d.instance.read_lines_limit assert_equal -1, d.instance.read_bytes_limit_per_second - assert_equal 1, d.instance.max_thread_pool_size assert_equal false, d.instance.ignore_repeated_permission_error assert_nothing_raised do d.instance.have_read_capability? @@ -201,21 +198,14 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) sub_test_case "log throttling per file" do test "w/o watcher timer is invalid" do - conf = CONFIG_ENABLE_WATCH_TIMER + CONFIG_MAX_THREAD_POOL_SIZE + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"}) - assert_raise(Fluent::ConfigError) do - create_driver(conf) - end - end - - test "w/o 2 or more thread pool size is invalid" do - conf = config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"}) + conf = CONFIG_ENABLE_WATCH_TIMER + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"}) assert_raise(Fluent::ConfigError) do create_driver(conf) end end test "valid" do - conf = CONFIG_MAX_THREAD_POOL_SIZE + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"}) + conf = config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"}) assert_raise(Fluent::ConfigError) do create_driver(conf) end @@ -355,14 +345,6 @@ def test_emit_with_read_lines_limit(data) cleanup_file("#{TMP_DIR}/tail.txt") end - def count_thread_pool_object - num = 0 - ObjectSpace.each_object(Fluent::Plugin::TailInput::TailThread::Pool) { |obj| - num += 1 - } - num - end - data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2], "flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2], "flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20], @@ -383,13 +365,13 @@ def test_emit_with_read_bytes_limit_per_second(data) config_style, limit, limit_bytes, num_events = data case config_style when :flat - config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", 
{ "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + CONFIG_MAX_THREAD_POOL_SIZE + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) when :parse - config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + CONFIG_MAX_THREAD_POOL_SIZE + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG when :flat_without_stat - config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + CONFIG_MAX_THREAD_POOL_SIZE + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) when :parse_without_stat - config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG + CONFIG_MAX_THREAD_POOL_SIZE + config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG end d = create_driver(config) msg = 'test' * 2000 # in_tail reads 8192 bytes at once. 
@@ -403,10 +385,6 @@ def test_emit_with_read_bytes_limit_per_second(data) } end - assert do - count_thread_pool_object >= 1 - end - events = d.events assert_true(events.length <= num_events) assert_equal({"message" => msg}, events[0][2]) From 147ffbfad2c1a24dce219528b99c5d933a98aa01 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 11 May 2021 12:37:04 +0900 Subject: [PATCH 36/38] Split complicated clause into read lines limits and log throttling limits Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 9f8ca451f7..9b1f4b54a9 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -928,16 +928,21 @@ def opened? private - def need_to_get_out_for_log_ingestion_loop?(start_reading) - # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion - time_spent_reading = Fluent::Clock.now - start_reading - @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") - if time_spent_reading < 1 - needed_sleeping_time = 1 - time_spent_reading - @log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again later.") - return true + def limit_bytes_per_second_reached?(start_reading, number_bytes_read) + if number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0 + # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion + time_spent_reading = Fluent::Clock.now - start_reading + @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") + if time_spent_reading < 1 + needed_sleeping_time = 1 - time_spent_reading + @log.debug("log ingestion for `#{@path}' is suspended temporary. 
Read it again later.") + return true + end + + false + else + false end - false end def handle_notify @@ -955,14 +960,15 @@ def handle_notify @fifo << data @fifo.read_lines(@lines) - limit_bytes_per_second_reached = (number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0) @log.debug("reading file: #{@path}") - if @lines.size >= @read_lines_limit || limit_bytes_per_second_reached + if @lines.size >= @read_lines_limit # not to use too much memory in case the file is very large read_more = true - - read_more = false if limit_bytes_per_second_reached && need_to_get_out_for_log_ingestion_loop?(start_reading) - + break + end + if limit_bytes_per_second_reached?(start_reading, number_bytes_read) + # Just get out from tailing loop. + read_more = false break end end From 6be35337982e3887ef4e72d8976c46c5c6ccda35 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 11 May 2021 13:43:02 +0900 Subject: [PATCH 37/38] Use early return on the log throttling first clause Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/in_tail.rb | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 9b1f4b54a9..753115a0f0 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -928,21 +928,23 @@ def opened? private - def limit_bytes_per_second_reached?(start_reading, number_bytes_read) - if number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0 - # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion - time_spent_reading = Fluent::Clock.now - start_reading - @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") - if time_spent_reading < 1 - needed_sleeping_time = 1 - time_spent_reading - @log.debug("log ingestion for `#{@path}' is suspended temporary. 
Read it again later.") - return true - end + def read_bytes_limits_reached?(number_bytes_read) + number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0 + end - false - else - false + def limit_bytes_per_second_reached?(start_reading, number_bytes_read) + return false unless read_bytes_limits_reached?(number_bytes_read) + + # sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion + time_spent_reading = Fluent::Clock.now - start_reading + @log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}") + if time_spent_reading < 1 + needed_sleeping_time = 1 - time_spent_reading + @log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again later.") + return true end + + false end def handle_notify From 98616e28001251875c49ffd6b210a80baca0342c Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 11 May 2021 14:42:17 +0900 Subject: [PATCH 38/38] in_tail: Replace a magic number with a constant Should use TailWatcher::IOHandler::BYTES_TO_READ as the minimum value of read_bytes_limit_per_second. Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 753115a0f0..a07902a715 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -184,9 +184,10 @@ def configure(conf) if !@enable_watch_timer raise Fluent::ConfigError, "Need to enable watch timer when using log throttling feature" end - if @read_bytes_limit_per_second < 8192 - log.warn "Should specify greater equal than 8192. Use 8192 for read_bytes_limit_per_second" - @read_bytes_limit_per_second = 8192 + min_bytes = TailWatcher::IOHandler::BYTES_TO_READ + if @read_bytes_limit_per_second < min_bytes + log.warn "Should specify greater equal than #{min_bytes}.
Use #{min_bytes} for read_bytes_limit_per_second" + @read_bytes_limit_per_second = min_bytes end end end