Skip to content

Commit

Permalink
Merge pull request #3380 from ashie/fix-in-tail-shutdown
Browse files Browse the repository at this point in the history
in_tail: Shutdown immediately & safely even if reading huge files
  • Loading branch information
ashie committed May 24, 2021
2 parents b6ba1c2 + 0f018a4 commit 7c0fce6
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 7 deletions.
43 changes: 36 additions & 7 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def initialize
@pf_file = nil
@pf = nil
@ignore_list = []
@shutdown_start_time = nil
end

desc 'The paths to read. Multiple paths can be specified, separated by comma.'
Expand Down Expand Up @@ -256,6 +257,7 @@ def stop
end

def shutdown
@shutdown_start_time = Fluent::Clock.now
# during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close.
stop_watchers(existence_path, immediate: true, remove_watcher: false)
@pf_file.close if @pf_file
Expand Down Expand Up @@ -385,8 +387,6 @@ def setup_watcher(target_info, pe)
tw.register_watcher(tt)
end

tw.on_notify

tw.watchers.each do |watcher|
event_loop_attach(watcher)
end
Expand All @@ -398,7 +398,7 @@ def setup_watcher(target_info, pe)
event_loop_detach(watcher)
end

tw.detach
tw.detach(@shutdown_start_time)
tw.close
end
raise e
Expand Down Expand Up @@ -427,6 +427,7 @@ def construct_watcher(target_info)
begin
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails[target_info] = tw
tw.on_notify
rescue Errno::ENOENT, Errno::EACCES => e
$log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now."
# explicitly detach and unwatch watcher `tw`.
Expand All @@ -438,6 +439,7 @@ def construct_watcher(target_info)
# Builds a watcher for every target in +targets_info+, in iteration order.
#
# After each watcher is constructed, +before_shutdown?+ is consulted and the
# loop is abandoned as soon as it reports true, so the remaining targets are
# left without a watcher.
#
# @param targets_info [Hash] values are passed one by one to +construct_watcher+
# @return [Hash, nil] +targets_info+ when every value was visited, nil when the
#   loop was cut short by +break+
def start_watchers(targets_info)
  targets_info.each_value do |info|
    construct_watcher(info)
    # Checked after construction: the current target is always set up before
    # the loop gives up.
    break if before_shutdown?
  end
end

Expand Down Expand Up @@ -490,9 +492,11 @@ def update_watcher(target_info, pe)

if new_position_entry.read_inode == 0
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
@tails[new_target_info].on_notify
end
else
@tails[new_target_info] = setup_watcher(new_target_info, pe)
@tails[new_target_info].on_notify
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end
Expand All @@ -505,7 +509,7 @@ def detach_watcher(tw, ino, close_io = true)
tw.watchers.each do |watcher|
event_loop_detach(watcher)
end
tw.detach
tw.detach(@shutdown_start_time)

tw.close if close_io

Expand Down Expand Up @@ -717,8 +721,11 @@ def register_watcher(watcher)
@watchers << watcher
end

def detach
@io_handler.on_notify if @io_handler
# Detaches this watcher.
#
# When an IO handler is present it is first told the shutdown start time via
# +ready_to_shutdown+ and then asked for one more +on_notify+ pass. Afterwards
# the line buffer timer flusher, if there is one, is closed with this watcher
# as its argument.
#
# @param shutdown_start_time [Object, nil] forwarded unchanged to the IO
#   handler's +ready_to_shutdown+
# @return [Object, nil] whatever the flusher's +close+ returns, or nil when no
#   flusher is set
def detach(shutdown_start_time = nil)
  handler = @io_handler
  if handler
    # Order matters: the shutdown time is handed over before the final
    # notification is triggered.
    handler.ready_to_shutdown(shutdown_start_time)
    handler.on_notify
  end

  flusher = @line_buffer_timer_flusher
  flusher.close(self) unless flusher.nil?
end

Expand Down Expand Up @@ -894,6 +901,9 @@ def bytesize

class IOHandler
BYTES_TO_READ = 8192
SHUTDOWN_TIMEOUT = 5

attr_accessor :shutdown_timeout

def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, log:, open_on_every_update:, from_encoding: nil, encoding: nil, &receive_lines)
@watcher = watcher
Expand All @@ -910,6 +920,9 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@log = log
@start_reading_time = nil
@number_bytes_read = 0
@shutdown_start_time = nil
@shutdown_timeout = SHUTDOWN_TIMEOUT
@shutdown_mutex = Mutex.new

@log.info "following tail of #{@path}"
end
Expand All @@ -918,6 +931,13 @@ def on_notify
@notify_mutex.synchronize { handle_notify }
end

# Records the moment shutdown began so that +should_shutdown_now?+ can
# measure elapsed time against it. The write happens under +@shutdown_mutex+.
#
# @param shutdown_start_time [Object, nil] start time supplied by the caller;
#   when nil (or false) the current +Fluent::Clock.now+ is used instead
# @return [Object] the value stored in +@shutdown_start_time+
def ready_to_shutdown(shutdown_start_time = nil)
  @shutdown_mutex.synchronize do
    started_at = shutdown_start_time
    # Fall back to "now" only when the caller did not provide a start time.
    started_at ||= Fluent::Clock.now
    @shutdown_start_time = started_at
  end
end

def close
if @io && !@io.closed?
@io.close
Expand Down Expand Up @@ -948,6 +968,15 @@ def limit_bytes_per_second_reached?
end
end

# Tells the read loop whether it must give up reading right away.
#
# The intent is to keep draining the remaining lines during shutdown, but to
# bail out once that has been going on for longer than +@shutdown_timeout+.
# The check is made while holding +@shutdown_mutex+.
#
# @return [Boolean] false while no shutdown start time has been recorded;
#   otherwise true only when the time elapsed since +@shutdown_start_time+
#   (measured with +Fluent::Clock.now+) is strictly greater than
#   +@shutdown_timeout+
def should_shutdown_now?
  @shutdown_mutex.synchronize do
    started_at = @shutdown_start_time
    if started_at.nil?
      # Shutdown has not been announced yet; never interrupt normal reading.
      false
    else
      Fluent::Clock.now - started_at > @shutdown_timeout
    end
  end
end

def handle_notify
return if limit_bytes_per_second_reached?

Expand All @@ -965,7 +994,7 @@ def handle_notify
@fifo.read_lines(@lines)

@log.debug("reading file: #{@path}")
if limit_bytes_per_second_reached?
if limit_bytes_per_second_reached? || should_shutdown_now?
# Just get out from tailing loop.
read_more = false
break
Expand Down
29 changes: 29 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2130,4 +2130,33 @@ def test_EACCES
d.instance_shutdown
assert($log.out.logs.any?{|log| log.include?("expand_paths: stat() for #{path} failed with Errno::EACCES. Skip file.\n") })
end

# Checks that running the driver against a very large file finishes promptly.
#
# A file of 5 * 1024 * 1024 JSON lines is written, the value returned by the
# plugin's +io_handler+ is given a +shutdown_timeout+ of 0.5 through a proxied
# mock, and the driver is run until one emit is seen. The whole run must not
# raise and must take more than 0.5 and less than 2.5 (as measured with
# +Fluent::Clock.now+).
def test_shutdown_timeout
  path = "#{TMP_DIR}/tail.txt"
  line = '{"test":"fizzbuzz"}'
  File.open(path, "wb") do |f|
    (1024 * 1024 * 5).times { f.puts line }
  end

  overrides = config_element('', '', {
                               'format' => 'json',
                               'skip_refresh_on_startup' => true,
                             })
  config = CONFIG_READ_FROM_HEAD + overrides
  d = create_driver(config)

  # Shorten the timeout on every handler handed out by the plugin so the
  # test does not have to wait for the default.
  mock.proxy(d.instance).io_handler(anything, anything) do |io_handler|
    io_handler.shutdown_timeout = 0.5
    io_handler
  end

  start_time = Fluent::Clock.now
  assert_nothing_raised { d.run(expect_emits: 1) }
  elapsed = Fluent::Clock.now - start_time

  assert_true(0.5 < elapsed && elapsed < 2.5)
end
end

0 comments on commit 7c0fce6

Please sign in to comment.