Skip to content

Commit

Permalink
Unify metrics instances with MetricsInfo struct
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 committed Sep 14, 2021
1 parent 0c0dd99 commit a0ed3a9
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 32 deletions.
38 changes: 18 additions & 20 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class TailInput < Fluent::Plugin::Input
helpers :timer, :event_loop, :parser, :compat_parameters

RESERVED_CHARS = ['/', '*', '%'].freeze
MetricsInfo = Struct.new(:opened, :closed, :rotated)

class WatcherSetupError < StandardError
def initialize(msg)
Expand All @@ -57,9 +58,7 @@ def initialize
@pf = nil
@ignore_list = []
@shutdown_start_time = nil
@total_opened_file_metrics = nil
@total_closed_file_metrics = nil
@total_rotated_file_metrics = nil
@metrics = nil
end

desc 'The paths to read. Multiple paths can be specified, separated by comma.'
Expand Down Expand Up @@ -194,9 +193,10 @@ def configure(conf)
@read_bytes_limit_per_second = min_bytes
end
end
@total_opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files")
@total_closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files")
@total_rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files")
opened_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_opened_total", help_text: "Total number of opened files")
closed_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_closed_total", help_text: "Total number of closed files")
rotated_file_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "files_rotated_total", help_text: "Total number of rotated files")
@metrics = MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end

def configure_tag
Expand Down Expand Up @@ -381,7 +381,7 @@ def refresh_watchers

def setup_watcher(target_info, pe)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @total_rotated_file_metrics)
tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @metrics)

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
Expand Down Expand Up @@ -678,9 +678,9 @@ def parse_multilines(lines, tail_watcher)

def statistics
stats = super
opened_file_count = @total_opened_file_metrics.get
closed_file_count = @total_closed_file_metrics.get
rotated_file_count = @total_rotated_file_metrics.get
opened_file_count = @metrics.opened.get
closed_file_count = @metrics.closed.get
rotated_file_count = @metrics.rotated.get
stats = {
'input' => stats["input"].merge({
'opened_file_count' => opened_file_count,
Expand All @@ -703,8 +703,7 @@ def io_handler(watcher, path)
open_on_every_update: @open_on_every_update,
from_encoding: @from_encoding,
encoding: @encoding,
total_opened_file_metrics: @total_opened_file_metrics,
total_closed_file_metrics: @total_closed_file_metrics,
metrics: @metrics,
&method(:receive_lines)
)
end
Expand Down Expand Up @@ -740,7 +739,7 @@ def on_timer
end

class TailWatcher
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, total_file_rotate_metrics)
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
@path = target_info.path
@ino = target_info.ino
@pe = pe || MemoryPositionEntry.new
Expand All @@ -752,7 +751,7 @@ def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watch
@line_buffer_timer_flusher = line_buffer_timer_flusher
@io_handler = nil
@io_handler_build = io_handler_build
@total_file_rotate_metrics = total_file_rotate_metrics
@metrics = metrics
@watchers = []
end

Expand Down Expand Up @@ -879,7 +878,7 @@ def on_rotate(stat)
@log.info "detected rotation of #{@path}"
@io_handler = io_handler
end
@total_file_rotate_metrics.inc
@metrics.rotated.inc
end
end

Expand Down Expand Up @@ -959,7 +958,7 @@ class IOHandler

attr_accessor :shutdown_timeout

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, total_opened_file_metrics:, total_closed_file_metrics:, &receive_lines)
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
Expand All @@ -978,8 +977,7 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@shutdown_timeout = SHUTDOWN_TIMEOUT
@shutdown_mutex = Mutex.new
@eof = false
@total_opened_file_metrics = total_opened_file_metrics
@total_closed_file_metrics = total_closed_file_metrics
@metrics = metrics

@log.info "following tail of #{@path}"
end
Expand All @@ -999,7 +997,7 @@ def close
if @io && !@io.closed?
@io.close
@io = nil
@total_closed_file_metrics.inc
@metrics.closed.inc
end
end

Expand Down Expand Up @@ -1087,7 +1085,7 @@ def handle_notify
def open
io = Fluent::FileWrapper.open(@path)
io.seek(@watcher.pe.read_pos + @fifo.bytesize)
@total_opened_file_metrics.inc
@metrics.opened.inc
io
rescue RangeError
io.close if io
Expand Down
19 changes: 11 additions & 8 deletions test/plugin/in_tail/test_io_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
class IntailIOHandlerTest < Test::Unit::TestCase
setup do
@file = Tempfile.new('intail_io_handler').binmode
@total_opened_file_metrics = Fluent::Plugin::LocalMetrics.new
@total_opened_file_metrics.configure(config_element('metrics', '', {}))
@total_closed_file_metrics = Fluent::Plugin::LocalMetrics.new
@total_closed_file_metrics.configure(config_element('metrics', '', {}))
opened_file_metrics = Fluent::Plugin::LocalMetrics.new
opened_file_metrics.configure(config_element('metrics', '', {}))
closed_file_metrics = Fluent::Plugin::LocalMetrics.new
closed_file_metrics.configure(config_element('metrics', '', {}))
rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
rotated_file_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end

teardown do
Expand All @@ -35,7 +38,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, total_opened_file_metrics: @total_opened_file_metrics, total_closed_file_metrics: @total_closed_file_metrics) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.join
true
end
Expand Down Expand Up @@ -67,7 +70,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true, total_opened_file_metrics: @total_opened_file_metrics, total_closed_file_metrics: @total_closed_file_metrics) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.join
true
end
Expand Down Expand Up @@ -98,7 +101,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, total_opened_file_metrics: @total_opened_file_metrics, total_closed_file_metrics: @total_closed_file_metrics) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end
Expand All @@ -124,7 +127,7 @@ class IntailIOHandlerTest < Test::Unit::TestCase
end

returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, total_opened_file_metrics: @total_opened_file_metrics, total_closed_file_metrics: @total_closed_file_metrics) do |lines, _watcher|
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end
Expand Down
18 changes: 14 additions & 4 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1524,8 +1524,13 @@ def test_z_refresh_watchers
plugin.instance_eval do
@pf = Fluent::Plugin::TailInput::PositionFile.load(sio, EX_FOLLOW_INODES, {}, logger: $log)
@loop = Coolio::Loop.new
@total_rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
@total_rotated_file_metrics.configure(config_element('metrics', '', {}))
opened_file_metrics = Fluent::Plugin::LocalMetrics.new
opened_file_metrics.configure(config_element('metrics', '', {}))
closed_file_metrics = Fluent::Plugin::LocalMetrics.new
closed_file_metrics.configure(config_element('metrics', '', {}))
rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
rotated_file_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end

Timecop.freeze(2010, 1, 2, 3, 4, 5) do
Expand Down Expand Up @@ -1888,8 +1893,13 @@ def test_should_close_watcher_after_rotate_wait

d = create_driver(config, false)
d.instance.instance_eval do
@total_rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
@total_rotated_file_metrics.configure(config_element('metrics', '', {}))
opened_file_metrics = Fluent::Plugin::LocalMetrics.new
opened_file_metrics.configure(config_element('metrics', '', {}))
closed_file_metrics = Fluent::Plugin::LocalMetrics.new
closed_file_metrics.configure(config_element('metrics', '', {}))
rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
rotated_file_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end

File.open("#{TMP_DIR}/tail.txt", "wb") {|f|
Expand Down

0 comments on commit a0ed3a9

Please sign in to comment.