Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[For #3649] Fix timeout handling #3686

Merged
merged 2 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1290,16 +1290,17 @@ def update_retry_state(chunk_id, using_secondary, error = nil)

# @retry exists

# Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when
# @retry.step is called almost as many times as the number of flush threads in a short time.
if Time.now >= @retry.next_time
@retry.step
else
@retry.recalc_next_time # to prevent all flush threads from retrying at the same time
end

if @retry.limit?
handle_limit_reached(error)
else
# Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when
# @retry.step is called almost as many times as the number of flush threads in a short time.
if Time.now >= @retry.next_time
@retry.step
else
@retry.recalc_next_time # to prevent all flush threads from retrying at the same time
end
if error
if using_secondary
msg = "failed to flush the buffer with secondary output."
Expand All @@ -1311,7 +1312,6 @@ def update_retry_state(chunk_id, using_secondary, error = nil)
log.warn_backtrace error.backtrace
end
end
handle_limit_reached(error) if @retry.limit_step?
end
end
end
Expand Down
27 changes: 12 additions & 15 deletions lib/fluent/plugin_helper/retry_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_wi

@timeout = timeout
@timeout_at = @start + timeout
@has_reached_timeout = false
@has_timed_out = false
@current = :primary

if randomize_width < 0 || randomize_width > 0.5
Expand Down Expand Up @@ -123,32 +125,27 @@ def step
@current = :secondary
@secondary_transition_steps = @steps
end

@next_time = calc_next_time

if @has_reached_timeout
@has_timed_out = @next_time >= @timeout_at
else
@has_reached_timeout = @next_time >= @timeout_at
end

nil
end

def recalc_next_time
@next_time = calc_next_time
end

# Use @next_time for time by default to keep backward compatibility
def limit?(time: @next_time, steps: @steps)
timeout?(time) || limit_step?(steps)
end

def timeout?(time = current_time)
if @forever
false
else
time >= @timeout_at
end
end

def limit_step?(steps = @steps)
def limit?
if @forever
false
else
!!(@max_steps && steps >= @max_steps)
@has_timed_out || !!(@max_steps && @steps >= @max_steps)
end
end
end
Expand Down
23 changes: 10 additions & 13 deletions test/plugin_helper/test_retry_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def ==(obj)
override_current_time(s, s.next_time)
s.step
assert_equal s.timeout_at, s.next_time
s.step
assert s.limit?
end

Expand All @@ -115,7 +116,6 @@ def ==(obj)
assert_equal 5, i
override_current_time(s, s.next_time)
s.step
assert_equal (s.current_time + 3), s.next_time
assert s.limit?
end

Expand Down Expand Up @@ -179,7 +179,9 @@ def ==(obj)
assert s.secondary?

s.step
assert_equal s.timeout_at, s.next_time
assert_equal s.timeout_at, s.next_time # 100

s.step
assert s.limit?
end

Expand Down Expand Up @@ -285,6 +287,7 @@ def ==(obj)
assert_equal 3, s.steps
assert_equal s.timeout_at, s.next_time

s.step
assert s.limit?
end

Expand Down Expand Up @@ -334,8 +337,6 @@ def ==(obj)
override_current_time(s, s.next_time)
s.step
assert_equal 6, s.steps
assert_equal (s.current_time + 10), s.next_time

assert s.limit?
end

Expand Down Expand Up @@ -405,7 +406,9 @@ def ==(obj)
assert s.secondary?

s.step
assert_equal s.timeout_at, s.next_time
assert_equal s.timeout_at, s.next_time # 100

s.step
assert s.limit?
end

Expand Down Expand Up @@ -988,15 +991,9 @@ def ==(obj)
msg << "[#{next_elapsed}s elapsed point] #{retry_count}th-Retry(#{s.secondary? ? "SEC" : "PRI"}) is triggered.\n"

# Update retry statement
if s.limit?
msg << "--- Reach limit of timeout. ---\n"
break
end

s.step

if s.limit_step?
msg << "--- Reach limit of max step. ---\n"
if s.limit?
msg << "--- Reach limit. ---\n"
break
end
end
Expand Down