Add log throttling per file (revised) #3185

Merged: 38 commits, May 11, 2021
Changes from 36 commits

Commits (38)
d6755ed
in_tail: Handle log throttling per file feature
cosmo0920 Nov 30, 2020
a5d3a5a
in_tail: Use size config type for read_bytes_limit_per_second
cosmo0920 Dec 1, 2020
d3d7119
in_tail: Use actual read bytes to calculate read bytes number
cosmo0920 Dec 21, 2020
30a63ca
in_tail: Use more meaningful variable name
cosmo0920 Dec 21, 2020
61554dc
in_tail: Use fixed value instead of variable
cosmo0920 Dec 21, 2020
a54f25b
in_tail: test: Add log throttling testcases for w/o stat watcher situ…
cosmo0920 Dec 22, 2020
2b2260e
in_tail: Guard for read empty lines
cosmo0920 Dec 23, 2020
8ebc227
Use Thread Pool on tailing to prevent Kernel#sleep for entire sleep
cosmo0920 Dec 23, 2020
6e8a13c
in_tail: Use Fluent::EventTime instead of Time
cosmo0920 Dec 24, 2020
5b19e40
in_tail: Check invalid max_thread_pool_size on #configure
cosmo0920 Dec 24, 2020
854cca8
in_tail: Set default value of max_thread_pool_size as 1
cosmo0920 Dec 24, 2020
f76f45a
in_tail: Use 2 or more thread pool size on log throttling testcases
cosmo0920 Dec 24, 2020
4b2526b
in_tail: Add warning for specifying less equal than 8192 on read_byte…
cosmo0920 Dec 24, 2020
f88c4ae
in_tail: Check thread pool objects existences
cosmo0920 Dec 24, 2020
61ae7ca
in_tail: Use more strict parameter checking for log throttling feature
cosmo0920 Dec 25, 2020
de6bef3
in_tail: thread_pool: Add tailing thread poll testcases
cosmo0920 Dec 25, 2020
48d9be3
Use 8192 bytes on number_bytes_read calculation
cosmo0920 Jan 6, 2021
4d9a755
Fix a typo
cosmo0920 Jan 7, 2021
a4459ae
Use constant instead of interger values
cosmo0920 Jan 7, 2021
4da4ae3
in_tail: Fix unstable testcases
cosmo0920 Mar 4, 2021
f29309b
in_tail: Use thread pool only when log throttling feature is enabled
cosmo0920 Mar 16, 2021
490a329
in_tail: Fix incorrect #join usage
cosmo0920 Mar 31, 2021
f1807c9
in_tail: Use Fluent::Clock.new to obtain starting time
cosmo0920 Mar 31, 2021
7023b3f
in_tail: Use Fluent::Clock.now instead of Fluent::EventTime.now again
cosmo0920 Mar 31, 2021
ae9deef
in_tail: Prevent to call for nil instance
cosmo0920 Apr 5, 2021
8c63cb1
in_tail: Prevent to reuse `@thread_pool`
cosmo0920 Apr 5, 2021
43e8f4f
in_tail: test: Use safe file removing method
cosmo0920 Apr 5, 2021
808e27e
in_tail: Remove needless start_reading variable reset
cosmo0920 Apr 5, 2021
1c88b0c
Use #bytesize for calculating read bytes
cosmo0920 Apr 6, 2021
e3e9345
in_tail: thread_pool: Check running status properly
cosmo0920 Apr 7, 2021
671c666
in_tail: thread_pool: Detect running or not with only queue is closed
cosmo0920 Apr 10, 2021
e4747ea
in_tail: test: Reconsider testing caluses and assertion conditions
cosmo0920 Apr 10, 2021
af2ea9b
in_tail: Extract as a method for sleep for log ingestion code
cosmo0920 Apr 12, 2021
d3a3023
Avoid to use `#sleep` for stop reading files
cosmo0920 May 11, 2021
478b487
Remove needless thread poll implementation
cosmo0920 May 11, 2021
147ffbf
Split complicated clause into read lines limits and log throttling
cosmo0920 May 11, 2021
6be3533
Use early return on the log throttling first clause
cosmo0920 May 11, 2021
98616e2
in_tail: Replace a magic number with a constant
ashie May 11, 2021
103 changes: 75 additions & 28 deletions lib/fluent/plugin/in_tail.rb
@@ -81,6 +81,8 @@ def initialize
config_param :refresh_interval, :time, default: 60
desc 'The number of reading lines at each IO.'
config_param :read_lines_limit, :integer, default: 1000
desc 'The number of bytes to read per second (-1 means no limit)'
config_param :read_bytes_limit_per_second, :size, default: -1
desc 'The interval of flushing the buffer for multiline format'
config_param :multiline_flush_interval, :time, default: nil
desc 'Enable the option to emit unmatched lines.'
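For context, the new parameter would be set in a tail source block like the following (a hypothetical fragment; path, pos_file, and tag are placeholders):

```
<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/fluentd/app.pos
  tag app.logs
  read_lines_limit 1000
  # New in this PR: cap bytes read from each file per second.
  # Size type, so suffixes like "8k" work; -1 (the default) disables throttling.
  read_bytes_limit_per_second 8192
  <parse>
    @type none
  </parse>
</source>
```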
@@ -178,6 +180,15 @@ def configure(conf)
# parser is already created by parser helper
@parser = parser_create(usage: parser_config['usage'] || @parser_configs.first.usage)
@capability = Fluent::Capability.new(:current_process)
if @read_bytes_limit_per_second > 0
if !@enable_watch_timer
raise Fluent::ConfigError, "Need to enable watch timer when using log throttling feature"
end
if @read_bytes_limit_per_second < 8192
log.warn "read_bytes_limit_per_second should be greater than or equal to 8192. Using 8192 instead."
@read_bytes_limit_per_second = 8192
end
end
end
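The validation above can be exercised in isolation. Below is a minimal sketch of the same rules; `validate_read_bytes_limit` is a hypothetical helper name and a local `ConfigError` stands in for `Fluent::ConfigError` so the snippet runs outside fluentd:

```ruby
# Sketch of the #configure checks for read_bytes_limit_per_second.
class ConfigError < StandardError; end

MIN_READ_BYTES_LIMIT = 8192  # one readpartial chunk

def validate_read_bytes_limit(limit, watch_timer_enabled)
  return limit if limit <= 0  # -1 (the default) disables throttling

  unless watch_timer_enabled
    raise ConfigError, "Need to enable watch timer when using log throttling feature"
  end

  if limit < MIN_READ_BYTES_LIMIT
    # Budgets below one read chunk cannot be honored: clamp, as the patch does.
    return MIN_READ_BYTES_LIMIT
  end

  limit
end

validate_read_bytes_limit(-1, false)   # disabled, no further checks
validate_read_bytes_limit(4096, true)  # clamped up to 8192
```

The clamp-instead-of-fail choice matters because a budget smaller than one `readpartial` chunk could never be enforced precisely.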

def configure_tag
@@ -392,36 +403,40 @@ def setup_watcher(target_info, pe)
raise e
end

def start_watchers(targets_info)
targets_info.each_value { |target_info|
pe = nil
if @pf
pe = @pf[target_info]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0)
rescue Errno::ENOENT
$log.warn "#{target_info.path} not found. Continuing without tailing it."
end
def construct_watcher(target_info)
pe = nil
if @pf
pe = @pf[target_info]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0)
rescue Errno::ENOENT
$log.warn "#{target_info.path} not found. Continuing without tailing it."
end
end
end

begin
tw = setup_watcher(target_info, pe)
rescue WatcherSetupError => e
log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
next
end
begin
tw = setup_watcher(target_info, pe)
rescue WatcherSetupError => e
log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
return
end

begin
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails[target_info] = tw
rescue Errno::ENOENT
$log.warn "stat() for #{target_info.path} failed with ENOENT. Drop tail watcher for now."
# explicitly detach and unwatch watcher `tw`.
tw.unwatched = true
detach_watcher(tw, target_info.ino, false)
end
begin
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails[target_info] = tw
rescue Errno::ENOENT
$log.warn "stat() for #{target_info.path} failed with ENOENT. Drop tail watcher for now."
# explicitly detach and unwatch watcher `tw`.
tw.unwatched = true
detach_watcher(tw, target_info.ino, false)
end
end

def start_watchers(targets_info)
targets_info.each_value {|target_info|
construct_watcher(target_info)
}
end

@@ -633,6 +648,7 @@ def io_handler(watcher, path)
path: path,
log: log,
read_lines_limit: @read_lines_limit,
read_bytes_limit_per_second: @read_bytes_limit_per_second,
open_on_every_update: @open_on_every_update,
from_encoding: @from_encoding,
encoding: @encoding,
@@ -876,10 +892,13 @@ def bytesize
end

class IOHandler
def initialize(watcher, path:, read_lines_limit:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
BYTES_TO_READ = 8192

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
@read_bytes_limit_per_second = read_bytes_limit_per_second
@receive_lines = receive_lines
@open_on_every_update = open_on_every_update
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT)
@@ -909,21 +928,49 @@ def opened?

private

def limit_bytes_per_second_reached?(start_reading, number_bytes_read)
if number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0
# stop reading more bytes when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Fluent::Clock.now - start_reading
@log.debug("time_spent_reading: #{time_spent_reading} #{@watcher.path}")
if time_spent_reading < 1
@log.debug("log ingestion for `#{@path}' is suspended temporarily. Read it again later.")
return true
end

false
else
false
end
end
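The decision above boils down to "byte budget spent in under a second, so pause until the next notify". A standalone sketch of that check, with `Process.clock_gettime` standing in for `Fluent::Clock` so it runs outside fluentd:

```ruby
# Returns true when reading should pause until the next timer tick:
# the per-second byte budget was spent in less than one second.
def limit_bytes_per_second_reached?(start_reading, bytes_read, limit)
  return false if limit <= 0           # throttling disabled (-1 default)
  return false if bytes_read < limit   # budget not yet spent

  time_spent_reading =
    Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_reading
  time_spent_reading < 1
end

start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
limit_bytes_per_second_reached?(start, 10_000, 8192)  # budget blown instantly
limit_bytes_per_second_reached?(start, 10_000, -1)    # feature off
```

Returning `true` here (rather than sleeping) is the point of the later commits: the watch timer will call back again, so the event loop is never blocked.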

def handle_notify
with_io do |io|
begin
number_bytes_read = 0
start_reading = Fluent::Clock.now
read_more = false

if !io.nil? && @lines.empty?
begin
while true
@fifo << io.readpartial(8192, @iobuf)
data = io.readpartial(BYTES_TO_READ, @iobuf)
number_bytes_read += data.bytesize
@fifo << data
@fifo.read_lines(@lines)

@log.debug("reading file: #{@path}")
if @lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
break
end
if limit_bytes_per_second_reached?(start_reading, number_bytes_read)
# Just get out from tailing loop.
read_more = false
break
end
end
rescue EOFError
end
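The chunked read-and-tally pattern in `handle_notify` can be sketched against a `StringIO`. `read_with_budget` is a hypothetical helper name; the real loop also feeds a FIFO, splits lines, and honors `read_lines_limit`:

```ruby
require 'stringio'

BYTES_TO_READ = 8192

# Read io in fixed-size chunks, stopping once a byte budget is spent.
# Returns the data read and the byte count for this pass.
def read_with_budget(io, budget)
  bytes_read = 0
  data = +''
  buf = String.new
  begin
    loop do
      chunk = io.readpartial(BYTES_TO_READ, buf)
      bytes_read += chunk.bytesize
      data << chunk
      break if budget > 0 && bytes_read >= budget
    end
  rescue EOFError
    # nothing left to read this pass
  end
  [data, bytes_read]
end

io = StringIO.new('x' * 20_000)
data, n = read_with_budget(io, 8192)  # stops after one full chunk
```

Counting `chunk.bytesize` rather than line lengths is what makes the accounting match the actual I/O, which is why the patch switched to `#bytesize` in commit 1c88b0c.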
8 changes: 4 additions & 4 deletions test/plugin/in_tail/test_io_handler.rb
@@ -30,7 +30,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
returned_lines << lines.join
true
end
@@ -62,7 +62,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, log: $log, open_on_every_update: true) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true) do |lines, _watcher|
returned_lines << lines.join
true
end
@@ -93,7 +93,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
returned_lines << lines.dup
true
end
@@ -119,7 +119,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
returned_lines << lines.dup
true
end
72 changes: 72 additions & 0 deletions test/plugin/test_in_tail.rb
@@ -156,6 +156,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
assert_equal 2, d.instance.rotate_wait
assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file
assert_equal 1000, d.instance.read_lines_limit
assert_equal(-1, d.instance.read_bytes_limit_per_second)
assert_equal false, d.instance.ignore_repeated_permission_error
assert_nothing_raised do
d.instance.have_read_capability?
@@ -195,6 +196,22 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
end
end

sub_test_case "log throttling per file" do
test "w/o watcher timer is invalid" do
conf = CONFIG_ENABLE_WATCH_TIMER + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
assert_raise(Fluent::ConfigError) do
create_driver(conf)
end
end

test "valid" do
conf = config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
assert_nothing_raised do
create_driver(conf)
end
end
end

test "both enable_watch_timer and enable_stat_watcher are false" do
assert_raise(Fluent::ConfigError) do
create_driver(CONFIG_ENABLE_WATCH_TIMER + CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG)
@@ -323,6 +340,61 @@ def test_emit_with_read_lines_limit(data)
assert num_events <= d.emit_count
end

sub_test_case "log throttling per file" do
teardown do
cleanup_file("#{TMP_DIR}/tail.txt")
end

data("flat 8192 bytes, 2 events" => [:flat, 100, 8192, 2],
"flat 8192 bytes, 2 events w/o stat watcher" => [:flat_without_stat, 100, 8192, 2],
"flat #{8192*10} bytes, 20 events" => [:flat, 100, (8192 * 10), 20],
"flat #{8192*10} bytes, 20 events w/o stat watcher" => [:flat_without_stat, 100, (8192 * 10), 20],
"parse #{8192*4} bytes, 8 events" => [:parse, 100, (8192 * 4), 8],
"parse #{8192*4} bytes, 8 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 4), 8],
"parse #{8192*10} bytes, 20 events" => [:parse, 100, (8192 * 10), 20],
"parse #{8192*10} bytes, 20 events w/o stat watcher" => [:parse_without_stat, 100, (8192 * 10), 20],
"flat 8k bytes with unit, 2 events" => [:flat, 100, "8k", 2],
"flat 8k bytes with unit, 2 events w/o stat watcher" => [:flat_without_stat, 100, "8k", 2],
"flat #{8*10}k bytes with unit, 20 events" => [:flat, 100, "#{8*10}k", 20],
"flat #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:flat_without_stat, 100, "#{8*10}k", 20],
"parse #{8*4}k bytes with unit, 8 events" => [:parse, 100, "#{8*4}k", 8],
"parse #{8*4}k bytes with unit, 8 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*4}k", 8],
"parse #{8*10}k bytes with unit, 20 events" => [:parse, 100, "#{8*10}k", 20],
"parse #{8*10}k bytes with unit, 20 events w/o stat watcher" => [:parse_without_stat, 100, "#{8*10}k", 20])
def test_emit_with_read_bytes_limit_per_second(data)
config_style, limit, limit_bytes, num_events = data
case config_style
when :flat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse
config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
when :flat_without_stat
config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes })
when :parse_without_stat
config = CONFIG_READ_FROM_HEAD + CONFIG_DISABLE_STAT_WATCHER + config_element("", "", { "read_lines_limit" => limit, "read_bytes_limit_per_second" => limit_bytes }) + PARSE_SINGLE_LINE_CONFIG
end
d = create_driver(config)
msg = 'test' * 2000 # in_tail reads 8192 bytes at once.

# We should not do shutdown here due to hard timeout.
d.run(expect_emits: 2, shutdown: false) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
for _x in 0..30
f.puts msg
end
}
end

events = d.events
assert_true(events.length <= num_events)
assert_equal({"message" => msg}, events[0][2])
assert_equal({"message" => msg}, events[1][2])

# Teardown in_tail plugin instance here.
d.instance.shutdown
end
end
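The expected event counts in the data table above appear to track a simple ratio: each test line is roughly one 8192-byte chunk (`'test' * 2000` is 8000 bytes plus a newline), so a budget of N chunks per second admits about N lines per cycle, and the driver waits for two emit cycles. A quick self-consistency check of that inferred relation (my reading of the table, not something the test asserts):

```ruby
# Each row pairs a per-second byte budget with the ceiling on emitted events:
# two emit cycles, one ~8192-byte line per 8192-byte chunk of budget.
[[8192, 2], [8192 * 4, 8], [8192 * 10, 20]].each do |limit_bytes, num_events|
  chunks_per_second = limit_bytes / 8192
  raise "table row mismatch" unless chunks_per_second * 2 == num_events
end
```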

data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG,
parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG)
def test_emit_with_read_from_head(data)