Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce tailwatcher argument #2832

Merged
merged 12 commits into from
Feb 25, 2020
198 changes: 108 additions & 90 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -317,19 +317,33 @@ def refresh_watchers
end

def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw.attach do |watcher|
event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, pe, log, @read_from_head, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
tw.register_watcher(tt)
end

if @enable_stat_watcher
tt = StatWatcher.new(path, log) { tw.on_notify }
tw.register_watcher(tt)
end

tw.on_notify

tw.watchers.each do |watcher|
event_loop_attach(watcher)
end

tw
rescue => e
if tw
tw.detach { |watcher|
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
}
tw.watchers.each do |watcher|
event_loop_detach(watcher)
end

tw.detach
tw.close
end
raise e
Expand Down Expand Up @@ -384,6 +398,8 @@ def close_watcher_handles

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(path, pe)
log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing log.info here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch…thank you 🙏 9940869

if @pf
unless pe.read_inode == @pf[path].read_inode
log.debug "Skip update_watcher because watcher has been already updated by other inotify event"
Expand All @@ -400,10 +416,11 @@ def update_watcher(path, pe)
# so adding close_io argument to avoid this problem.
# At shutdown, IOHandler's io will be released automatically after detached the event loop
def detach_watcher(tw, close_io = true)
tw.detach { |watcher|
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
}
tw.watchers.each do |watcher|
event_loop_detach(watcher)
end
tw.detach

tw.close if close_io
flush_buffer(tw)
if tw.unwatched && @pf
Expand All @@ -420,7 +437,7 @@ def detach_watcher_after_rotate_wait(tw)
end

def flush_buffer(tw)
if lb = tw.line_buffer
if lb = tw.line_buffer_timer_flusher&.line_buffer
lb.chomp!
@parser.parse(lb) { |time, record|
if time && record
Expand Down Expand Up @@ -490,11 +507,12 @@ def parse_singleline(lines, tail_watcher)
es
end

# No need to check if line_buffer_timer_flusher is nil, since line_buffer_timer_flusher should exist
def parse_multilines(lines, tail_watcher)
lb = tail_watcher.line_buffer
lb = tail_watcher.line_buffer_timer_flusher.line_buffer
es = Fluent::MultiEventStream.new
if @parser.has_firstline?
tail_watcher.line_buffer_timer_flusher.reset_timer if tail_watcher.line_buffer_timer_flusher
tail_watcher.line_buffer_timer_flusher.reset_timer
lines.each { |line|
if @parser.firstline?(line)
if lb
Expand Down Expand Up @@ -524,59 +542,75 @@ def parse_multilines(lines, tail_watcher)
}
end
end
tail_watcher.line_buffer = lb
tail_watcher.line_buffer_timer_flusher.line_buffer = lb
es
end

class StatWatcher < Coolio::StatWatcher
def initialize(path, log, &callback)
@callback = callback
@log = log
super(path)
end

def on_change(prev, cur)
@callback.call
rescue
@log.error $!.to_s
@log.error_backtrace
end
end

class TimerTrigger < Coolio::TimerWatcher
def initialize(interval, log, &callback)
@log = log
@callback = callback
super(interval, true)
end

def on_timer
@callback.call
rescue => e
@log.error e.to_s
@log.error_backtrace
end
end

class TailWatcher
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
def initialize(path, pe, log, read_from_head, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
@path = path
@rotate_wait = rotate_wait
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@enable_watch_timer = enable_watch_timer
@enable_stat_watcher = enable_stat_watcher
@read_lines_limit = read_lines_limit
@receive_lines = receive_lines
@update_watcher = update_watcher

@stat_trigger = @enable_stat_watcher ? StatWatcher.new(path, log, &method(:on_notify)) : nil
@timer_trigger = @enable_watch_timer ? TimerTrigger.new(1, log, &method(:on_notify)) : nil

@rotate_handler = RotateHandler.new(log, &method(:on_rotate))
@io_handler = nil
@log = log

@line_buffer = nil
@line_buffer_timer_flusher = line_buffer_timer_flusher
@from_encoding = from_encoding
@encoding = encoding
@open_on_every_update = open_on_every_update
@watchers = []
end

attr_reader :path
attr_reader :log, :pe, :read_lines_limit, :open_on_every_update
attr_reader :from_encoding, :encoding
attr_reader :stat_trigger, :enable_watch_timer, :enable_stat_watcher
attr_accessor :timer_trigger
attr_accessor :line_buffer, :line_buffer_timer_flusher
attr_reader :pe
attr_reader :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile
attr_reader :watchers

def tag
@parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '')
end

def wrap_receive_lines(lines)
@receive_lines.call(lines, self)
end

def attach
on_notify
yield self
def register_watcher(watcher)
@watchers << watcher
end

def detach
yield self
@io_handler.on_notify if @io_handler
end

Expand Down Expand Up @@ -630,7 +664,7 @@ def on_rotate(stat)
pos = @read_from_head ? 0 : fsize
@pe.update(inode, pos)
end
@io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
@io_handler = io_handler
else
@io_handler = NullIOHandler.new
end
Expand All @@ -655,18 +689,21 @@ def on_rotate(stat)
watcher_needs_update = true
end

log_msg = "detected rotation of #{@path}"
log_msg << "; waiting #{@rotate_wait} seconds" if watcher_needs_update # wait rotate_time if previous file exists
@log.info log_msg

if watcher_needs_update
@update_watcher.call(@path, swap_state(@pe))
else
@io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
@log.info "detected rotation of #{@path}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line needed?

Copy link
Member Author

@ganmacs ganmacs Feb 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L658 and L660 is invoked even if watcher_needs_update is false, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see.

@io_handler = io_handler
end
end
end

def io_handler
IOHandler.new(self, path: @path, log: @log, read_lines_limit: @read_lines_limit, open_on_every_update: @open_on_every_update, from_encoding: @from_encoding, encoding: @encoding) do |lines|
@receive_lines.call(lines, self)
end
end

def swap_state(pe)
# Use MemoryPositionEntry for rotated file temporary
mpe = MemoryPositionEntry.new
Expand All @@ -675,37 +712,6 @@ def swap_state(pe)
pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

class TimerTrigger < Coolio::TimerWatcher
def initialize(interval, log, &callback)
@callback = callback
@log = log
super(interval, true)
end

def on_timer
@callback.call
rescue => e
@log.error e.to_s
@log.error_backtrace
end
end

class StatWatcher < Coolio::StatWatcher
def initialize(path, log, &callback)
@callback = callback
@log = log
super(path)
end

def on_change(prev, cur)
@callback.call
rescue
# TODO log?
@log.error $!.to_s
@log.error_backtrace
end
end

class FIFO
def initialize(from_encoding, encoding)
@from_encoding = from_encoding
Expand Down Expand Up @@ -765,15 +771,20 @@ def bytesize
end

class IOHandler
def initialize(watcher, &receive_lines)
def initialize(watcher, path:, read_lines_limit:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
@receive_lines = receive_lines
@fifo = FIFO.new(@watcher.from_encoding || Encoding::ASCII_8BIT, @watcher.encoding || Encoding::ASCII_8BIT)
@open_on_every_update = open_on_every_update
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT)
@iobuf = ''.force_encoding('ASCII-8BIT')
@lines = []
@io = nil
@notify_mutex = Mutex.new
@watcher.log.info "following tail of #{@watcher.path}"
@log = log

@log.info "following tail of #{@path}"
end

def on_notify
Expand All @@ -790,7 +801,7 @@ def handle_notify
while true
@fifo << io.readpartial(8192, @iobuf)
@fifo.read_lines(@lines)
if @lines.size >= @watcher.read_lines_limit
if @lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
break
Expand Down Expand Up @@ -824,18 +835,18 @@ def opened?
end

def open
io = Fluent::FileWrapper.open(@watcher.path)
io = Fluent::FileWrapper.open(@path)
io.seek(@watcher.pe.read_pos + @fifo.bytesize)
io
rescue RangeError
io.close if io
raise WatcherSetupError, "seek error with #{@watcher.path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}"
raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}"
rescue Errno::ENOENT
nil
end

def with_io
if @watcher.open_on_every_update
if @open_on_every_update
io = open
begin
yield io
Expand All @@ -850,8 +861,8 @@ def with_io
close
raise e
rescue
@watcher.log.error $!.to_s
@watcher.log.error_backtrace
@log.error $!.to_s
@log.error_backtrace
close
end
end
Expand Down Expand Up @@ -903,24 +914,31 @@ def on_notify(stat)
end

class LineBufferTimerFlusher
attr_accessor :line_buffer

def initialize(log, flush_interval, &flush_method)
@log = log
@flush_interval = flush_interval
@flush_method = flush_method
@start = nil
@line_buffer = nil
end

def on_notify(tw)
if @start && @flush_interval
if Time.now - @start >= @flush_interval
@flush_method.call(tw)
tw.line_buffer = nil
@start = nil
end
unless @start && @flush_method
return
end

if Time.now - @start >= @flush_interval
@flush_method.call(tw)
@line_buffer = nil
@start = nil
end
end

def reset_timer
return unless @flush_interval

@start = Time.now
end
end
Expand Down