Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Always read from head for new/rotated files after startup #3542

Merged
merged 6 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 10 additions & 9 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def start
end
end

refresh_watchers unless @skip_refresh_on_startup
refresh_watchers(true) unless @skip_refresh_on_startup
timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
end

Expand Down Expand Up @@ -366,7 +366,7 @@ def existence_path
# It will cause log duplication after updated watch files.
# In such case, you should separate log directory and specify two paths in path parameter.
# e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
def refresh_watchers
def refresh_watchers(startup = false)
target_paths_hash = expand_paths
existence_paths_hash = existence_path

Expand All @@ -376,12 +376,13 @@ def refresh_watchers
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty?
start_watchers(added_hash) unless added_hash.empty?
start_watchers(added_hash, startup) unless added_hash.empty?
end

def setup_watcher(target_info, pe)
def setup_watcher(target_info, pe, startup = false)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @metrics)
read_from_head = startup ? @read_from_head : true
tw = TailWatcher.new(target_info, pe, log, read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @metrics)

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
Expand Down Expand Up @@ -410,7 +411,7 @@ def setup_watcher(target_info, pe)
raise e
end

def construct_watcher(target_info)
def construct_watcher(target_info, startup)
pe = nil
if @pf
pe = @pf[target_info]
Expand All @@ -424,7 +425,7 @@ def construct_watcher(target_info)
end

begin
tw = setup_watcher(target_info, pe)
tw = setup_watcher(target_info, pe, startup)
rescue WatcherSetupError => e
log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
return
Expand All @@ -443,9 +444,9 @@ def construct_watcher(target_info)
end
end

def start_watchers(targets_info)
def start_watchers(targets_info, startup)
targets_info.each_value {|target_info|
construct_watcher(target_info)
construct_watcher(target_info, startup)
break if before_shutdown?
}
end
Expand Down
14 changes: 11 additions & 3 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -516,13 +516,21 @@ def test_shorter_than_rotate_wait
rotated = false
detached = false
d = create_driver(config)
mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw|
mock.proxy(tw).detach(anything) do |v|
detached = true
v
end
tw
end.once

mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
mock.proxy(tw).detach(anything) do |v|
detached = true
v
end
tw
end.twice
end.once

d.run(timeout: 10) do
until detached do
Expand Down Expand Up @@ -2188,7 +2196,7 @@ def test_ENOENT_error_after_setup_watcher
'format' => 'none',
})
d = create_driver(config)
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw|
cleanup_file(path)
tw
end
Expand All @@ -2213,7 +2221,7 @@ def test_EACCES_error_after_setup_watcher
'format' => 'none',
})
d = create_driver(config, false)
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw|
FileUtils.chmod(0000, "#{TMP_DIR}/noaccess")
tw
end
Expand Down