Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix a bug that BufferChunkOverflowError cause a whole data loss
In the previous version, if BufferChunkOverflowError occurs, whole data is not sent because of raised exception. For example, incoming partial data exceeds chunk_limit_size, "message a" and "message b" is lost. message a\n longer.....message data exceeds chunk_limit_size\n message b\n It is not appropriate because the problem exists only a partial split data. In this commit, detect BufferChunkOverflowError and postpone a timing to raise exception. Thus a valid partial split is processed as intended. Here is the micro benchmark result: It has slightly performance drawbacks, but not so critical. Before: Calculating ------------------------------------- no BufferChunkOverflowError 116.903k (± 1.4%) i/s - 1.172M in 10.030026s raise BufferChunkOverflowError in every 100 calls 40.906k (± 4.1%) i/s - 411.060k in 10.065067s Comparison: no BufferChunkOverflowError: 116903.2 i/s raise BufferChunkOverflowError in every 100 calls: 40905.8 i/s - 2.86x (± 0.00) slower After: Calculating ------------------------------------- no BufferChunkOverflowError 116.276k (± 1.1%) i/s - 1.163M in 10.007341s raise BufferChunkOverflowError in every 100 calls 40.179k (± 4.9%) i/s - 401.472k in 10.016561s Comparison: no BufferChunkOverflowError: 116276.1 i/s raise BufferChunkOverflowError in every 100 calls: 40179.0 i/s - 2.89x (± 0.00) slower Here is the micro benchmark test case: test '#write BufferChunkOverflowError micro benchmark' do require 'benchmark/ips' es = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x"}] ]) large = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}] ]) Benchmark.ips do |x| x.config(:time => 10, :warmup => 2) x.report("no BufferChunkOverflowError") do @p.write({@dm0 => es}, format: @Format) end x.report("raise BufferChunkOverflowError in every 100 calls") do |n| n.times do |i| if i % 100 == 0 assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError do @p.write({@dm0 => large}, format: @Format) end else @p.write({@dm0 => es}, format: @Format) end end end x.compare! end end Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
- Loading branch information