Skip to content

Commit

Permalink
in_tail: Ensure to reach EOF after rotate if throttling is enabled
Browse files Browse the repository at this point in the history
Fix #2478

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
  • Loading branch information
ashie committed Jun 10, 2021
1 parent 980425e commit 2515497
Showing 1 changed file with 32 additions and 1 deletion.
33 changes: 32 additions & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,25 @@ def detach_watcher(tw, ino, close_io = true)
def detach_watcher_after_rotate_wait(tw, ino)
# Call event_loop_attach/event_loop_detach is high-cost for short-live object.
# If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
if @open_on_every_update
# Detach now because it's already closed, waiting it doesn't make sense.
detach_watcher(tw, ino)
elsif @read_bytes_limit_per_second < 0
# throttling isn't enabled, just wait @rotate_wait
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw, ino)
end
else
# When the throttling feature is enabled, it might not reach EOF yet.
# Should ensure to read all contents before closing it, with keeping throttling.
start_time_to_wait = Fluent::Clock.now
timer = timer_execute(:in_tail_close_watcher, 1, repeat: true) do
elapsed = Fluent::Clock.now - start_time_to_wait
if tw.eof? && elapsed >= @rotate_wait
timer.detach
detach_watcher(tw, ino)
end
end
end
end

Expand Down Expand Up @@ -736,6 +753,10 @@ def close
end
end

def eof?
@io_handler.eof?
end

def on_notify
begin
stat = Fluent::FileWrapper.stat(@path)
Expand Down Expand Up @@ -923,6 +944,7 @@ def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:,
@shutdown_start_time = nil
@shutdown_timeout = SHUTDOWN_TIMEOUT
@shutdown_mutex = Mutex.new
@eof = false

@log.info "following tail of #{@path}"
end
Expand All @@ -949,6 +971,10 @@ def opened?
!!@io
end

def eof?
@eof
end

private

def limit_bytes_per_second_reached?
Expand Down Expand Up @@ -989,6 +1015,7 @@ def handle_notify
while true
@start_reading_time ||= Fluent::Clock.now
data = io.readpartial(BYTES_TO_READ, @iobuf)
@eof = false
@number_bytes_read += data.bytesize
@fifo << data
@fifo.read_lines(@lines)
Expand All @@ -1006,6 +1033,7 @@ def handle_notify
end
end
rescue EOFError
@eof = true
end
end

Expand Down Expand Up @@ -1043,14 +1071,17 @@ def with_io
else
@io ||= open
yield @io
@eof = true if @io.nil?
end
rescue WatcherSetupError => e
close
@eof = true
raise e
rescue
@log.error $!.to_s
@log.error_backtrace
close
@eof = true
end
end

Expand Down

0 comments on commit 2515497

Please sign in to comment.