Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revise feat ignore_if_prev_successes to copy plugin #3287

Merged
merged 6 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 18 additions & 5 deletions lib/fluent/plugin/out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,28 @@ class CopyOutput < MultiOutput
desc 'Pass different record to each `store` plugin by specified method'
config_param :copy_mode, :enum, list: [:no_copy, :shallow, :deep, :marshal], default: :no_copy

attr_reader :ignore_errors
attr_reader :ignore_errors, :ignore_if_prev_successes

def initialize
super
@ignore_errors = []
@ignore_if_prev_successes = []
end

def configure(conf)
super

@copy_proc = gen_copy_proc
@stores.each { |store|
@ignore_errors << (store.arg == 'ignore_error')
@stores.each_with_index { |store, i|
if i == 0 && store.arg.include?('ignore_if_prev_success')
raise Fluent::ConfigError, "ignore_if_prev_success must specify 2nd or later <store> directives"
end
@ignore_errors << (store.arg.include?('ignore_error'))
@ignore_if_prev_successes << (store.arg.include?('ignore_if_prev_success'))
}
if @ignore_errors.uniq.size == 1 && @ignore_errors.include?(true) && @ignore_if_prev_successes.include?(false)
log.warn "ignore_errors are specified in all <store>, but ignore_if_prev_success is not specified. Is this intended?"
end
end

def multi_workers_ready?
Expand All @@ -55,10 +63,15 @@ def process(tag, es)
}
es = m
end

success = Array.new(outputs.size)
outputs.each_with_index do |output, i|
begin
output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
if i > 0 && success[i - 1] && @ignore_if_prev_successes[i]
cosmo0920 marked this conversation as resolved.
Show resolved Hide resolved
log.debug "ignore copy because prev_success in #{output.plugin_id}", index: i
else
output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
success[i] = true
end
rescue => e
if @ignore_errors[i]
log.error "ignore emit error in #{output.plugin_id}", error: e
Expand Down
87 changes: 87 additions & 0 deletions test/plugin/test_out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
require 'fluent/test/driver/multi_output'
require 'fluent/plugin/out_copy'
require 'fluent/event'
require 'flexmock/test_unit'

class CopyOutputTest < Test::Unit::TestCase
include FlexMock::TestCase

class << self
def startup
$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts'))
Expand Down Expand Up @@ -54,6 +57,48 @@ def test_configure
assert_equal :no_copy, d.instance.copy_mode
end

ERRORNEOUS_IGNORE_IF_PREV_SUCCESS_CONFIG = %[
<store ignore_if_prev_success ignore_error>
@type test
name c0
</store>
<store ignore_if_prev_success ignore_error>
@type test
name c1
</store>
<store ignore_if_prev_success>
@type test
name c2
</store>
]
def test_configure_with_errorneus_ignore_if_prev_success
assert_raise(Fluent::ConfigError) do
create_driver(ERRORNEOUS_IGNORE_IF_PREV_SUCCESS_CONFIG)
end
end

ALL_IGNORE_ERROR_WITHOUT_IGNORE_IF_PREV_SUCCESS_CONFIG = %[
@log_level info
<store ignore_error>
@type test
name c0
</store>
<store ignore_error>
@type test
name c1
</store>
<store ignore_error>
@type test
name c2
</store>
]
def test_configure_all_ignore_errors_without_ignore_if_prev_success
d = create_driver(ALL_IGNORE_ERROR_WITHOUT_IGNORE_IF_PREV_SUCCESS_CONFIG)
expected = /ignore_errors are specified in all <store>, but ignore_if_prev_success is not specified./
matches = d.logs.grep(expected)
assert_equal(1, matches.length, "Logs do not contain '#{expected}' '#{d.logs}'")
end

def test_configure_with_deep_copy_and_use_shallow_copy_mode
d = create_driver(%[
deep_copy true
Expand Down Expand Up @@ -217,5 +262,47 @@ def test_ignore_error
end
end
end

IGNORE_IF_PREV_SUCCESS_CONFIG = %[
<store ignore_error>
@type test
name c0
</store>
<store ignore_if_prev_success ignore_error>
@type test
name c1
</store>
<store ignore_if_prev_success>
@type test
name c2
</store>
]

def test_ignore_if_prev_success
d = create_driver(IGNORE_IF_PREV_SUCCESS_CONFIG)

# override to raise an error
d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
raise ArgumentError, 'Failed'
end

# check ingore_if_prev_success functionality:
# 1. output 2 is succeeded.
# 2. output 3 is not called.
flexstub(d.instance.outputs[1]) do |output|
cosmo0920 marked this conversation as resolved.
Show resolved Hide resolved
output.should_receive(:process).once
end
flexstub(d.instance.outputs[2]) do |output|
output.should_receive(:process).never
end

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
assert_nothing_raised do
d.run(default_tag: 'test') do
d.feed(time, {"a"=>1})
end
end
end

end