Skip to content

Commit

Permalink
Refactor startup to instance variable
Browse files Browse the repository at this point in the history
Signed-off-by: majimenez-stratio <majimenez@stratio.com>
  • Loading branch information
majimenez-stratio committed Nov 1, 2021
1 parent 2012df4 commit 98feddc
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 22 deletions.
24 changes: 13 additions & 11 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def initialize
@ignore_list = []
@shutdown_start_time = nil
@metrics = nil
@startup = true
end

desc 'The paths to read. Multiple paths can be specified, separated by comma.'
Expand Down Expand Up @@ -250,7 +251,7 @@ def start
end
end

refresh_watchers(true) unless @skip_refresh_on_startup
refresh_watchers unless @skip_refresh_on_startup
timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
end

Expand Down Expand Up @@ -366,22 +367,23 @@ def existence_path
# It will cause log duplication after updated watch files.
# In such case, you should separate log directory and specify two paths in path parameter.
# e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
def refresh_watchers(startup = false)
def refresh_watchers
target_paths_hash = expand_paths
existence_paths_hash = existence_path

log.debug { "tailing paths: target = #{target_paths.join(",")} | existing = #{existence_paths.join(",")}" }

unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}

stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty?
start_watchers(added_hash, startup) unless added_hash.empty?
start_watchers(added_hash) unless added_hash.empty?
@startup = false if @startup
end

def setup_watcher(target_info, pe, startup = false)
def setup_watcher(target_info, pe)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
read_from_head = startup ? @read_from_head : true
read_from_head = @startup ? @read_from_head : true
tw = TailWatcher.new(target_info, pe, log, read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @metrics)

if @enable_watch_timer
Expand Down Expand Up @@ -411,7 +413,7 @@ def setup_watcher(target_info, pe, startup = false)
raise e
end

def construct_watcher(target_info, startup)
def construct_watcher(target_info)
pe = nil
if @pf
pe = @pf[target_info]
Expand All @@ -425,7 +427,7 @@ def construct_watcher(target_info, startup)
end

begin
tw = setup_watcher(target_info, pe, startup)
tw = setup_watcher(target_info, pe)
rescue WatcherSetupError => e
log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
return
Expand All @@ -444,9 +446,9 @@ def construct_watcher(target_info, startup)
end
end

def start_watchers(targets_info, startup)
def start_watchers(targets_info)
targets_info.each_value {|target_info|
construct_watcher(target_info, startup)
construct_watcher(target_info)
break if before_shutdown?
}
end
Expand Down
14 changes: 3 additions & 11 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -516,21 +516,13 @@ def test_shorter_than_rotate_wait
rotated = false
detached = false
d = create_driver(config)
mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw|
mock.proxy(tw).detach(anything) do |v|
detached = true
v
end
tw
end.once

mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
mock.proxy(tw).detach(anything) do |v|
detached = true
v
end
tw
end.once
end.twice

d.run(timeout: 10) do
until detached do
Expand Down Expand Up @@ -2196,7 +2188,7 @@ def test_ENOENT_error_after_setup_watcher
'format' => 'none',
})
d = create_driver(config)
mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw|
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
cleanup_file(path)
tw
end
Expand All @@ -2221,7 +2213,7 @@ def test_EACCES_error_after_setup_watcher
'format' => 'none',
})
d = create_driver(config, false)
mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw|
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
FileUtils.chmod(0000, "#{TMP_DIR}/noaccess")
tw
end
Expand Down

0 comments on commit 98feddc

Please sign in to comment.