Skip to content

Commit

Permalink
Split complicated clause into read lines limits and log throttling
Browse files Browse the repository at this point in the history
limits

Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 committed May 11, 2021
1 parent 478b487 commit 147ffbf
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions lib/fluent/plugin/in_tail.rb
Expand Up @@ -928,16 +928,21 @@ def opened?

private

def need_to_get_out_for_log_ingestion_loop?(start_reading)
# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Fluent::Clock.now - start_reading
@log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")
if time_spent_reading < 1
needed_sleeping_time = 1 - time_spent_reading
@log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again later.")
return true
def limit_bytes_per_second_reached?(start_reading, number_bytes_read)
if number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0
# sleep to stop reading files when we reach the read bytes per second limit, to throttle the log ingestion
time_spent_reading = Fluent::Clock.now - start_reading
@log.debug("time_spent_reading: #{time_spent_reading} #{ @watcher.path}")
if time_spent_reading < 1
needed_sleeping_time = 1 - time_spent_reading
@log.debug("log ingestion for `#{@path}' is suspended temporary. Read it again later.")
return true
end

false
else
false
end
false
end

def handle_notify
Expand All @@ -955,14 +960,15 @@ def handle_notify
@fifo << data
@fifo.read_lines(@lines)

limit_bytes_per_second_reached = (number_bytes_read >= @read_bytes_limit_per_second && @read_bytes_limit_per_second > 0)
@log.debug("reading file: #{@path}")
if @lines.size >= @read_lines_limit || limit_bytes_per_second_reached
if @lines.size >= @read_lines_limit
# not to use too much memory in case the file is very large
read_more = true

read_more = false if limit_bytes_per_second_reached && need_to_get_out_for_log_ingestion_loop?(start_reading)

break
end
if limit_bytes_per_second_reached?(start_reading, number_bytes_read)
# Just get out from tailing loop.
read_more = false
break
end
end
Expand Down

0 comments on commit 147ffbf

Please sign in to comment.