Skip to content

Commit

Permalink
Merge pull request #2832 from ganmacs/reduce-Tailwatcher-argument
Browse files Browse the repository at this point in the history
Reduce tailwatcher argument
  • Loading branch information
ganmacs committed Feb 25, 2020
2 parents c152369 + 857d9cb commit 2acf3f7
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 114 deletions.
198 changes: 108 additions & 90 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -319,19 +319,33 @@ def refresh_watchers
end

def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw.attach do |watcher|
event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, pe, log, @read_from_head, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
tw.register_watcher(tt)
end

if @enable_stat_watcher
tt = StatWatcher.new(path, log) { tw.on_notify }
tw.register_watcher(tt)
end

tw.on_notify

tw.watchers.each do |watcher|
event_loop_attach(watcher)
end

tw
rescue => e
if tw
tw.detach { |watcher|
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
}
tw.watchers.each do |watcher|
event_loop_detach(watcher)
end

tw.detach
tw.close
end
raise e
Expand Down Expand Up @@ -386,6 +400,8 @@ def close_watcher_handles

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(path, pe)
log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")

if @pf
unless pe.read_inode == @pf[path].read_inode
log.debug "Skip update_watcher because watcher has been already updated by other inotify event"
Expand All @@ -402,10 +418,11 @@ def update_watcher(path, pe)
# so adding close_io argument to avoid this problem.
# At shutdown, IOHandler's io will be released automatically after detached the event loop
def detach_watcher(tw, close_io = true)
tw.detach { |watcher|
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
}
tw.watchers.each do |watcher|
event_loop_detach(watcher)
end
tw.detach

tw.close if close_io
flush_buffer(tw)
if tw.unwatched && @pf
Expand All @@ -422,7 +439,7 @@ def detach_watcher_after_rotate_wait(tw)
end

def flush_buffer(tw)
if lb = tw.line_buffer
if lb = tw.line_buffer_timer_flusher&.line_buffer
lb.chomp!
@parser.parse(lb) { |time, record|
if time && record
Expand Down Expand Up @@ -492,11 +509,12 @@ def parse_singleline(lines, tail_watcher)
es
end

# No need to check if line_buffer_timer_flusher is nil, since line_buffer_timer_flusher should exist
def parse_multilines(lines, tail_watcher)
lb = tail_watcher.line_buffer
lb = tail_watcher.line_buffer_timer_flusher.line_buffer
es = Fluent::MultiEventStream.new
if @parser.has_firstline?
tail_watcher.line_buffer_timer_flusher.reset_timer if tail_watcher.line_buffer_timer_flusher
tail_watcher.line_buffer_timer_flusher.reset_timer
lines.each { |line|
if @parser.firstline?(line)
if lb
Expand Down Expand Up @@ -526,59 +544,75 @@ def parse_multilines(lines, tail_watcher)
}
end
end
tail_watcher.line_buffer = lb
tail_watcher.line_buffer_timer_flusher.line_buffer = lb
es
end

class StatWatcher < Coolio::StatWatcher
def initialize(path, log, &callback)
@callback = callback
@log = log
super(path)
end

def on_change(prev, cur)
@callback.call
rescue
@log.error $!.to_s
@log.error_backtrace
end
end

class TimerTrigger < Coolio::TimerWatcher
def initialize(interval, log, &callback)
@log = log
@callback = callback
super(interval, true)
end

def on_timer
@callback.call
rescue => e
@log.error e.to_s
@log.error_backtrace
end
end

class TailWatcher
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
def initialize(path, pe, log, read_from_head, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
@path = path
@rotate_wait = rotate_wait
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@enable_watch_timer = enable_watch_timer
@enable_stat_watcher = enable_stat_watcher
@read_lines_limit = read_lines_limit
@receive_lines = receive_lines
@update_watcher = update_watcher

@stat_trigger = @enable_stat_watcher ? StatWatcher.new(path, log, &method(:on_notify)) : nil
@timer_trigger = @enable_watch_timer ? TimerTrigger.new(1, log, &method(:on_notify)) : nil

@rotate_handler = RotateHandler.new(log, &method(:on_rotate))
@io_handler = nil
@log = log

@line_buffer = nil
@line_buffer_timer_flusher = line_buffer_timer_flusher
@from_encoding = from_encoding
@encoding = encoding
@open_on_every_update = open_on_every_update
@watchers = []
end

attr_reader :path
attr_reader :log, :pe, :read_lines_limit, :open_on_every_update
attr_reader :from_encoding, :encoding
attr_reader :stat_trigger, :enable_watch_timer, :enable_stat_watcher
attr_accessor :timer_trigger
attr_accessor :line_buffer, :line_buffer_timer_flusher
attr_reader :pe
attr_reader :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile
attr_reader :watchers

def tag
@parsed_tag ||= @path.tr('/', '.').gsub(/\.+/, '.').gsub(/^\./, '')
end

def wrap_receive_lines(lines)
@receive_lines.call(lines, self)
end

def attach
on_notify
yield self
def register_watcher(watcher)
@watchers << watcher
end

def detach
yield self
@io_handler.on_notify if @io_handler
end

Expand Down Expand Up @@ -632,7 +666,7 @@ def on_rotate(stat)
pos = @read_from_head ? 0 : fsize
@pe.update(inode, pos)
end
@io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
@io_handler = io_handler
else
@io_handler = NullIOHandler.new
end
Expand All @@ -657,18 +691,21 @@ def on_rotate(stat)
watcher_needs_update = true
end

log_msg = "detected rotation of #{@path}"
log_msg << "; waiting #{@rotate_wait} seconds" if watcher_needs_update # wait rotate_time if previous file exists
@log.info log_msg

if watcher_needs_update
@update_watcher.call(@path, swap_state(@pe))
else
@io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
@log.info "detected rotation of #{@path}"
@io_handler = io_handler
end
end
end

def io_handler
IOHandler.new(self, path: @path, log: @log, read_lines_limit: @read_lines_limit, open_on_every_update: @open_on_every_update, from_encoding: @from_encoding, encoding: @encoding) do |lines|
@receive_lines.call(lines, self)
end
end

def swap_state(pe)
# Use MemoryPositionEntry for rotated file temporary
mpe = MemoryPositionEntry.new
Expand All @@ -677,37 +714,6 @@ def swap_state(pe)
pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

class TimerTrigger < Coolio::TimerWatcher
def initialize(interval, log, &callback)
@callback = callback
@log = log
super(interval, true)
end

def on_timer
@callback.call
rescue => e
@log.error e.to_s
@log.error_backtrace
end
end

class StatWatcher < Coolio::StatWatcher
def initialize(path, log, &callback)
@callback = callback
@log = log
super(path)
end

def on_change(prev, cur)
@callback.call
rescue
# TODO log?
@log.error $!.to_s
@log.error_backtrace
end
end

class FIFO
def initialize(from_encoding, encoding)
@from_encoding = from_encoding
Expand Down Expand Up @@ -767,15 +773,20 @@ def bytesize
end

class IOHandler
def initialize(watcher, &receive_lines)
def initialize(watcher, path:, read_lines_limit:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
@watcher = watcher
@path = path
@read_lines_limit = read_lines_limit
@receive_lines = receive_lines
@fifo = FIFO.new(@watcher.from_encoding || Encoding::ASCII_8BIT, @watcher.encoding || Encoding::ASCII_8BIT)
@open_on_every_update = open_on_every_update
@fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT)
@iobuf = ''.force_encoding('ASCII-8BIT')
@lines = []
@io = nil
@notify_mutex = Mutex.new
@watcher.log.info "following tail of #{@watcher.path}"
@log = log

@log.info "following tail of #{@path}"
end

def on_notify
Expand All @@ -792,7 +803,7 @@ def handle_notify
while true
@fifo << io.readpartial(8192, @iobuf)
@fifo.read_lines(@lines)
if @lines.size >= @watcher.read_lines_limit
if @lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
break
Expand Down Expand Up @@ -826,18 +837,18 @@ def opened?
end

def open
io = Fluent::FileWrapper.open(@watcher.path)
io = Fluent::FileWrapper.open(@path)
io.seek(@watcher.pe.read_pos + @fifo.bytesize)
io
rescue RangeError
io.close if io
raise WatcherSetupError, "seek error with #{@watcher.path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}"
raise WatcherSetupError, "seek error with #{@path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}"
rescue Errno::ENOENT
nil
end

def with_io
if @watcher.open_on_every_update
if @open_on_every_update
io = open
begin
yield io
Expand All @@ -852,8 +863,8 @@ def with_io
close
raise e
rescue
@watcher.log.error $!.to_s
@watcher.log.error_backtrace
@log.error $!.to_s
@log.error_backtrace
close
end
end
Expand Down Expand Up @@ -905,24 +916,31 @@ def on_notify(stat)
end

class LineBufferTimerFlusher
attr_accessor :line_buffer

def initialize(log, flush_interval, &flush_method)
@log = log
@flush_interval = flush_interval
@flush_method = flush_method
@start = nil
@line_buffer = nil
end

def on_notify(tw)
if @start && @flush_interval
if Time.now - @start >= @flush_interval
@flush_method.call(tw)
tw.line_buffer = nil
@start = nil
end
unless @start && @flush_method
return
end

if Time.now - @start >= @flush_interval
@flush_method.call(tw)
@line_buffer = nil
@start = nil
end
end

def reset_timer
return unless @flush_interval

@start = Time.now
end
end
Expand Down

0 comments on commit 2acf3f7

Please sign in to comment.