Skip to content

Commit

Permalink
Merge pull request #3287 from fluent/revise-feat-ignore-prev-success
Browse files Browse the repository at this point in the history
Revise feat ignore_if_prev_successes to copy plugin
  • Loading branch information
cosmo0920 committed Mar 9, 2021
2 parents 19f18cf + 3041046 commit dcc47e8
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 5 deletions.
23 changes: 18 additions & 5 deletions lib/fluent/plugin/out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,28 @@ class CopyOutput < MultiOutput
desc 'Pass different record to each `store` plugin by specified method'
config_param :copy_mode, :enum, list: [:no_copy, :shallow, :deep, :marshal], default: :no_copy

attr_reader :ignore_errors
attr_reader :ignore_errors, :ignore_if_prev_successes

def initialize
super
@ignore_errors = []
@ignore_if_prev_successes = []
end

def configure(conf)
super

@copy_proc = gen_copy_proc
@stores.each { |store|
@ignore_errors << (store.arg == 'ignore_error')
@stores.each_with_index { |store, i|
if i == 0 && store.arg.include?('ignore_if_prev_success')
raise Fluent::ConfigError, "ignore_if_prev_success must specify 2nd or later <store> directives"
end
@ignore_errors << (store.arg.include?('ignore_error'))
@ignore_if_prev_successes << (store.arg.include?('ignore_if_prev_success'))
}
if @ignore_errors.uniq.size == 1 && @ignore_errors.include?(true) && @ignore_if_prev_successes.include?(false)
log.warn "ignore_errors are specified in all <store>, but ignore_if_prev_success is not specified. Is this intended?"
end
end

def multi_workers_ready?
Expand All @@ -55,10 +63,15 @@ def process(tag, es)
}
es = m
end

success = Array.new(outputs.size)
outputs.each_with_index do |output, i|
begin
output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
if i > 0 && success[i - 1] && @ignore_if_prev_successes[i]
log.debug "ignore copy because prev_success in #{output.plugin_id}", index: i
else
output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es)
success[i] = true
end
rescue => e
if @ignore_errors[i]
log.error "ignore emit error in #{output.plugin_id}", error: e
Expand Down
87 changes: 87 additions & 0 deletions test/plugin/test_out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
require 'fluent/test/driver/multi_output'
require 'fluent/plugin/out_copy'
require 'fluent/event'
require 'flexmock/test_unit'

class CopyOutputTest < Test::Unit::TestCase
include FlexMock::TestCase

class << self
def startup
$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts'))
Expand Down Expand Up @@ -54,6 +57,48 @@ def test_configure
assert_equal :no_copy, d.instance.copy_mode
end

ERRORNEOUS_IGNORE_IF_PREV_SUCCESS_CONFIG = %[
<store ignore_if_prev_success ignore_error>
@type test
name c0
</store>
<store ignore_if_prev_success ignore_error>
@type test
name c1
</store>
<store ignore_if_prev_success>
@type test
name c2
</store>
]
def test_configure_with_errorneus_ignore_if_prev_success
assert_raise(Fluent::ConfigError) do
create_driver(ERRORNEOUS_IGNORE_IF_PREV_SUCCESS_CONFIG)
end
end

ALL_IGNORE_ERROR_WITHOUT_IGNORE_IF_PREV_SUCCESS_CONFIG = %[
@log_level info
<store ignore_error>
@type test
name c0
</store>
<store ignore_error>
@type test
name c1
</store>
<store ignore_error>
@type test
name c2
</store>
]
def test_configure_all_ignore_errors_without_ignore_if_prev_success
d = create_driver(ALL_IGNORE_ERROR_WITHOUT_IGNORE_IF_PREV_SUCCESS_CONFIG)
expected = /ignore_errors are specified in all <store>, but ignore_if_prev_success is not specified./
matches = d.logs.grep(expected)
assert_equal(1, matches.length, "Logs do not contain '#{expected}' '#{d.logs}'")
end

def test_configure_with_deep_copy_and_use_shallow_copy_mode
d = create_driver(%[
deep_copy true
Expand Down Expand Up @@ -217,5 +262,47 @@ def test_ignore_error
end
end
end

IGNORE_IF_PREV_SUCCESS_CONFIG = %[
<store ignore_error>
@type test
name c0
</store>
<store ignore_if_prev_success ignore_error>
@type test
name c1
</store>
<store ignore_if_prev_success>
@type test
name c2
</store>
]

def test_ignore_if_prev_success
d = create_driver(IGNORE_IF_PREV_SUCCESS_CONFIG)

# override to raise an error
d.instance.outputs[0].define_singleton_method(:process) do |tag, es|
raise ArgumentError, 'Failed'
end

# check ingore_if_prev_success functionality:
# 1. output 2 is succeeded.
# 2. output 3 is not called.
flexstub(d.instance.outputs[1]) do |output|
output.should_receive(:process).once
end
flexstub(d.instance.outputs[2]) do |output|
output.should_receive(:process).never
end

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
assert_nothing_raised do
d.run(default_tag: 'test') do
d.feed(time, {"a"=>1})
end
end
end

end

0 comments on commit dcc47e8

Please sign in to comment.