Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ignore_if_prev_successes to copy plugin #3190

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 10 additions & 3 deletions lib/fluent/plugin/out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ class CopyOutput < MultiOutput
desc 'Pass different record to each `store` plugin by specified method'
config_param :copy_mode, :enum, list: [:no_copy, :shallow, :deep, :marshal], default: :no_copy

attr_reader :ignore_errors
attr_reader :ignore_errors, :ignore_if_prev_successes

def initialize
super
@ignore_errors = []
@ignore_if_prev_successes = []
end

def configure(conf)
Expand All @@ -40,6 +41,7 @@ def configure(conf)
@copy_proc = gen_copy_proc
@stores.each { |store|
@ignore_errors << (store.arg == 'ignore_error')
@ignore_if_prev_successes << (store.arg == 'ignore_if_prev_success')
TiagoGoddard marked this conversation as resolved.
Show resolved Hide resolved
}
end

Expand All @@ -55,10 +57,15 @@ def process(tag, es)
}
es = m
end

success = []
TiagoGoddard marked this conversation as resolved.
Show resolved Hide resolved
outputs.each_with_index do |output, i|
begin
output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
if i > 0 && success[i - 1] && @ignore_if_prev_successes[i]
log.info "ignore copy because prev_success in #{output.plugin_id}", index: i
TiagoGoddard marked this conversation as resolved.
Show resolved Hide resolved
else
output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
success[i] = true;
TiagoGoddard marked this conversation as resolved.
Show resolved Hide resolved
end
rescue => e
if @ignore_errors[i]
log.error "ignore emit error in #{output.plugin_id}", error: e
Expand Down
32 changes: 32 additions & 0 deletions test/plugin/test_out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -217,5 +217,37 @@ def test_ignore_error
end
end
end

IGNORE_IF_PREV_SUCCESS_CONFIG = %[
<store ignore_error>
@type test
name c0
</store>
<store ignore_if_prev_success ignore_error>
@type test
name c1
</store>
<store ignore_if_prev_success>
@type test
name c2
</store>
]

def test_ignore_if_prev_success
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this code test ignore_if_prev_success feature?
This code is copied from ignore_error and no modification, so this code tests ignore_error.
This code should test 2nd/3rd <store>'s process is not called.

d = create_driver(IGNORE_IF_PREV_SUCCESS_CONFIG)

# override to raise an error
d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
raise ArgumentError, 'Failed'
end

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
assert_nothing_raised do
d.run(default_tag: 'test') do
d.feed(time, {"a"=>1})
end
end
end

end