Fix a bug that BufferChunkOverflowError causes whole data loss
In the previous version, if a BufferChunkOverflowError occurred, the
whole data was lost because of the raised exception.

For example, when one record in the incoming data exceeds
chunk_limit_size, the adjacent records "message a" and "message b" are
also lost:

  message a\n
  longer.....message data exceeds chunk_limit_size\n
  message b\n

This is not appropriate, because the problem exists in only one part of
the split data.

In this commit, when a BufferChunkOverflowError is detected in
write_step_by_step, the errors are collected and propagated up to the
write method's scope.
This way, the valid parts of the split data are processed and not lost.
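
The fix follows a deferred-error pattern: record each overflow, keep
writing, and raise one aggregated exception at the end. Here is a
minimal Ruby sketch of that pattern, independent of the buffer
internals (write_all, write_record, and CHUNK_LIMIT_SIZE are
hypothetical stand-ins, not the plugin's actual API; the raise assumes
fluentd's buffer classes are loaded):

  # Sketch: skip oversized records, remember why, raise once at the end.
  CHUNK_LIMIT_SIZE = 1_280_000 # assumed limit, matches the default chunk_limit_size

  def write_all(records)
    errors = []
    records.each_with_index do |record, nth|
      if record.bytesize > CHUNK_LIMIT_SIZE
        # Remember the failure, but keep processing the remaining records.
        errors << "a #{record.bytesize} bytes record (nth: #{nth}) is larger than buffer chunk limit size"
        next
      end
      write_record(record) # hypothetical helper that appends the record to a chunk
    end
    # Every valid record has been written; now surface the failures once.
    raise Fluent::Plugin::Buffer::BufferChunkOverflowError, errors.join(", ") unless errors.empty?
  end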

Here is the micro benchmark result. It shows no performance drawback:
compare the "Before" and "After" Calculating sections; with and without
BufferChunkOverflowError, the numbers are almost the same.

Before:

  Warming up --------------------------------------
  no BufferChunkOverflowError
                          11.488k i/100ms
  raise BufferChunkOverflowError in every 100 calls
                           4.110k i/100ms
  Calculating -------------------------------------
  no BufferChunkOverflowError
                          113.353k (± 1.0%) i/s -      1.137M in  10.034300s
  raise BufferChunkOverflowError in every 100 calls
                           40.599k (± 3.5%) i/s -    406.890k in  10.034076s

  Comparison:
  no BufferChunkOverflowError:   113353.4 i/s
  raise BufferChunkOverflowError in every 100 calls:    40599.5 i/s - 2.79x  (± 0.00) slower

After:

  Warming up --------------------------------------
  no BufferChunkOverflowError
                          11.399k i/100ms
  raise BufferChunkOverflowError in every 100 calls
                           4.301k i/100ms
  Calculating -------------------------------------
  no BufferChunkOverflowError
                          113.855k (± 1.1%) i/s -      1.140M in  10.013090s
  raise BufferChunkOverflowError in every 100 calls
                           43.683k (± 4.0%) i/s -    438.702k in  10.059251s

  Comparison:
  no BufferChunkOverflowError:   113854.6 i/s
  raise BufferChunkOverflowError in every 100 calls:    43683.3 i/s - 2.61x  (± 0.00) slower

Here is the micro benchmark test case:

  test '#write BufferChunkOverflowError micro benchmark' do
    require 'benchmark/ips'
    es = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x"}] ])
    large = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}] ])
    Benchmark.ips do |x|
      x.config(:time => 10, :warmup => 2)
      x.report("no BufferChunkOverflowError") do
        @p.write({@dm0 => es}, format: @format)
      end
      x.report("raise BufferChunkOverflowError in every 100 calls") do |n|
        n.times do |i|
          if i % 100 == 0
            assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError do
              @p.write({@dm0 => large}, format: @format)
            end
          else
            @p.write({@dm0 => es}, format: @format)
          end
        end
      end
      x.compare!
    end
  end
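
For reference, when a benchmark-ips report block takes an argument, the
block receives the target iteration count and must drive the loop
itself; that is why the second report uses n.times and hits the
overflow path only once per 100 writes, isolating the cost of
collecting and raising the aggregated error.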

Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
kenhys committed Nov 16, 2021
1 parent f03bb92 commit 2722b79
Showing 3 changed files with 91 additions and 6 deletions.
27 changes: 22 additions & 5 deletions lib/fluent/plugin/buffer.rb
@@ -332,12 +332,14 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
chunks_to_enqueue = []
staged_bytesizes_by_chunk = {}
# track internal BufferChunkOverflowError in write_step_by_step
buffer_chunk_overflow_errors = []

begin
# sort metadata to get lock of chunks in same order with other threads
metadata_and_data.keys.sort.each do |metadata|
data = metadata_and_data[metadata]
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize, error|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
@@ -352,6 +354,9 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
end
if error and not error.empty?
buffer_chunk_overflow_errors << error
end
end
end

@@ -444,6 +449,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
end
unless buffer_chunk_overflow_errors.empty?
# Notify delayed BufferChunkOverflowError here
raise BufferChunkOverflowError, buffer_chunk_overflow_errors.join(", ")
end
end
end

@@ -716,6 +725,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)

def write_step_by_step(metadata, data, format, splits_count, &block)
splits = []
errors = []
if splits_count > data.size
splits_count = data.size
end
@@ -768,11 +778,18 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
end

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
chunk.rollback

if split.size == 1 && original_bytesize == 0
big_record_size = format ? format.call(split).bytesize : split.first.bytesize
raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
# Ideally, a BufferChunkOverflowError should be raised here,
# but if it were raised here, the already processed splits and
# the succeeding splits would be lost completely,
# so raising the exception is delayed as a last resort.
errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
writing_splits_index += 1
chunk.stepback
next
else
chunk.rollback
end

if chunk_size_full?(chunk) || split.size == 1
@@ -795,7 +812,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
raise
end

block.call(chunk, chunk.bytesize - original_bytesize)
block.call(chunk, chunk.bytesize - original_bytesize, errors)
end
end
rescue ShouldRetry
18 changes: 18 additions & 0 deletions lib/fluent/plugin/buffer/memory_chunk.rb
@@ -26,6 +26,8 @@ def initialize(metadata, compress: :text)
@chunk_bytes = 0
@adding_bytes = 0
@adding_size = 0
@stepback_size = 0
@stepback_bytes = 0
end

def concat(bulk, bulk_size)
@@ -35,6 +37,8 @@ def concat(bulk, bulk_size)
@chunk << bulk
@adding_bytes += bulk.bytesize
@adding_size += bulk_size
@stepback_bytes = bulk.bytesize
@stepback_size = bulk_size
true
end

@@ -45,12 +49,25 @@ def commit
@adding_bytes = @adding_size = 0
@modified_at = Fluent::Clock.real_now
@modified_at_object = nil
@stepback_bytes = @stepback_size = 0
true
end

def stepback
@adding_bytes -= @stepback_bytes
@adding_size -= @stepback_size
@stepback_size = @stepback_bytes = 0
if @adding_bytes == 0 && @chunk_bytes == @adding_bytes
@chunk = ''.force_encoding("ASCII-8BIT")
else
@chunk = @chunk.slice(@chunk_bytes, @adding_bytes)
end
end

def rollback
@chunk.slice!(@chunk_bytes, @adding_bytes)
@adding_bytes = @adding_size = 0
@stepback_bytes = @stepback_size = 0
true
end

@@ -70,6 +87,7 @@ def purge
super
@chunk = ''.force_encoding("ASCII-8BIT")
@chunk_bytes = @size = @adding_bytes = @adding_size = 0
@stepback_size = @stepback_bytes = 0
true
end

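For context on the MemoryChunk#stepback method added above: rollback
discards everything appended since the last commit, which is too coarse
when only the latest concat overflowed, while stepback undoes just that
most recent concat, so earlier valid appends in the same write survive.
A usage sketch under that assumption:

  # chunk is assumed to be a fresh Fluent::Plugin::Buffer::MemoryChunk
  # (no committed bytes yet).
  chunk.concat("message a\n", 1)    # valid record, kept pending
  chunk.concat("x" * 2_000_000, 1)  # this bulk overflows chunk_limit_size
  chunk.stepback                    # drop only the oversized bulk
  chunk.concat("message b\n", 1)
  chunk.commit                      # "message a\n" and "message b\n" are kept
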
52 changes: 51 additions & 1 deletion test/plugin/test_buffer.rb
@@ -18,7 +18,7 @@ class DummyOutputPlugin < Fluent::Plugin::Base
end
class DummyMemoryChunkError < StandardError; end
class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk
attr_reader :append_count, :rollbacked, :closed, :purged
attr_reader :append_count, :rollbacked, :closed, :purged, :chunk, :stepback
attr_accessor :failing
def initialize(metadata, compress: :text)
super
@@ -45,6 +45,10 @@ def purge
super
@purged = true
end
def stepback
super
@stepback = true
end
end
class DummyPlugin < Fluent::Plugin::Buffer
def create_metadata(timekey=nil, tag=nil, variables=nil)
@@ -944,6 +948,52 @@ def create_chunk_es(metadata, es)
@p.write({@dm0 => es}, format: @format)
end
end

data(
first_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
intermediate_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
last_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]]),
multiple_chunks: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]])
)
test '#write exceeds chunk_limit_size, raise BufferChunkOverflowError, but not lost whole messages' do |(es)|
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)

assert_equal 1_280_000, @p.chunk_limit_size

nth = []
es.entries.each_with_index do |entry, index|
if entry.last["message"].size == @p.chunk_limit_size
nth << index
end
end
messages = []
nth.each do |n|
messages << "a 1280025 bytes record (nth: #{n}) is larger than buffer chunk limit size"
end

assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages.join(", ")) do
@p.write({@dm0 => es}, format: @format)
end
# message a and b are concatenated and staged
staged_messages = Fluent::MessagePackFactory.msgpack_unpacker.feed_each(@p.stage[@dm0].chunk).collect do |record|
record.last
end
    assert_equal([2, [{"message" => "a"}, {"message" => "b"}]],
                 [@p.stage[@dm0].size, staged_messages])
# only es0 message is queued
assert_equal [@dm0], @p.queue.map(&:metadata)
assert_equal [5000], @p.queue.map(&:size)
end
end

sub_test_case 'custom format with configuration for test with lower chunk limit size' do
