In the previous version, if BufferChunkOverflowError occurred, none of
the data was sent because of the raised exception.

For example, if one part of the incoming data exceeds chunk_limit_size,
"message a" and "message b" are lost as well:

    message a\n
    longer.....message data exceeds chunk_limit_size\n
    message b\n

This is not appropriate because the problem exists in only one part of
the split data.
In this commit, when BufferChunkOverflowError is detected in
write_step_by_step, the errors are collected and propagated to the
scope of the write method.

This way, the valid parts of the split data are processed and not lost.
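
The idea of collecting errors and raising them only after the valid
splits are stored can be illustrated with a minimal, self-contained
sketch. This is not the actual fluentd implementation: the
ChunkOverflowError class, the CHUNK_LIMIT_SIZE value, the error message
text and the simplified signatures of write and write_step_by_step are
all assumptions made for illustration.

```ruby
# Hypothetical stand-in for Fluent::Plugin::Buffer::BufferChunkOverflowError.
class ChunkOverflowError < StandardError; end

# Illustrative limit in bytes; not a real fluentd default.
CHUNK_LIMIT_SIZE = 32

# Stores every split that fits and records an error message for each
# split that is larger than the limit, instead of raising immediately.
def write_step_by_step(splits, stored, errors)
  splits.each do |split|
    if split.bytesize > CHUNK_LIMIT_SIZE
      errors << "a #{split.bytesize} bytes record is larger than the limit (#{CHUNK_LIMIT_SIZE})"
      next
    end
    stored << split
  end
end

# Raises only after all valid splits have been stored.
def write(splits, stored)
  errors = []
  write_step_by_step(splits, stored, errors)
  raise ChunkOverflowError, errors.join(", ") unless errors.empty?
end

stored = []
splits = ["message a\n", "longer....." + "x" * 40 + "\n", "message b\n"]
begin
  write(splits, stored)
rescue ChunkOverflowError => e
  warn "overflow: #{e.message}"
end
p stored # the two short messages are kept
```

The caller still sees the error for the oversized split, while
"message a" and "message b" have already been stored.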
Here is the micro benchmark result:
Before:

    Warming up --------------------------------------
    no BufferChunkOverflowError
            11.389k i/100ms
    raise BufferChunkOverflowError in every 100 calls
            3.976k i/100ms
    Calculating -------------------------------------
    no BufferChunkOverflowError
            114.973k (± 2.6%) i/s - 1.150M in 10.012362s
    raise BufferChunkOverflowError in every 100 calls
            40.086k (± 3.8%) i/s - 401.576k in 10.031710s
    Comparison:
    no BufferChunkOverflowError: 114972.7 i/s
    raise BufferChunkOverflowError in every 100 calls: 40086.1 i/s - 2.87x (± 0.00) slower
After:

    Warming up --------------------------------------
    no BufferChunkOverflowError
            11.491k i/100ms
    raise BufferChunkOverflowError in every 100 calls
            4.690k i/100ms
    Calculating -------------------------------------
    no BufferChunkOverflowError
            114.295k (± 1.6%) i/s - 1.149M in 10.056319s
    raise BufferChunkOverflowError in every 100 calls
            48.076k (± 9.0%) i/s - 478.380k in 10.030903s
    Comparison:
    no BufferChunkOverflowError: 114294.8 i/s
    raise BufferChunkOverflowError in every 100 calls: 48076.2 i/s - 2.38x (± 0.00) slower
This change appears to have little performance impact in the test case
above. Comparing the "Calculating" sections of Before and After, the
results are almost the same: 114.973k vs 114.295k i/s without
BufferChunkOverflowError, and 40.086k vs 48.076k i/s with
BufferChunkOverflowError raised in every 100 calls.
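
The before/after comparison can be checked with a short calculation.
The sketch below only reuses the i/s values from the "Comparison" lines
above; the hash keys and the percent_change helper are hypothetical
names chosen for illustration.

```ruby
# Iterations per second copied from the "Comparison" lines above.
before = { no_error: 114_972.7, with_error: 40_086.1 }
after  = { no_error: 114_294.8, with_error: 48_076.2 }

# Relative change in percent; a positive value means "After" is faster.
def percent_change(old, new)
  ((new - old) / old * 100).round(1)
end

changes = before.map { |name, ips| [name, percent_change(ips, after[name])] }.to_h
changes.each { |name, pct| puts format("%-10s %+.1f%%", name, pct) }
```

With these numbers, the case without the error changes by about -0.6%,
which is within the reported error margin, and the case with the error
changes by about +19.9%.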
Here is the micro benchmark test case:
    test '#write BufferChunkOverflowError micro benchmark' do
      require 'benchmark/ips'
      # A small event that fits in a chunk, and a large one that exceeds chunk_limit_size.
      es = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x"}] ])
      large = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}] ])
      Benchmark.ips do |x|
        x.config(:time => 10, :warmup => 2)
        x.report("no BufferChunkOverflowError") do
          @p.write({@dm0 => es}, format: @format)
        end
        x.report("raise BufferChunkOverflowError in every 100 calls") do |n|
          n.times do |i|
            if i % 100 == 0
              # Every 100th call writes the oversized event and expects the error.
              assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError do
                @p.write({@dm0 => large}, format: @format)
              end
            else
              @p.write({@dm0 => es}, format: @format)
            end
          end
        end
        x.compare!
      end
    end
Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>