Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Fix StatWatcher uses wrong path #3541

Merged
merged 7 commits into from
Nov 2, 2021
10 changes: 7 additions & 3 deletions lib/fluent/plugin/in_tail.rb
Expand Up @@ -369,8 +369,12 @@ def existence_path
def refresh_watchers
target_paths_hash = expand_paths
existence_paths_hash = existence_path

log.debug { "tailing paths: target = #{target_paths.join(",")} | existing = #{existence_paths.join(",")}" }

log.debug {
target_paths_str = target_paths_hash.collect { |key, target_info| target_info.path }.join(",")
existence_paths_str = existence_paths_hash.collect { |key, target_info| target_info.path }.join(",")
"tailing paths: target = #{target_paths_str} | existing = #{existence_paths_str}"
}

unwatched_hash = existence_paths_hash.reject {|key, value| target_paths_hash.key?(key)}
added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)}
Expand All @@ -389,7 +393,7 @@ def setup_watcher(target_info, pe)
end

if @enable_stat_watcher
tt = StatWatcher.new(path, log) { tw.on_notify }
tt = StatWatcher.new(target_info.path, log) { tw.on_notify }
ashie marked this conversation as resolved.
Show resolved Hide resolved
tw.register_watcher(tt)
end

Expand Down
44 changes: 38 additions & 6 deletions test/plugin/test_in_tail.rb
Expand Up @@ -99,7 +99,7 @@ def create_target_info(path)
})
COMMON_CONFIG = CONFIG + config_element("", "", { "pos_file" => "#{TMP_DIR}/tail.pos" })
CONFIG_READ_FROM_HEAD = config_element("", "", { "read_from_head" => true })
CONFIG_ENABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false })
CONFIG_DISABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false })
CONFIG_DISABLE_STAT_WATCHER = config_element("", "", { "enable_stat_watcher" => false })
CONFIG_OPEN_ON_EVERY_UPDATE = config_element("", "", { "open_on_every_update" => true })
COMMON_FOLLOW_INODE_CONFIG = config_element("ROOT", "", {
Expand Down Expand Up @@ -199,7 +199,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)

sub_test_case "log throttling per file" do
test "w/o watcher timer is invalid" do
conf = CONFIG_ENABLE_WATCH_TIMER + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
conf = CONFIG_DISABLE_WATCH_TIMER + config_element("ROOT", "", {"read_bytes_limit_per_second" => "8k"})
assert_raise(Fluent::ConfigError) do
create_driver(conf)
end
Expand All @@ -215,7 +215,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)

test "both enable_watch_timer and enable_stat_watcher are false" do
assert_raise(Fluent::ConfigError) do
create_driver(CONFIG_ENABLE_WATCH_TIMER + CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG)
create_driver(CONFIG_DISABLE_WATCH_TIMER + CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG)
end
end

Expand Down Expand Up @@ -570,9 +570,9 @@ def test_emit_with_read_from_head(data)
assert_equal({"message" => "test4"}, events[3][2])
end

data(flat: CONFIG_ENABLE_WATCH_TIMER + SINGLE_LINE_CONFIG,
parse: CONFIG_ENABLE_WATCH_TIMER + PARSE_SINGLE_LINE_CONFIG)
def test_emit_with_enable_watch_timer(data)
data(flat: CONFIG_DISABLE_WATCH_TIMER + SINGLE_LINE_CONFIG,
parse: CONFIG_DISABLE_WATCH_TIMER + PARSE_SINGLE_LINE_CONFIG)
def test_emit_without_watch_timer(data)
config = data
File.open("#{TMP_DIR}/tail.txt", "wb") {|f|
f.puts "test1"
Expand All @@ -596,6 +596,38 @@ def test_emit_with_enable_watch_timer(data)
assert_equal({"message" => "test4"}, events[1][2])
end

# https://github.com/fluent/fluentd/pull/3541#discussion_r740197711
def test_watch_wildcard_path_without_watch_timer
omit "need inotify" unless Fluent.linux?

config = config_element("ROOT", "", {
"path" => "#{TMP_DIR}/tail*.txt",
"tag" => "t1",
})
config = config + CONFIG_DISABLE_WATCH_TIMER + SINGLE_LINE_CONFIG

File.open("#{TMP_DIR}/tail.txt", "wb") {|f|
f.puts "test1"
f.puts "test2"
}

d = create_driver(config, false)

d.run(expect_emits: 1, timeout: 1) do
File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
f.puts "test3"
f.puts "test4"
}
end

assert_equal(
[
{"message" => "test3"},
{"message" => "test4"},
],
d.events.collect { |event| event[2] })
end

data(flat: CONFIG_DISABLE_STAT_WATCHER + SINGLE_LINE_CONFIG,
parse: CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG)
def test_emit_with_disable_stat_watcher(data)
Expand Down