Skip to content

Commit

Permalink
position_file: Don't remove existing entries on try_compact
Browse files Browse the repository at this point in the history
When both follow_inodes and pos_file_compaction_interval are specified,
file entries that didn't exist on start up will be deleted unexpectedly
when compaction is triggered. This patch fixes this issue.

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
  • Loading branch information
ashie committed Jul 15, 2021
1 parent 3929f6f commit a242b06
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@ class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff
POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze

def self.load(file, follow_inodes, existing_paths, logger:)
pf = new(file, follow_inodes, existing_paths, logger: logger)
pf.load
def self.load(file, follow_inodes, existing_targets, logger:)
pf = new(file, follow_inodes, logger: logger)
pf.load(existing_targets)
pf
end

def initialize(file, follow_inodes, existing_paths, logger: nil)
def initialize(file, follow_inodes, existing_targets, logger: nil)
@file = file
@logger = logger
@file_mutex = Mutex.new
@map = {}
@follow_inodes = follow_inodes
@existing_paths = existing_paths
end

def [](target_info)
Expand All @@ -60,8 +59,8 @@ def unwatch(target_info)
end
end

def load
compact
def load(existing_targets = nil)
compact(existing_targets)

map = {}
@file_mutex.synchronize do
Expand Down Expand Up @@ -118,17 +117,17 @@ def try_compact

private

def compact
def compact(existing_targets = nil)
@file_mutex.synchronize do
entries = fetch_compacted_entries.values.map(&:to_entry_fmt)
entries = fetch_compacted_entries(existing_targets).values.map(&:to_entry_fmt)

@file.pos = 0
@file.truncate(0)
@file.write(entries.join)
end
end

def fetch_compacted_entries
def fetch_compacted_entries(existing_targets = nil)
entries = {}

@file.pos = 0
Expand Down Expand Up @@ -159,15 +158,18 @@ def fetch_compacted_entries
end
end

entries = remove_deleted_files_entries(entries, @existing_paths) if @follow_inodes
entries = remove_deleted_files_entries(entries, existing_targets) if @follow_inodes
entries
end

def remove_deleted_files_entries(existent_entries, existing_paths)
filtered_entries = existent_entries.select {|file_entry|
existing_paths.key?(file_entry)
}
filtered_entries
def remove_deleted_files_entries(existent_entries, existing_targets)
if existing_targets
existent_entries.select { |path_or_ino|
existing_targets.key?(path_or_ino)
}
else
existent_entries
end
end
end

Expand Down

0 comments on commit a242b06

Please sign in to comment.