Skip to content

Commit

Permalink
in_tail: Implement file metrics
Browse files Browse the repository at this point in the history
* opened file count
* closed file count
* rotated file count

These metrics should be collected same as fluent-bit's in_tail.

Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 committed Sep 14, 2021
1 parent 56ed522 commit 46cf60c
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 10 deletions.
35 changes: 32 additions & 3 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def initialize
@pf = nil
@ignore_list = []
@shutdown_start_time = nil
@total_opened_file_metrics = nil
@total_closed_file_metrics = nil
@total_rotated_file_metrics = nil
end

desc 'The paths to read. Multiple paths can be specified, separated by comma.'
Expand Down Expand Up @@ -191,6 +194,9 @@ def configure(conf)
@read_bytes_limit_per_second = min_bytes
end
end
@total_opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files")
@total_closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files")
@total_rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files")
end

def configure_tag
Expand Down Expand Up @@ -375,7 +381,7 @@ def refresh_watchers

def setup_watcher(target_info, pe)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))
tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @total_rotated_file_metrics)

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
Expand Down Expand Up @@ -670,6 +676,21 @@ def parse_multilines(lines, tail_watcher)
es
end

def statistics
stats = super
opened_file_count = @total_opened_file_metrics.get
closed_file_count = @total_closed_file_metrics.get
rotated_file_count = @total_rotated_file_metrics.get
stats = {
'input' => stats["input"].merge({
'opened_file_count' => opened_file_count,
'closed_file_count' => closed_file_count,
'rotated_file_count' => rotated_file_count,
},)
}
stats
end

private

def io_handler(watcher, path)
Expand All @@ -682,6 +703,8 @@ def io_handler(watcher, path)
open_on_every_update: @open_on_every_update,
from_encoding: @from_encoding,
encoding: @encoding,
total_opened_file_metrics: @total_opened_file_metrics,
total_closed_file_metrics: @total_closed_file_metrics,
&method(:receive_lines)
)
end
Expand Down Expand Up @@ -717,7 +740,7 @@ def on_timer
end

class TailWatcher
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, total_file_rotate_metrics)
@path = target_info.path
@ino = target_info.ino
@pe = pe || MemoryPositionEntry.new
Expand All @@ -729,6 +752,7 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch
@line_buffer_timer_flusher = line_buffer_timer_flusher
@io_handler = nil
@io_handler_build = io_handler_build
@total_file_rotate_metrics = total_file_rotate_metrics
@watchers = []
end

Expand Down Expand Up @@ -855,6 +879,7 @@ def on_rotate(stat)
@log.info "detected rotation of #{@path}"
@io_handler = io_handler
end
@total_file_rotate_metrics.inc
end
end

Expand Down Expand Up @@ -934,7 +959,7 @@ class IOHandler

attr_accessor :shutdown_timeout

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, total_opened_file_metrics:, total_closed_file_metrics:, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
Expand All @@ -953,6 +978,8 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@shutdown_timeout = SHUTDOWN_TIMEOUT
@shutdown_mutex = Mutex.new
@eof = false
@total_opened_file_metrics = total_opened_file_metrics
@total_closed_file_metrics = total_closed_file_metrics

@log.info "following tail of #{@path}"
end
Expand All @@ -972,6 +999,7 @@ def close
if @io && !@io.closed?
@io.close
@io = nil
@total_closed_file_metrics.inc
end
end

Expand Down Expand Up @@ -1059,6 +1087,7 @@ def handle_notify
def open
io = Fluent::FileWrapper.open(@path)
io.seek(@watcher.pe.read_pos + @fifo.bytesize)
@total_opened_file_metrics.inc
io
rescue RangeError
io.close if io
Expand Down
13 changes: 9 additions & 4 deletions test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
require_relative '../../helper'

require 'fluent/plugin/in_tail'
require 'fluent/plugin/metrics_local'
require 'tempfile'

class IntailIOHandlerTest < Test::Unit::TestCase
setup do
@file = Tempfile.new('intail_io_handler').binmode
@total_opened_file_metrics = Fluent::Plugin::LocalMetrics.new
@total_opened_file_metrics.configure(config_element('metrics', '', {}))
@total_closed_file_metrics = Fluent::Plugin::LocalMetrics.new
@total_closed_file_metrics.configure(config_element('metrics', '', {}))
end

teardown do
Expand All @@ -30,7 +35,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, total_opened_file_metrics: @total_opened_file_metrics, total_closed_file_metrics: @total_closed_file_metrics) do |lines, _watcher|
returned_lines << lines.join
true
end
Expand Down Expand Up @@ -62,7 +67,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true, total_opened_file_metrics: @total_opened_file_metrics, total_closed_file_metrics: @total_closed_file_metrics) do |lines, _watcher|
returned_lines << lines.join
true
end
Expand Down Expand Up @@ -93,7 +98,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, total_opened_file_metrics: @total_opened_file_metrics, total_closed_file_metrics: @total_closed_file_metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end
Expand All @@ -119,7 +124,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, total_opened_file_metrics: @total_opened_file_metrics, total_closed_file_metrics: @total_closed_file_metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end
Expand Down
12 changes: 9 additions & 3 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1524,11 +1524,13 @@ def test_z_refresh_watchers
plugin.instance_eval do
@pf = Fluent::Plugin::TailInput::PositionFile.load(sio, EX_FOLLOW_INODES, {}, logger: $log)
@loop = Coolio::Loop.new
@total_rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
@total_rotated_file_metrics.configure(config_element('metrics', '', {}))
end

Timecop.freeze(2010, 1, 2, 3, 4, 5) do
ex_paths.each do |target_info|
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, false, anything, nil, anything).once
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, false, anything, nil, anything, anything).once
end

plugin.refresh_watchers
Expand All @@ -1542,7 +1544,7 @@ def test_z_refresh_watchers
path = "test/plugin/data/2010/01/20100102-030406.log"
inode = Fluent::FileWrapper.stat(path).ino
target_info = Fluent::Plugin::TailInput::TargetInfo.new(path, inode)
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, false, anything, nil, anything).once
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, false, anything, nil, anything, anything).once
plugin.refresh_watchers

flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass|
Expand Down Expand Up @@ -1885,13 +1887,17 @@ def test_should_close_watcher_after_rotate_wait
config = COMMON_FOLLOW_INODE_CONFIG + config_element('', '', {"rotate_wait" => "1s", "limit_recently_modified" => "1s"})

d = create_driver(config, false)
d.instance.instance_eval do
@total_rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
@total_rotated_file_metrics.configure(config_element('metrics', '', {}))
end

File.open("#{TMP_DIR}/tail.txt", "wb") {|f|
f.puts "test1"
f.puts "test2"
}
target_info = create_target_info("#{TMP_DIR}/tail.txt")
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything).once
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything, anything).once
d.run(shutdown: false)
assert d.instance.instance_variable_get(:@tails)[target_info]

Expand Down

0 comments on commit 46cf60c

Please sign in to comment.