Merge pull request #3553 from kenhys/fix-loss-buffer-chunk
Fix a bug where BufferChunkOverflowError causes whole data loss
ashie committed Nov 22, 2021
2 parents f03bb92 + 1e400c1 commit 9e904e6
Showing 2 changed files with 111 additions and 6 deletions.
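
In outline, the change below makes write_step_by_step record a message for each oversized record instead of raising BufferChunkOverflowError immediately, and write re-raises the collected messages once, after all writable records have been staged or enqueued. A minimal, self-contained sketch of that pattern (plain Ruby; limit, records, and chunk are stand-ins for illustration, not the actual plugin internals):

# Sketch of the "collect now, raise later" pattern introduced by this commit.
limit   = 1_280_000                      # stand-in for chunk_limit_size
records = ["a", "b", "x" * 2_000_000]    # the last record is oversized
chunk   = []                             # stand-in for a buffer chunk
errors  = []

records.each_with_index do |record, nth|
  if record.bytesize > limit
    # Raising here would lose the records already written to the chunk
    # and every record that follows, so only remember the failure.
    errors << "a #{record.bytesize} bytes record (nth: #{nth}) is larger than buffer chunk limit size"
    next
  end
  chunk << record
end

# "a" and "b" are kept; the oversized record is reported once, at the end.
raise "BufferChunkOverflowError: #{errors.join(', ')}" unless errors.empty?
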
44 changes: 39 additions & 5 deletions lib/fluent/plugin/buffer.rb
@@ -332,12 +332,14 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
chunks_to_enqueue = []
staged_bytesizes_by_chunk = {}
# track internal BufferChunkOverflowError in write_step_by_step
buffer_chunk_overflow_errors = []

begin
# sort metadata to take locks on chunks in the same order as other threads
metadata_and_data.keys.sort.each do |metadata|
data = metadata_and_data[metadata]
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize, error|
chunk.mon_enter # add lock to prevent being committed/rolled back by other threads
operated_chunks << chunk
if chunk.staged?
@@ -352,6 +354,9 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
end
if error && !error.empty?
buffer_chunk_overflow_errors << error
end
end
end

@@ -444,6 +449,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
end
unless buffer_chunk_overflow_errors.empty?
# Notify delayed BufferChunkOverflowError here
raise BufferChunkOverflowError, buffer_chunk_overflow_errors.join(", ")
end
end
end

@@ -716,6 +725,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)

def write_step_by_step(metadata, data, format, splits_count, &block)
splits = []
errors = []
if splits_count > data.size
splits_count = data.size
end
@@ -761,18 +771,41 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
begin
while writing_splits_index < splits.size
split = splits[writing_splits_index]
formatted_split = format ? format.call(split) : split.first
if split.size == 1 && original_bytesize == 0
if format == nil && @compress != :text
# The actual size of the chunk is not determined until after chunk.append,
# so keep the already processed 'split' content here.
# (this allows a bit of performance regression)
chunk.commit
else
big_record_size = formatted_split.bytesize
if chunk.bytesize + big_record_size > @chunk_limit_size
errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
writing_splits_index += 1
next
end
end
end

if format
chunk.concat(format.call(split), split.size)
chunk.concat(formatted_split, split.size)
else
chunk.append(split, compress: @compress)
end

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
adding_bytes = chunk.instance_eval { @adding_bytes } || "N/A" # a 3rd-party chunk might not have 'adding_bytes'
chunk.rollback

if split.size == 1 && original_bytesize == 0
big_record_size = format ? format.call(split).bytesize : split.first.bytesize
raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
# This is obviously a case where BufferChunkOverflowError should be raised,
# but raising it here would completely lose the already processed 'split'
# and the following 'split's.
# So, as a last resort, raising the exception is delayed.
errors << "a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
writing_splits_index += 1
next
end

if chunk_size_full?(chunk) || split.size == 1
@@ -795,7 +828,8 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
raise
end

block.call(chunk, chunk.bytesize - original_bytesize)
block.call(chunk, chunk.bytesize - original_bytesize, errors)
errors = []
end
end
rescue ShouldRetry
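
Seen from the caller, write still raises BufferChunkOverflowError when a record cannot fit into any chunk, but the other records of the same call are staged or enqueued rather than lost, as the new tests below verify. A hedged sketch of how a caller might handle this; buffer, metadata, es, formatter, and logger are placeholders for an already configured Fluent::Plugin::Buffer, its metadata key, an event stream, the plugin's formatter, and a logger, not code from this commit:

require "fluent/plugin/buffer"

# Hypothetical caller-side handling: treat BufferChunkOverflowError as
# "some records were too big", not as "the whole write failed".
def write_with_overflow_report(buffer, metadata, es, formatter, logger)
  buffer.write({metadata => es}, format: formatter)
rescue Fluent::Plugin::Buffer::BufferChunkOverflowError => e
  # Only the oversized record(s) are reported; the remaining records in `es`
  # have already been staged or enqueued by the time this is raised.
  logger.warn "dropped oversized record(s): #{e.message}"
end
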
73 changes: 72 additions & 1 deletion test/plugin/test_buffer.rb
@@ -18,7 +18,7 @@ class DummyOutputPlugin < Fluent::Plugin::Base
end
class DummyMemoryChunkError < StandardError; end
class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk
attr_reader :append_count, :rollbacked, :closed, :purged
attr_reader :append_count, :rollbacked, :closed, :purged, :chunk
attr_accessor :failing
def initialize(metadata, compress: :text)
super
@@ -944,6 +944,52 @@ def create_chunk_es(metadata, es)
@p.write({@dm0 => es}, format: @format)
end
end

data(
first_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
intermediate_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
last_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]]),
multiple_chunks: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]])
)
test '#write exceeds chunk_limit_size, raises BufferChunkOverflowError, but does not lose whole messages' do |(es)|
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)

assert_equal 1_280_000, @p.chunk_limit_size

nth = []
es.entries.each_with_index do |entry, index|
if entry.last["message"].size == @p.chunk_limit_size
nth << index
end
end
messages = []
nth.each do |n|
messages << "a 1280025 bytes record (nth: #{n}) is larger than buffer chunk limit size"
end

assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages.join(", ")) do
@p.write({@dm0 => es}, format: @format)
end
# messages a and b are concatenated and staged
staged_messages = Fluent::MessagePackFactory.msgpack_unpacker.feed_each(@p.stage[@dm0].chunk).collect do |record|
record.last
end
assert_equal([2, [{"message" => "a"}, {"message" => "b"}]],
[@p.stage[@dm0].size, staged_messages])
# only es0 message is queued
assert_equal [@dm0], @p.queue.map(&:metadata)
assert_equal [5000], @p.queue.map(&:size)
end
end

sub_test_case 'custom format with configuration for test with lower chunk limit size' do
@@ -1201,6 +1247,7 @@ def create_chunk_es(metadata, es)
sub_test_case 'when compress is gzip' do
setup do
@p = create_buffer({'compress' => 'gzip'})
@dm0 = create_metadata(Time.parse('2016-04-11 16:00:00 +0000').to_i, nil, nil)
end

test '#compress returns :gzip' do
@@ -1211,6 +1258,30 @@ def create_chunk_es(metadata, es)
chunk = @p.generate_chunk(create_metadata)
assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::Decompressable)
end

test '#write raises BufferChunkOverflowError when compressed data exceeds chunk_limit_size' do
@p = create_buffer({'compress' => 'gzip', 'chunk_limit_size' => 70})
timestamp = event_time('2016-04-11 16:00:02 +0000')
es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}], # overflow
[timestamp, {"message" => "aaa"}],
[timestamp, {"message" => "bbb"}]])
assert_equal [], @p.queue.map(&:metadata)
assert_equal 70, @p.chunk_limit_size

# calculate the actual boundary value; it varies by machine
c = @p.generate_chunk(create_metadata)
c.append(Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}]]), compress: :gzip)
overflow_bytes = c.bytesize

messages = "a #{overflow_bytes} bytes record (nth: 0) is larger than buffer chunk limit size"
assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages) do
# test format == nil && compress == :gzip
@p.write({@dm0 => es})
end
# messages a and b each occupy a full chunk, so both messages are queued (no staged chunk)
assert_equal([2, [@dm0, @dm0], [1, 1], nil],
[@p.queue.size, @p.queue.map(&:metadata), @p.queue.map(&:size), @p.stage[@dm0]])
end
end

sub_test_case '#statistics' do
