Skip to content

Commit

Permalink
Merge pull request #3196 from fluent/refactor-path-ino-and-path-with-…
Browse files Browse the repository at this point in the history
…inode-arguments

Refactor path and inode arguments related code
  • Loading branch information
cosmo0920 committed Dec 14, 2020
2 parents b277720 + 978551e commit 1d34032
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 104 deletions.
70 changes: 35 additions & 35 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,11 @@ def expand_paths

def existence_path
hash = {}
@tails.each_key {|path_ino|
@tails.each_key {|target_info|
if @follow_inodes
hash[path_ino.ino] = path_ino
hash[target_info.ino] = target_info
else
hash[path_ino.path] = path_ino
hash[target_info.path] = target_info
end
}
hash
Expand All @@ -353,9 +353,9 @@ def refresh_watchers
start_watchers(added_hash) unless added_hash.empty?
end

def setup_watcher(path_with_inode, pe)
def setup_watcher(target_info, pe)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path_with_inode, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))
tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
Expand Down Expand Up @@ -386,78 +386,77 @@ def setup_watcher(path_with_inode, pe)
raise e
end

def start_watchers(paths_with_inodes)
paths_with_inodes.each_value { |path_with_inode|
path = path_with_inode.path
ino = path_with_inode.ino
def start_watchers(targets_info)
targets_info.each_value { |target_info|
pe = nil
if @pf
pe = @pf[path, ino]
pe = @pf[target_info]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0)
rescue Errno::ENOENT
$log.warn "#{path} not found. Continuing without tailing it."
$log.warn "#{target_info.path} not found. Continuing without tailing it."
end
end
end

begin
tw = setup_watcher(path_with_inode, pe)
tw = setup_watcher(target_info, pe)
rescue WatcherSetupError => e
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
next
end
target_info = TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino)
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails[target_info] = tw
}
end

def stop_watchers(paths_with_inodes, immediate: false, unwatched: false, remove_watcher: true)
paths_with_inodes.each_value { |path_with_inode|
def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true)
targets_info.each_value { |target_info|
if remove_watcher
tw = @tails.delete(path_with_inode)
tw = @tails.delete(target_info)
else
tw = @tails[path_with_inode]
tw = @tails[target_info]
end
if tw
tw.unwatched = unwatched
if immediate
detach_watcher(tw, path_with_inode.ino, false)
detach_watcher(tw, target_info.ino, false)
else
detach_watcher_after_rotate_wait(tw, path_with_inode.ino)
detach_watcher_after_rotate_wait(tw, target_info.ino)
end
end
}
end

def close_watcher_handles
@tails.keys.each do |path_with_inode|
tw = @tails.delete(path_with_inode)
@tails.keys.each do |target_info|
tw = @tails.delete(target_info)
if tw
tw.close
end
end
end

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(path_with_inode, pe)
log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")
def update_watcher(target_info, pe)
log.info("detected rotation of #{target_info.path}; waiting #{@rotate_wait} seconds")

if @pf
unless pe.read_inode == @pf[path_with_inode.path, pe.read_inode].read_inode
pe_inode = pe.read_inode
target_info_from_position_entry = TargetInfo.new(target_info.path, pe_inode)
unless pe_inode == @pf[target_info_from_position_entry].read_inode
log.debug "Skip update_watcher because watcher has been already updated by other inotify event"
return
end
end

target_info = TargetInfo.new(path_with_inode.path, pe.read_inode)
rotated_tw = @tails[target_info]

new_target_info = TargetInfo.new(path_with_inode.path, path_with_inode.ino)
rotated_target_info = TargetInfo.new(target_info.path, pe.read_inode)
rotated_tw = @tails[rotated_target_info]
new_target_info = target_info.dup

if @follow_inodes
new_position_entry = @pf[path_with_inode.path, path_with_inode.ino]
new_position_entry = @pf[target_info]

if new_position_entry.read_inode == 0
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
Expand All @@ -481,7 +480,8 @@ def detach_watcher(tw, ino, close_io = true)
tw.close if close_io

if tw.unwatched && @pf
@pf.unwatch(tw.path, ino)
target_info = TargetInfo.new(tw.path, ino)
@pf.unwatch(target_info)
end
end

Expand Down Expand Up @@ -657,9 +657,9 @@ def on_timer
end

class TailWatcher
def initialize(path_with_inode, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
@path = path_with_inode.path
@ino = path_with_inode.ino
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
@path = target_info.path
@ino = target_info.ino
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@follow_inodes = follow_inodes
Expand Down
31 changes: 12 additions & 19 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,26 @@ def initialize(file, follow_inodes, existing_paths, logger: nil)
@existing_paths = existing_paths
end

def [](path, inode)
if @follow_inodes && m = @map[inode]
return m
elsif !@follow_inodes && m = @map[path]
def [](target_info)
if m = @map[@follow_inodes ? target_info.ino : target_info.path]
return m
end

@file_mutex.synchronize {
@file.seek(0, IO::SEEK_END)
seek = @file.pos + path.bytesize + 1
@file.write "#{path}\t0000000000000000\t0000000000000000\n"
seek = @file.pos + target_info.path.bytesize + 1
@file.write "#{target_info.path}\t0000000000000000\t0000000000000000\n"
if @follow_inodes
@map[inode] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
@map[target_info.ino] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
else
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
@map[target_info.path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
end
}
end

def unwatch(path, inode)
if @follow_inodes
if (entry = @map.delete(inode))
entry.update_pos(UNWATCHED_POSITION)
end
else
if (entry = @map.delete(path))
entry.update_pos(UNWATCHED_POSITION)
end
def unwatch(target_info)
if (entry = @map.delete(@follow_inodes ? target_info.ino : target_info.path))
entry.update_pos(UNWATCHED_POSITION)
end
end

Expand Down Expand Up @@ -112,8 +104,9 @@ def try_compact
@file.truncate(0)
@file.write(entries.values.map(&:to_entry_fmt).join)

entries.each do |path, val|
if (m = @map[path])
# entry contains path/ino key and value.
entries.each do |key, val|
if (m = @map[key])
m.seek = val.seek
end
end
Expand Down
40 changes: 25 additions & 15 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,15 @@ def follow_inodes_block

test 'update seek postion of remained position entry' do
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log})
pf['path1', -1]
pf['path2', -1]
pf['path3', -1]
target_info1 = Fluent::Plugin::TailInput::TargetInfo.new('path1', -1)
target_info2 = Fluent::Plugin::TailInput::TargetInfo.new('path2', -1)
target_info3 = Fluent::Plugin::TailInput::TargetInfo.new('path3', -1)
pf[target_info1]
pf[target_info2]
pf[target_info3]

pf.unwatch('path1', 1234)
target_info1_2 = Fluent::Plugin::TailInput::TargetInfo.new('path1', 1234)
pf.unwatch(target_info1_2)

pf.try_compact

Expand All @@ -101,8 +105,10 @@ def follow_inodes_block
assert_equal "path3\t0000000000000000\t0000000000000000\n", lines[1]
assert_equal 2, lines.size

pf.unwatch('path2', 1235)
pf.unwatch('path3', 1236)
target_info2_2 = Fluent::Plugin::TailInput::TargetInfo.new('path2', 1235)
target_info3_2 = Fluent::Plugin::TailInput::TargetInfo.new('path3', 1236)
pf.unwatch(target_info2_2)
pf.unwatch(target_info3_2)
@file.seek(0)
lines = @file.readlines
assert_equal "path2\t#{UNWATCHED_STR}\t0000000000000000\n", lines[0]
Expand Down Expand Up @@ -141,7 +147,8 @@ def follow_inodes_block
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, **{logger: $log})

f = pf['valid_path', Fluent::FileWrapper.stat(@file).ino]
valid_target_info = Fluent::Plugin::TailInput::TargetInfo.new('valid_path', Fluent::FileWrapper.stat(@file).ino)
f = pf[valid_target_info]
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, f.class
assert_equal 2, f.read_pos
assert_equal 1, f.read_inode
Expand All @@ -150,7 +157,8 @@ def follow_inodes_block
lines = @file.readlines
assert_equal 2, lines.size

f = pf['nonexist_path', -1]
nonexistent_target_info = Fluent::Plugin::TailInput::TargetInfo.new('nonexist_path', -1)
f = pf[nonexistent_target_info]
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, f.class
assert_equal 0, f.read_pos
assert_equal 0, f.read_inode
Expand All @@ -165,17 +173,17 @@ def follow_inodes_block
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, logger: $log)

f = pf['nonexist_path', -1]
f = pf[Fluent::Plugin::TailInput::TargetInfo.new('nonexist_path', -1)]
assert_equal 0, f.read_inode
assert_equal 0, f.read_pos

pf['valid_path', Fluent::FileWrapper.stat(@file).ino].update(1, 2)
pf[Fluent::Plugin::TailInput::TargetInfo.new('valid_path', Fluent::FileWrapper.stat(@file).ino)].update(1, 2)

f = pf['nonexist_path', -1]
f = pf[Fluent::Plugin::TailInput::TargetInfo.new('nonexist_path', -1)]
assert_equal 0, f.read_inode
assert_equal 0, f.read_pos

pf['nonexist_path', -1].update(1, 2)
pf[Fluent::Plugin::TailInput::TargetInfo.new('nonexist_path', -1)].update(1, 2)
assert_equal 1, f.read_inode
assert_equal 2, f.read_pos
end
Expand All @@ -186,14 +194,16 @@ def follow_inodes_block
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, logger: $log)
inode1 = Fluent::FileWrapper.stat(@file).ino
p1 = pf['valid_path', inode1]
target_info1 = Fluent::Plugin::TailInput::TargetInfo.new('valid_path', inode1)
p1 = pf[target_info1]
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, p1.class

pf.unwatch('valid_path', inode1)
pf.unwatch(target_info1)
assert_equal p1.read_pos, Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION

inode2 = Fluent::FileWrapper.stat(@file).ino
p2 = pf['valid_path', inode2]
target_info2 = Fluent::Plugin::TailInput::TargetInfo.new('valid_path', inode2)
p2 = pf[target_info2]
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, p2.class

assert_not_equal p1, p2
Expand Down

0 comments on commit 1d34032

Please sign in to comment.