Skip to content

Commit

Permalink
in_tail: Handle log throttling per file feature
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
Co-authored-by: Anthony Comtois <anthony.comtois@sky.uk>
  • Loading branch information
cosmo0920 and rewiko committed Dec 1, 2020
1 parent 9110685 commit 7d11b46
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 7 deletions.
30 changes: 27 additions & 3 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def initialize
config_param :refresh_interval, :time, default: 60
desc 'The number of reading lines at each IO.'
config_param :read_lines_limit, :integer, default: 1000
desc 'The number of reading bytes per second'
config_param :read_bytes_limit_per_second, :integer, default: -1
desc 'The interval of flushing the buffer for multiline format'
config_param :multiline_flush_interval, :time, default: nil
desc 'Enable the option to emit unmatched lines.'
Expand Down Expand Up @@ -570,6 +572,7 @@ def io_handler(watcher, path)
path: path,
log: log,
read_lines_limit: @read_lines_limit,
read_bytes_limit_per_second: @read_bytes_limit_per_second,
open_on_every_update: @open_on_every_update,
from_encoding: @from_encoding,
encoding: @encoding,
Expand Down Expand Up @@ -795,10 +798,11 @@ def bytesize
end

class IOHandler
def initialize(watcher, path:, read_lines_limit:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
@read_bytes_limit_per_second = read_bytes_limit_per_second
@receive_lines = receive_lines
@open_on_every_update = open_on_every_update
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT)
Expand Down Expand Up @@ -831,16 +835,36 @@ def opened?
def handle_notify
with_io do |io|
begin
bytes_to_read = 8192
number_bytes_read = 0
start_reading = Fluent::EventTime.now
read_more = false

if !io.nil? && @lines.empty?
begin
while true
@fifo << io.readpartial(8192, @iobuf)
@fifo << io.readpartial(bytes_to_read, @iobuf)
@fifo.read_lines(@lines)
if @lines.size >= @read_lines_limit

number_bytes_read += bytes_to_read
limit_bytes_per_second_reached = (number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0)
@log.debug("reading file: #{@path}")
if @lines.size >= @read_lines_limit || limit_bytes_per_second_reached
# not to use too much memory in case the file is very large
read_more = true

if limit_bytes_per_second_reached
# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Time.now - start_reading
@log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")
if time_spent_reading < 1
debug_time = 1 - time_spent_reading
@log.debug("sleep: #{debug_time}")
sleep(1 - time_spent_reading)
end
start_reading = Fluent::EventTime.now
end

break
end
end
Expand Down
8 changes: 4 additions & 4 deletions test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
returned_lines << lines.join
true
end
Expand Down Expand Up @@ -62,7 +62,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, log: $log, open_on_every_update: true) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true) do |lines, _watcher|
returned_lines << lines.join
true
end
Expand Down Expand Up @@ -93,7 +93,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
returned_lines << lines.dup
true
end
Expand All @@ -119,7 +119,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
returned_lines << lines.dup
true
end
Expand Down
36 changes: 36 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
assert_equal 2, d.instance.rotate_wait
assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file
assert_equal 1000, d.instance.read_lines_limit
assert_equal -1, d.instance.read_bytes_limit_per_second
assert_equal false, d.instance.ignore_repeated_permission_error
assert_nothing_raised do
d.instance.have_read_capability?
Expand Down Expand Up @@ -246,6 +247,41 @@ def test_emit_with_read_lines_limit(data)
assert num_events <= d.emit_count
end

sub_test_case "log throttoling per file" do
teardown do
FileUtils.rm_f("#{TMP_DIR}/tail.txt")
end

data("flat 8192 bytes, 10 events" => [:flat, 100, 8192, 10],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"parse #{8192*3} bytes, 10 events" => [:parse, 100, (8192 * 3), 10],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

d.run(expect_emits: 2) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for _x in 0..20
f.puts msg
end
}
end

events = d.events
assert_equal(true, events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
assert_equal({"message" => msg}, events[1][2])
end
end

data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG,
parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG)
def test_emit_with_read_from_head(data)
Expand Down

0 comments on commit 7d11b46

Please sign in to comment.