Skip to content

Commit

Permalink
Always read from head for new/rotated files after startup
Browse files Browse the repository at this point in the history
Signed-off-by: majimenez-stratio <majimenez@stratio.com>
  • Loading branch information
majimenez-stratio committed Nov 1, 2021
1 parent daf2f16 commit 2012df4
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
19 changes: 10 additions & 9 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def start
end
end

refresh_watchers unless @skip_refresh_on_startup
refresh_watchers(true) unless @skip_refresh_on_startup
timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
end

Expand Down Expand Up @@ -366,7 +366,7 @@ def existence_path
# It will cause log duplication after updated watch files.
# In such case, you should separate log directory and specify two paths in path parameter.
# e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
def refresh_watchers
def refresh_watchers(startup = false)
target_paths_hash = expand_paths
existence_paths_hash = existence_path

Expand All @@ -376,12 +376,13 @@ def refresh_watchers
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty?
start_watchers(added_hash) unless added_hash.empty?
start_watchers(added_hash, startup) unless added_hash.empty?
end

def setup_watcher(target_info, pe)
def setup_watcher(target_info, pe, startup = false)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @metrics)
read_from_head = startup ? @read_from_head : true
tw = TailWatcher.new(target_info, pe, log, read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @metrics)

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
Expand Down Expand Up @@ -410,7 +411,7 @@ def setup_watcher(target_info, pe)
raise e
end

def construct_watcher(target_info)
def construct_watcher(target_info, startup)
pe = nil
if @pf
pe = @pf[target_info]
Expand All @@ -424,7 +425,7 @@ def construct_watcher(target_info)
end

begin
tw = setup_watcher(target_info, pe)
tw = setup_watcher(target_info, pe, startup)
rescue WatcherSetupError => e
log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
return
Expand All @@ -443,9 +444,9 @@ def construct_watcher(target_info)
end
end

def start_watchers(targets_info)
def start_watchers(targets_info, startup)
targets_info.each_value {|target_info|
construct_watcher(target_info)
construct_watcher(target_info, startup)
break if before_shutdown?
}
end
Expand Down
14 changes: 11 additions & 3 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -516,13 +516,21 @@ def test_shorter_than_rotate_wait
rotated = false
detached = false
d = create_driver(config)
mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw|
mock.proxy(tw).detach(anything) do |v|
detached = true
v
end
tw
end.once

mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
mock.proxy(tw).detach(anything) do |v|
detached = true
v
end
tw
end.twice
end.once

d.run(timeout: 10) do
until detached do
Expand Down Expand Up @@ -2188,7 +2196,7 @@ def test_ENOENT_error_after_setup_watcher
'format' => 'none',
})
d = create_driver(config)
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw|
cleanup_file(path)
tw
end
Expand All @@ -2213,7 +2221,7 @@ def test_EACCES_error_after_setup_watcher
'format' => 'none',
})
d = create_driver(config, false)
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw|
FileUtils.chmod(0000, "#{TMP_DIR}/noaccess")
tw
end
Expand Down

0 comments on commit 2012df4

Please sign in to comment.