Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl posfile auto-compaction in in_tail #2805

Merged
merged 10 commits into from
Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def initialize
config_param :rotate_wait, :time, default: 5
desc 'Fluentd will record the position it last read into this file.'
config_param :pos_file, :string, default: nil
desc 'The cleanup interval of pos file'
config_param :pos_file_compaction_interval, :integer, default: nil
desc 'Start to read the logs from the head of file, not bottom.'
config_param :read_from_head, :bool, default: false
# When the program deletes log file and re-creates log file with same filename after passed refresh_interval,
Expand Down Expand Up @@ -211,7 +213,14 @@ def start
FileUtils.mkdir_p(pos_file_dir) unless Dir.exist?(pos_file_dir)
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm)
@pf_file.sync = true
@pf = PositionFile.parse(@pf_file)
@pf = PositionFile.load(@pf_file, logger: log)

if @pos_file_compaction_interval
timer_execute(:in_tail_refresh_compact_pos_file, @pos_file_compaction_interval) do
log.info('Clean up the pos file')
@pf.try_compact
end
end
end

refresh_watchers unless @skip_refresh_on_startup
Expand Down
116 changes: 78 additions & 38 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#

require 'fluent/plugin/in_tail'
require 'fluent/plugin/input'

module Fluent::Plugin
class TailInput < Fluent::Plugin::Input
Expand All @@ -23,10 +23,17 @@ class PositionFile
POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze
POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze

def initialize(file, file_mutex, map)
def self.load(file, logger:)
pf = new(file, logger: logger)
pf.load
pf
end

def initialize(file, logger: nil)
@file = file
@file_mutex = file_mutex
@map = map
@logger = logger
@file_mutex = Mutex.new
@map = {}
end

def [](path)
Expand All @@ -48,56 +55,89 @@ def unwatch(path)
end
end

def self.parse(file)
compact(file)
def load
compact

file_mutex = Mutex.new
map = {}
file.pos = 0
file.each_line {|line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
unless m
$log.warn "Unparsable line in pos_file: #{line}"
next
@file_mutex.synchronize do
@file.pos = 0

@file.each_line do |line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
next if m.nil?

path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)
seek = @file.pos - line.bytesize + path.bytesize + 1
map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
end
path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)
seek = file.pos - line.bytesize + path.bytesize + 1
map[path] = FilePositionEntry.new(file, file_mutex, seek, pos, ino)
}
new(file, file_mutex, map)
end

@map = map
end

# This method is similer to #compact but it tries to get less lock to avoid a lock contention
def try_compact
last_modified = nil
size = nil

@file_mutex.synchronize do
last_modified = @file.mtime
size = @file.size
end

entries = fetch_compacted_entries

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having this function outside of the lock is probably a race as it reads the file without a lock and as far as I can tell the file writes are not atomic
so this can read a file currently being modified by another thread leading to corrupted data

probable cause of: #2918

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code seems not to have nothing to do with #2918 because here verify that the file content doesn't change.
So there is no need to lock here.


@file_mutex.synchronize do
if last_modified == @file.mtime && size == @file.size
@file.pos = 0
@file.truncate(0)
@file.write(entries.join)
else
# skip
end
end
end

private

def compact
@file_mutex.synchronize do
entries = fetch_compacted_entries

@file.pos = 0
@file.truncate(0)
@file.write(entries.join)
end
end

# Clean up unwatched file entries
def self.compact(file)
existent_entries = {}
file.pos = 0
file.each_line do |line|
def fetch_compacted_entries
entries = {}

@file.pos = 0
@file.each_line do |line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
unless m
$log.warn "Unparsable line in pos_file: #{line}"
if m.nil?
@logger.warn "Unparsable line in pos_file: #{line}" if @logger
next
end

path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)

if pos == UNWATCHED_POSITION
next
end
@logger.debug "Remove unwatched line from pos_file: #{line}" if @logger
else
if entries.include?(path)
@logger.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if @logger
end

if existent_entries.include?(path)
$log.warn("#{path} already exists. use latest one: deleted #{existent_entries[path]}")
entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino])
end

# 32bit inode converted to 64bit at this phase
existent_entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino])
end

file.pos = 0
file.truncate(0)
file.write(existent_entries.values.join)
entries.values
end
end

Expand Down
79 changes: 64 additions & 15 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
require_relative '../../helper'
require 'fluent/plugin/in_tail/position_file'

require 'fileutils'
require 'tempfile'

class IntailPositionFileTest < Test::Unit::TestCase
setup do
@file = StringIO.new(+'')
@file = Tempfile.new('intail_position_file_test')
end

teardown do
@file.close rescue nil
@file.unlink rescue nil
end

UNWATCHED_STR = '%016x' % Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION
Expand All @@ -19,10 +27,21 @@ def write_data(f, content)
f.seek(0)
end

sub_test_case '.compact' do
test '.load' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log)

@file.seek(0)
lines = @file.readlines
assert_equal 2, lines.size
assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0]
assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1]
end

sub_test_case '#try_compact' do
test 'compact invalid and convert 32 bit inode value' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.compact(@file)
Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).try_compact

@file.seek(0)
lines = @file.readlines
Expand All @@ -36,29 +55,59 @@ def write_data(f, content)
valid_path\t0000000000000002\t0000000000000001
valid_path\t0000000000000003\t0000000000000004
EOF
Fluent::Plugin::TailInput::PositionFile.compact(@file)
Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).try_compact

@file.seek(0)
lines = @file.readlines
assert_equal "valid_path\t0000000000000003\t0000000000000004\n", lines[0]
end

test 'does not change when the file is changed' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log)

mock.proxy(pf).fetch_compacted_entries do |r|
@file.write("unwatched\t#{UNWATCHED_STR}\t0000000000000000\n")
r
end

pf.try_compact

@file.seek(0)
lines = @file.readlines
assert_equal 5, lines.size
end
end

test '.parse' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.parse(@file)
sub_test_case '#load' do
test 'compact invalid and convert 32 bit inode value' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log)

@file.seek(0)
lines = @file.readlines
assert_equal 2, lines.size
assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0]
assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1]
@file.seek(0)
lines = @file.readlines
assert_equal 2, lines.size
assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0]
assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1]
end

test 'compact data if duplicated line' do
write_data(@file, <<~EOF)
valid_path\t0000000000000002\t0000000000000001
valid_path\t0000000000000003\t0000000000000004
EOF
Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).load

@file.seek(0)
lines = @file.readlines
assert_equal "valid_path\t0000000000000003\t0000000000000004\n", lines[0]
end
end

sub_test_case '#[]' do
test 'return entry' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.parse(@file)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log)

f = pf['valid_path']
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, f.class
Expand All @@ -82,7 +131,7 @@ def write_data(f, content)

test 'does not change other value position if other entry try to write' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.parse(@file)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log)

f = pf['nonexist_path']
assert_equal 0, f.read_inode
Expand All @@ -103,7 +152,7 @@ def write_data(f, content)
sub_test_case '#unwatch' do
test 'deletes entry by path' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.parse(@file)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log)
p1 = pf['valid_path']
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, p1.class

Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ def test_z_refresh_watchers
plugin = create_driver(EX_CONFIG, false).instance
sio = StringIO.new
plugin.instance_eval do
@pf = Fluent::Plugin::TailInput::PositionFile.parse(sio)
@pf = Fluent::Plugin::TailInput::PositionFile.load(sio, logger: $log)
@loop = Coolio::Loop.new
end

Expand Down