Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Implement file metrics #3504

Merged
merged 3 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 28 additions & 3 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class TailInput < Fluent::Plugin::Input
helpers :timer, :event_loop, :parser, :compat_parameters

RESERVED_CHARS = ['/', '*', '%'].freeze
MetricsInfo = Struct.new(:opened, :closed, :rotated)

class WatcherSetupError < StandardError
def initialize(msg)
Expand All @@ -57,6 +58,7 @@ def initialize
@pf = nil
@ignore_list = []
@shutdown_start_time = nil
@metrics = nil
end

desc 'The paths to read. Multiple paths can be specified, separated by comma.'
Expand Down Expand Up @@ -191,6 +193,10 @@ def configure(conf)
@read_bytes_limit_per_second = min_bytes
end
end
opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files")
closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files")
rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files")
@metrics = MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end

def configure_tag
Expand Down Expand Up @@ -375,7 +381,7 @@ def refresh_watchers

def setup_watcher(target_info, pe)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))
tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @metrics)

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
Expand Down Expand Up @@ -670,6 +676,19 @@ def parse_multilines(lines, tail_watcher)
es
end

def statistics
stats = super

stats = {
'input' => stats["input"].merge({
'opened_file_count' => @metrics.opened.get,
'closed_file_count' => @metrics.closed.get,
'rotated_file_count' => @metrics.rotated.get,
})
}
stats
end

private

def io_handler(watcher, path)
Expand All @@ -682,6 +701,7 @@ def io_handler(watcher, path)
open_on_every_update: @open_on_every_update,
from_encoding: @from_encoding,
encoding: @encoding,
metrics: @metrics,
&method(:receive_lines)
)
end
Expand Down Expand Up @@ -717,7 +737,7 @@ def on_timer
end

class TailWatcher
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
@path = target_info.path
@ino = target_info.ino
@pe = pe || MemoryPositionEntry.new
Expand All @@ -729,6 +749,7 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch
@line_buffer_timer_flusher = line_buffer_timer_flusher
@io_handler = nil
@io_handler_build = io_handler_build
@metrics = metrics
@watchers = []
end

Expand Down Expand Up @@ -855,6 +876,7 @@ def on_rotate(stat)
@log.info "detected rotation of #{@path}"
@io_handler = io_handler
end
@metrics.rotated.inc
end
end

Expand Down Expand Up @@ -934,7 +956,7 @@ class IOHandler

attr_accessor :shutdown_timeout

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
Expand All @@ -953,6 +975,7 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@shutdown_timeout = SHUTDOWN_TIMEOUT
@shutdown_mutex = Mutex.new
@eof = false
@metrics = metrics

@log.info "following tail of #{@path}"
end
Expand All @@ -972,6 +995,7 @@ def close
if @io && !@io.closed?
@io.close
@io = nil
@metrics.closed.inc
end
end

Expand Down Expand Up @@ -1059,6 +1083,7 @@ def handle_notify
def open
io = Fluent::FileWrapper.open(@path)
io.seek(@watcher.pe.read_pos + @fifo.bytesize)
@metrics.opened.inc
io
rescue RangeError
io.close if io
Expand Down
16 changes: 12 additions & 4 deletions test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
require_relative '../../helper'

require 'fluent/plugin/in_tail'
require 'fluent/plugin/metrics_local'
require 'tempfile'

class IntailIOHandlerTest < Test::Unit::TestCase
setup do
@file = Tempfile.new('intail_io_handler').binmode
opened_file_metrics = Fluent::Plugin::LocalMetrics.new
opened_file_metrics.configure(config_element('metrics', '', {}))
closed_file_metrics = Fluent::Plugin::LocalMetrics.new
closed_file_metrics.configure(config_element('metrics', '', {}))
rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
rotated_file_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end

teardown do
Expand All @@ -30,7 +38,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.join
true
end
Expand Down Expand Up @@ -62,7 +70,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.join
true
end
Expand Down Expand Up @@ -93,7 +101,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end
Expand All @@ -119,7 +127,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end
Expand Down
22 changes: 19 additions & 3 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1524,11 +1524,18 @@ def test_z_refresh_watchers
plugin.instance_eval do
@pf = Fluent::Plugin::TailInput::PositionFile.load(sio, EX_FOLLOW_INODES, {}, logger: $log)
@loop = Coolio::Loop.new
opened_file_metrics = Fluent::Plugin::LocalMetrics.new
opened_file_metrics.configure(config_element('metrics', '', {}))
closed_file_metrics = Fluent::Plugin::LocalMetrics.new
closed_file_metrics.configure(config_element('metrics', '', {}))
rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
rotated_file_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end

Timecop.freeze(2010, 1, 2, 3, 4, 5) do
ex_paths.each do |target_info|
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, false, anything, nil, anything).once
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, false, anything, nil, anything, anything).once
end

plugin.refresh_watchers
Expand All @@ -1542,7 +1549,7 @@ def test_z_refresh_watchers
path = "test/plugin/data/2010/01/20100102-030406.log"
inode = Fluent::FileWrapper.stat(path).ino
target_info = Fluent::Plugin::TailInput::TargetInfo.new(path, inode)
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, false, anything, nil, anything).once
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, false, anything, nil, anything, anything).once
plugin.refresh_watchers

flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass|
Expand Down Expand Up @@ -1885,13 +1892,22 @@ def test_should_close_watcher_after_rotate_wait
config = COMMON_FOLLOW_INODE_CONFIG + config_element('', '', {"rotate_wait" => "1s", "limit_recently_modified" => "1s"})

d = create_driver(config, false)
d.instance.instance_eval do
opened_file_metrics = Fluent::Plugin::LocalMetrics.new
opened_file_metrics.configure(config_element('metrics', '', {}))
closed_file_metrics = Fluent::Plugin::LocalMetrics.new
closed_file_metrics.configure(config_element('metrics', '', {}))
rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
rotated_file_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end

File.open("#{TMP_DIR}/tail.txt", "wb") {|f|
f.puts "test1"
f.puts "test2"
}
target_info = create_target_info("#{TMP_DIR}/tail.txt")
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything).once
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything, anything).once
d.run(shutdown: false)
assert d.instance.instance_variable_get(:@tails)[target_info]

Expand Down