Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update seek position of position file entry #2922

Merged
merged 3 commits into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 23 additions & 6 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class TailInput < Fluent::Plugin::Input
class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff
POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze
POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze

def self.load(file, logger:)
pf = new(file, logger: logger)
Expand Down Expand Up @@ -83,8 +82,8 @@ def try_compact
size = nil

@file_mutex.synchronize do
last_modified = @file.mtime
size = @file.size
last_modified = @file.mtime
end

entries = fetch_compacted_entries
Expand All @@ -93,7 +92,13 @@ def try_compact
if last_modified == @file.mtime && size == @file.size
@file.pos = 0
@file.truncate(0)
@file.write(entries.join)
@file.write(entries.values.map(&:to_entry_fmt).join)

entries.each do |path, val|
if (m = @map[path])
m.seek = val.seek
end
end
else
# skip
end
Expand All @@ -104,7 +109,7 @@ def try_compact

def compact
@file_mutex.synchronize do
entries = fetch_compacted_entries
entries = fetch_compacted_entries.values.map(&:to_entry_fmt)

@file.pos = 0
@file.truncate(0)
Expand All @@ -116,6 +121,7 @@ def fetch_compacted_entries
entries = {}

@file.pos = 0
file_pos = 0
@file.each_line do |line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
if m.nil?
Expand All @@ -133,11 +139,20 @@ def fetch_compacted_entries
@logger.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if @logger
end

entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino])
entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1)
file_pos += line.size
end
end

entries.values
entries
end
end

Entry = Struct.new(:path, :pos, :ino, :seek) do
POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze

def to_entry_fmt
POSITION_FILE_ENTRY_FORMAT % [path, pos, ino]
end
end

Expand All @@ -158,6 +173,8 @@ def initialize(file, file_mutex, seek, pos, inode)
@inode = inode
end

attr_writer :seek

def update(ino, pos)
@file_mutex.synchronize {
@file.pos = @seek
Expand Down
24 changes: 24 additions & 0 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ def write_data(f, content)
lines = @file.readlines
assert_equal 5, lines.size
end

test 'update seek postion of remained position entry' do
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log)
pf['path1']
pf['path2']
pf['path3']
pf.unwatch('path1')

pf.try_compact

@file.seek(0)
lines = @file.readlines
assert_equal "path2\t0000000000000000\t0000000000000000\n", lines[0]
assert_equal "path3\t0000000000000000\t0000000000000000\n", lines[1]
assert_equal 2, lines.size

pf.unwatch('path2')
pf.unwatch('path3')
@file.seek(0)
lines = @file.readlines
assert_equal "path2\t#{UNWATCHED_STR}\t0000000000000000\n", lines[0]
assert_equal "path3\t#{UNWATCHED_STR}\t0000000000000000\n", lines[1]
assert_equal 2, lines.size
end
end

sub_test_case '#load' do
Expand Down