Skip to content

Commit

Permalink
Merge pull request #3541 from majimenez-stratio/in_tail_watchers
Browse files Browse the repository at this point in the history
in_tail: Fix StatWatcher uses wrong path
  • Loading branch information
ashie committed Nov 2, 2021
2 parents daf2f16 + ebfd278 commit a4195b2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 9 deletions.
10 changes: 7 additions & 3 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,12 @@ def existence_path
def refresh_watchers
target_paths_hash = expand_paths
existence_paths_hash = existence_path

log.debug { "tailing paths: target = #{target_paths.join(",")} | existing = #{existence_paths.join(",")}" }

log.debug {
target_paths_str = target_paths_hash.collect { |key, target_info| target_info.path }.join(",")
existence_paths_str = existence_paths_hash.collect { |key, target_info| target_info.path }.join(",")
"tailing paths: target = #{target_paths_str} | existing = #{existence_paths_str}"
}

unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}
Expand All @@ -389,7 +393,7 @@ def setup_watcher(target_info, pe)
end

if @enable_stat_watcher
tt = StatWatcher.new(path, log) { tw.on_notify }
tt = StatWatcher.new(target_info.path, log) { tw.on_notify }
tw.register_watcher(tt)
end

Expand Down
44 changes: 38 additions & 6 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def create_target_info(path)
})
COMMON_CONFIG = CONFIG + config_element("", "", { "pos_file" => "#{TMP_DIR}/tail.pos" })
CONFIG_READ_FROM_HEAD = config_element("", "", { "read_from_head" => true })
CONFIG_ENABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false })
CONFIG_DISABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false })
CONFIG_DISABLE_STAT_WATCHER = config_element("", "", { "enable_stat_watcher" => false })
CONFIG_OPEN_ON_EVERY_UPDATE = config_element("", "", { "open_on_every_update" => true })
COMMON_FOLLOW_INODE_CONFIG = config_element("ROOT", "", {
Expand Down Expand Up @@ -199,7 +199,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)

sub_test_case "log throttling per file" do
test "w/o watcher timer is invalid" do
conf = CONFIG_ENABLE_WATCH_TIMER + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
conf = CONFIG_DISABLE_WATCH_TIMER + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
assert_raise(Fluent::ConfigError) do
create_driver(conf)
end
Expand All @@ -215,7 +215,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)

test "both enable_watch_timer and enable_stat_watcher are false" do
assert_raise(Fluent::ConfigError) do
create_driver(CONFIG_ENABLE_WATCH_TIMER + CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG)
create_driver(CONFIG_DISABLE_WATCH_TIMER + CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG)
end
end

Expand Down Expand Up @@ -570,9 +570,9 @@ def test_emit_with_read_from_head(data)
assert_equal({"message" => "test4"}, events[3][2])
end

data(flat: CONFIG_ENABLE_WATCH_TIMER + SINGLE_LINE_CONFIG,
parse: CONFIG_ENABLE_WATCH_TIMER + PARSE_SINGLE_LINE_CONFIG)
def test_emit_with_enable_watch_timer(data)
data(flat: CONFIG_DISABLE_WATCH_TIMER + SINGLE_LINE_CONFIG,
parse: CONFIG_DISABLE_WATCH_TIMER + PARSE_SINGLE_LINE_CONFIG)
def test_emit_without_watch_timer(data)
config = data
File.open("#{TMP_DIR}/tail.txt", "wb") {|f|
f.puts "test1"
Expand All @@ -596,6 +596,38 @@ def test_emit_with_enable_watch_timer(data)
assert_equal({"message" => "test4"}, events[1][2])
end

# https://github.com/fluent/fluentd/pull/3541#discussion_r740197711
def test_watch_wildcard_path_without_watch_timer
omit "need inotify" unless Fluent.linux?

config = config_element("ROOT", "", {
"path" => "#{TMP_DIR}/tail*.txt",
"tag" => "t1",
})
config = config + CONFIG_DISABLE_WATCH_TIMER + SINGLE_LINE_CONFIG

File.open("#{TMP_DIR}/tail.txt", "wb") {|f|
f.puts "test1"
f.puts "test2"
}

d = create_driver(config, false)

d.run(expect_emits: 1, timeout: 1) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
f.puts "test3"
f.puts "test4"
}
end

assert_equal(
[
{"message" => "test3"},
{"message" => "test4"},
],
d.events.collect { |event| event[2] })
end

data(flat: CONFIG_DISABLE_STAT_WATCHER + SINGLE_LINE_CONFIG,
parse: CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG)
def test_emit_with_disable_stat_watcher(data)
Expand Down

0 comments on commit a4195b2

Please sign in to comment.