Skip to content

Commit

Permalink
in_tail: Add switch_io_after_read parameter. fix #2478
Browse files Browse the repository at this point in the history
Signed-off-by: Masahiro Nakagawa <repeatedly@gmail.com>
  • Loading branch information
repeatedly committed Jul 26, 2019
1 parent 36b6260 commit 520f89b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
12 changes: 8 additions & 4 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def initialize
config_param :open_on_every_update, :bool, default: false
desc 'Limit the watching files that the modification time is within the specified time range (when use \'*\' in path).'
config_param :limit_recently_modified, :time, default: nil
desc 'After read lines, switch IO operation to other file'
config_param :switch_io_after_read, :bool, default: false # TODO: set true in fluentd v2
desc 'Enable the option to skip the refresh of watching list on startup.'
config_param :skip_refresh_on_startup, :bool, default: false
desc 'Ignore repeated permission error logs'
Expand Down Expand Up @@ -272,7 +274,7 @@ def refresh_watchers

def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, @switch_io_after_read, &method(:receive_lines))
tw.attach do |watcher|
event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger
Expand Down Expand Up @@ -483,7 +485,8 @@ def parse_multilines(lines, tail_watcher)
end

class TailWatcher
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
# TODO: Use struct to reduce the number of argument
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, switch_io_after_read, &receive_lines)
@path = path
@rotate_wait = rotate_wait
@pe = pe || MemoryPositionEntry.new
Expand All @@ -505,10 +508,11 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e
@from_encoding = from_encoding
@encoding = encoding
@open_on_every_update = open_on_every_update
@read_more = !switch_io_after_read
end

attr_reader :path
attr_reader :log, :pe, :read_lines_limit, :open_on_every_update
attr_reader :log, :pe, :read_lines_limit, :open_on_every_update, :read_more
attr_reader :from_encoding, :encoding
attr_reader :stat_trigger, :enable_watch_timer, :enable_stat_watcher
attr_accessor :timer_trigger
Expand Down Expand Up @@ -738,7 +742,7 @@ def handle_notify
end
if @lines.size >= @watcher.read_lines_limit
# not to use too much memory in case the file is very large
read_more = true
read_more = @watcher.read_more
break
end
end
Expand Down
5 changes: 3 additions & 2 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true)
assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file
assert_equal 1000, d.instance.read_lines_limit
assert_equal false, d.instance.ignore_repeated_permission_error
assert_false d.instance.switch_io_after_read
end

data("empty" => config_element,
Expand Down Expand Up @@ -1061,7 +1062,7 @@ def test_z_refresh_watchers
Timecop.freeze(2010, 1, 2, 3, 4, 5) do
flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass|
EX_PATHS.each do |path|
watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do
watcherclass.should_receive(:new).with(path, EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any, any).once.and_return do
flexmock('TailWatcher') { |watcher|
watcher.should_receive(:attach).once
watcher.should_receive(:unwatched=).zero_or_more_times
Expand All @@ -1079,7 +1080,7 @@ def test_z_refresh_watchers

Timecop.freeze(2010, 1, 2, 3, 4, 6) do
flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass|
watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any).once.and_return do
watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_ROTATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, true, 1000, any, any, any, any, any, any, any).once.and_return do
flexmock('TailWatcher') do |watcher|
watcher.should_receive(:attach).once
watcher.should_receive(:unwatched=).zero_or_more_times
Expand Down

0 comments on commit 520f89b

Please sign in to comment.