Skip to content

Commit

Permalink
in_tail: Remove deleted paths in pos_file
Browse files Browse the repository at this point in the history
Fix #3433

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
  • Loading branch information
ashie committed Jul 26, 2021
1 parent d2867c0 commit 4a4d6ae
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 28 deletions.
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def fetch_compacted_entries(existing_targets = nil)
end
end

entries = remove_deleted_files_entries(entries, existing_targets) if @follow_inodes
entries = remove_deleted_files_entries(entries, existing_targets)
entries
end

Expand Down
30 changes: 26 additions & 4 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class IntailPositionFileTest < Test::Unit::TestCase
invalidpath100000000000000000000000000000000
unwatched\t#{UNWATCHED_STR}\t0000000000000000
EOF
TEST_CONTENT_PATHS = {
"valid_path" => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1),
"inode23bit" => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 0),
}

def write_data(f, content)
f.write(content)
Expand All @@ -36,7 +40,11 @@ def follow_inodes_block

test '.load' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, **{logger: $log})
paths = {
"valid_path" => Fluent::Plugin::TailInput::TargetInfo.new("valid_path", 1),
"inode23bit" => Fluent::Plugin::TailInput::TargetInfo.new("inode23bit", 2),
}
Fluent::Plugin::TailInput::PositionFile.load(@file, false, TEST_CONTENT_PATHS, **{logger: $log})

@file.seek(0)
lines = @file.readlines
Expand Down Expand Up @@ -118,7 +126,7 @@ def follow_inodes_block

test 'should ignore initial existing files on follow_inode' do
write_data(@file, TEST_CONTENT)
pos_file = Fluent::Plugin::TailInput::PositionFile.load(@file, true, {}, **{logger: $log})
pos_file = Fluent::Plugin::TailInput::PositionFile.load(@file, true, TEST_CONTENT_PATHS, **{logger: $log})
@file.seek(0)
assert_equal([], @file.readlines)

Expand All @@ -138,7 +146,12 @@ def follow_inodes_block
sub_test_case '#load' do
test 'compact invalid and convert 32 bit inode value' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, **{logger: $log})
invalid_path = "invalidpath100000000000000000000000000000000"
paths = TEST_CONTENT_PATHS.merge({
invalid_path => Fluent::Plugin::TailInput::TargetInfo.new(invalid_path, 0),
"unwatched" => Fluent::Plugin::TailInput::TargetInfo.new("unwatched", 0),
})
Fluent::Plugin::TailInput::PositionFile.load(@file, false, TEST_CONTENT_PATHS, **{logger: $log})

@file.seek(0)
lines = @file.readlines
Expand All @@ -147,6 +160,15 @@ def follow_inodes_block
assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1]
end

test 'compact deleted paths' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, **{logger: $log})

@file.seek(0)
lines = @file.readlines
assert_equal [], lines
end

test 'compact data if duplicated line' do
write_data(@file, <<~EOF)
valid_path\t0000000000000002\t0000000000000001
Expand All @@ -163,7 +185,7 @@ def follow_inodes_block
sub_test_case '#[]' do
test 'return entry' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, **{logger: $log})
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, TEST_CONTENT_PATHS, **{logger: $log})

valid_target_info = Fluent::Plugin::TailInput::TargetInfo.new('valid_path', File.stat(@file).ino)
f = pf[valid_target_info]
Expand Down
27 changes: 4 additions & 23 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1738,7 +1738,7 @@ def test_should_write_latest_offset_after_rotate_wait
d.instance_shutdown
end

def test_should_keep_and_update_existing_file_pos_entry_for_deleted_file_when_new_file_with_same_name_created
def test_should_remove_deleted_file
config = config_element("", "", {"format" => "none"})

path = "#{TMP_DIR}/tail.txt"
Expand All @@ -1749,30 +1749,11 @@ def test_should_keep_and_update_existing_file_pos_entry_for_deleted_file_when_ne
}

d = create_driver(config)
d.run(shutdown: false)

pos_file = File.open("#{TMP_DIR}/tail.pos", "r")
pos_file.pos = 0

path_pos_ino = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(pos_file.readline)
assert_equal(path, path_pos_ino[1])
assert_equal(pos, path_pos_ino[2].to_i(16))
assert_equal(ino, path_pos_ino[3].to_i(16))

File.open("#{TMP_DIR}/tail.txt", "wb") {|f|
f.puts "test1"
f.puts "test2"
}
Timecop.travel(Time.now + 10) do
sleep 5
d.run do
pos_file = File.open("#{TMP_DIR}/tail.pos", "r")
pos_file.pos = 0
tuple = create_target_info("#{TMP_DIR}/tail.txt")
path_pos_ino = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(pos_file.readline)
assert_equal(tuple.path, path_pos_ino[1])
assert_equal(12, path_pos_ino[2].to_i(16))
assert_equal(tuple.ino, path_pos_ino[3].to_i(16))
assert_equal([], pos_file.readlines)
end
d.instance_shutdown
end

def test_should_mark_file_unwatched_after_limit_recently_modified_and_rotate_wait
Expand Down

0 comments on commit 4a4d6ae

Please sign in to comment.