Skip to content

Commit

Permalink
Merge pull request #2805 from ganmacs/autocompaction-of-posfile
Browse files Browse the repository at this point in the history
Impl posfile auto-compaction in in_tail
  • Loading branch information
ganmacs committed Feb 5, 2020
2 parents 0e072e7 + 8946e53 commit d7cebc6
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 55 deletions.
11 changes: 10 additions & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def initialize
config_param :rotate_wait, :time, default: 5
desc 'Fluentd will record the position it last read into this file.'
config_param :pos_file, :string, default: nil
desc 'The cleanup interval of pos file'
config_param :pos_file_compaction_interval, :integer, default: nil
desc 'Start to read the logs from the head of file, not bottom.'
config_param :read_from_head, :bool, default: false
# When the program deletes log file and re-creates log file with same filename after passed refresh_interval,
Expand Down Expand Up @@ -211,7 +213,14 @@ def start
FileUtils.mkdir_p(pos_file_dir) unless Dir.exist?(pos_file_dir)
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT|File::BINARY, @file_perm)
@pf_file.sync = true
@pf = PositionFile.parse(@pf_file)
@pf = PositionFile.load(@pf_file, logger: log)

if @pos_file_compaction_interval
timer_execute(:in_tail_refresh_compact_pos_file, @pos_file_compaction_interval) do
log.info('Clean up the pos file')
@pf.try_compact
end
end
end

refresh_watchers unless @skip_refresh_on_startup
Expand Down
116 changes: 78 additions & 38 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#

require 'fluent/plugin/in_tail'
require 'fluent/plugin/input'

module Fluent::Plugin
class TailInput < Fluent::Plugin::Input
Expand All @@ -23,10 +23,17 @@ class PositionFile
POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze
POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze

def initialize(file, file_mutex, map)
# Build a PositionFile around +file+, run its instance-level #load
# and hand the populated object back to the caller.
#
# @param file the already-opened pos file handle
# @param logger logger forwarded to the new instance (required keyword)
# @return [PositionFile] the loaded instance (not the result of #load)
def self.load(file, logger:)
  new(file, logger: logger).tap { |position_file| position_file.load }
end

def initialize(file, logger: nil)
@file = file
@file_mutex = file_mutex
@map = map
@logger = logger
@file_mutex = Mutex.new
@map = {}
end

def [](path)
Expand All @@ -48,56 +55,89 @@ def unwatch(path)
end
end

def self.parse(file)
compact(file)
def load
compact

file_mutex = Mutex.new
map = {}
file.pos = 0
file.each_line {|line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
unless m
$log.warn "Unparsable line in pos_file: #{line}"
next
@file_mutex.synchronize do
@file.pos = 0

@file.each_line do |line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
next if m.nil?

path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)
seek = @file.pos - line.bytesize + path.bytesize + 1
map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
end
path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)
seek = file.pos - line.bytesize + path.bytesize + 1
map[path] = FilePositionEntry.new(file, file_mutex, seek, pos, ino)
}
new(file, file_mutex, map)
end

@map = map
end

# Similar to #compact, but holds the file lock only for two short
# sections instead of the whole operation, to reduce lock contention.
#
# 1. Under the lock, remember the file's mtime and size.
# 2. Without the lock, compute the compacted entries.
# 3. Under the lock again, rewrite the file only if mtime and size are
#    still what step 1 saw; otherwise do nothing.
#
# NOTE(review): step 2 calls fetch_compacted_entries outside the lock, and
# that helper repositions the shared @file handle -- confirm that other
# users of @file cannot interleave with it.
#
# @return the result of @file.write when the file was rewritten, nil when
#   the rewrite was skipped
def try_compact
  # Keep the mtime-then-size read order of the snapshot.
  mtime_before, size_before = @file_mutex.synchronize { [@file.mtime, @file.size] }

  compacted = fetch_compacted_entries

  @file_mutex.synchronize do
    # skip when the file changed since the snapshot was taken
    next unless mtime_before == @file.mtime && size_before == @file.size

    @file.pos = 0
    @file.truncate(0)
    @file.write(compacted.join)
  end
end

private

# Rewrite the pos file with only the entries that survive compaction.
# The whole read-and-rewrite runs while holding @file_mutex.
def compact
  @file_mutex.synchronize do
    compacted = fetch_compacted_entries.join

    # Replace the previous contents entirely with the compacted lines.
    @file.pos = 0
    @file.truncate(0)
    @file.write(compacted)
  end
end

# Clean up unwatched file entries
def self.compact(file)
existent_entries = {}
file.pos = 0
file.each_line do |line|
def fetch_compacted_entries
entries = {}

@file.pos = 0
@file.each_line do |line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
unless m
$log.warn "Unparsable line in pos_file: #{line}"
if m.nil?
@logger.warn "Unparsable line in pos_file: #{line}" if @logger
next
end

path = m[1]
pos = m[2].to_i(16)
ino = m[3].to_i(16)

if pos == UNWATCHED_POSITION
next
end
@logger.debug "Remove unwatched line from pos_file: #{line}" if @logger
else
if entries.include?(path)
@logger.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if @logger
end

if existent_entries.include?(path)
$log.warn("#{path} already exists. use latest one: deleted #{existent_entries[path]}")
entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino])
end

# 32bit inode converted to 64bit at this phase
existent_entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino])
end

file.pos = 0
file.truncate(0)
file.write(existent_entries.values.join)
entries.values
end
end

Expand Down
79 changes: 64 additions & 15 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
require_relative '../../helper'
require 'fluent/plugin/in_tail/position_file'

require 'fileutils'
require 'tempfile'

class IntailPositionFileTest < Test::Unit::TestCase
setup do
@file = StringIO.new(+'')
@file = Tempfile.new('intail_position_file_test')
end

teardown do
@file.close rescue nil
@file.unlink rescue nil
end

UNWATCHED_STR = '%016x' % Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION
Expand All @@ -19,10 +27,21 @@ def write_data(f, content)
f.seek(0)
end

sub_test_case '.compact' do
test '.load' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log)

@file.seek(0)
lines = @file.readlines
assert_equal 2, lines.size
assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0]
assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1]
end

sub_test_case '#try_compact' do
test 'compact invalid and convert 32 bit inode value' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.compact(@file)
Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).try_compact

@file.seek(0)
lines = @file.readlines
Expand All @@ -36,29 +55,59 @@ def write_data(f, content)
valid_path\t0000000000000002\t0000000000000001
valid_path\t0000000000000003\t0000000000000004
EOF
Fluent::Plugin::TailInput::PositionFile.compact(@file)
Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).try_compact

@file.seek(0)
lines = @file.readlines
assert_equal "valid_path\t0000000000000003\t0000000000000004\n", lines[0]
end

# Checks that #try_compact leaves the pos file untouched when the file is
# modified while the compacted entries are being computed.
test 'does not change when the file is changed' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log)

# Simulate a concurrent writer: append one more line to the file during
# fetch_compacted_entries, then pass the value `r` through unchanged.
# NOTE(review): presumably mock.proxy runs the real method and yields its
# return value as `r` -- confirm against the mocking library in use.
mock.proxy(pf).fetch_compacted_entries do |r|
@file.write("unwatched\t#{UNWATCHED_STR}\t0000000000000000\n")
r
end

pf.try_compact

# No compaction is expected, so every line should still be present.
# NOTE(review): 5 presumably equals the line count of TEST_CONTENT (defined
# outside this view) plus the one line appended above -- confirm.
@file.seek(0)
lines = @file.readlines
assert_equal 5, lines.size
end
end

test '.parse' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.parse(@file)
sub_test_case '#load' do
test 'compact invalid and convert 32 bit inode value' do
write_data(@file, TEST_CONTENT)
Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log)

@file.seek(0)
lines = @file.readlines
assert_equal 2, lines.size
assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0]
assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1]
@file.seek(0)
lines = @file.readlines
assert_equal 2, lines.size
assert_equal "valid_path\t0000000000000002\t0000000000000001\n", lines[0]
assert_equal "inode23bit\t0000000000000000\t0000000000000000\n", lines[1]
end

test 'compact data if duplicated line' do
write_data(@file, <<~EOF)
valid_path\t0000000000000002\t0000000000000001
valid_path\t0000000000000003\t0000000000000004
EOF
Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log).load

@file.seek(0)
lines = @file.readlines
assert_equal "valid_path\t0000000000000003\t0000000000000004\n", lines[0]
end
end

sub_test_case '#[]' do
test 'return entry' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.parse(@file)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log)

f = pf['valid_path']
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, f.class
Expand All @@ -82,7 +131,7 @@ def write_data(f, content)

test 'does not change other value position if other entry try to write' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.parse(@file)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log)

f = pf['nonexist_path']
assert_equal 0, f.read_inode
Expand All @@ -103,7 +152,7 @@ def write_data(f, content)
sub_test_case '#unwatch' do
test 'deletes entry by path' do
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.parse(@file)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, logger: $log)
p1 = pf['valid_path']
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, p1.class

Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ def test_z_refresh_watchers
plugin = create_driver(EX_CONFIG, false).instance
sio = StringIO.new
plugin.instance_eval do
@pf = Fluent::Plugin::TailInput::PositionFile.parse(sio)
@pf = Fluent::Plugin::TailInput::PositionFile.load(sio, logger: $log)
@loop = Coolio::Loop.new
end

Expand Down

0 comments on commit d7cebc6

Please sign in to comment.