Skip to content

Commit

Permalink
Fix a bug where BufferChunkOverflowError causes the loss of the whole data
Browse files Browse the repository at this point in the history
In the previous version, if a BufferChunkOverflowError occurred,
none of the data was sent because of the exception.
This is not appropriate, because the problem exists in only part of
the split data.

This commit detects BufferChunkOverflowError and postpones raising it
as an exception until later,
so that the valid splits are still processed as intended.

Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
  • Loading branch information
kenhys committed Nov 10, 2021
1 parent abca46f commit 8ee2874
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 15 deletions.
58 changes: 43 additions & 15 deletions lib/fluent/plugin/buffer.rb
Expand Up @@ -332,26 +332,33 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
chunks_to_enqueue = []
staged_bytesizes_by_chunk = {}
# track internal BufferChunkOverflowError in write_step_by_step
@delay_buffer_chunk_overflow_error = nil

begin
# sort metadata to get lock of chunks in same order with other threads
metadata_and_data.keys.sort.each do |metadata|
data = metadata_and_data[metadata]
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
#
# https://github.com/fluent/fluentd/issues/2712
# write_once is supposed to write to a chunk only once
# but this block **may** run multiple times from write_step_by_step and previous write may be rollbacked
# So we should be counting the stage_size only for the last successful write
#
staged_bytesizes_by_chunk[chunk] = adding_bytesize
elsif chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
begin
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
#
# https://github.com/fluent/fluentd/issues/2712
# write_once is supposed to write to a chunk only once
# but this block **may** run multiple times from write_step_by_step and previous write may be rollbacked
# So we should be counting the stage_size only for the last successful write
#
staged_bytesizes_by_chunk[chunk] = adding_bytesize
elsif chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
end
end
rescue BufferChunkOverflowError
# If BufferChunkOverflowError is not caught here, operated_chunks will be lost.
# So just catch (and swallow) the exception here.
end
end

Expand Down Expand Up @@ -444,6 +451,12 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
end
if @delay_buffer_chunk_overflow_error
# Notify delayed chunk overflow here
message = @delay_buffer_chunk_overflow_error
@delay_buffer_chunk_overflow_error = nil
raise BufferChunkOverflowError, message
end
end
end

Expand Down Expand Up @@ -761,6 +774,12 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
begin
while writing_splits_index < splits.size
split = splits[writing_splits_index]
if split.size == 1 && original_bytesize == 0 && (chunk.bytesize + split.first.to_msgpack.bytesize) > @chunk_limit_size
# If it will be predictable that BufferChunkOverflowError occur,
# commit current chunk to keep it, otherwise valid 'split' data can be lost by
# the following chunk.rollback
chunk.commit
end
if format
chunk.concat(format.call(split), split.size)
else
Expand All @@ -772,7 +791,12 @@ def write_step_by_step(metadata, data, format, splits_count, &block)

if split.size == 1 && original_bytesize == 0
big_record_size = format ? format.call(split).bytesize : split.first.bytesize
raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
writing_splits_index += 1
# This is obviously a BufferChunkOverflowError case, but if it is raised here,
# the already processed 'split' or the following 'split' will be lost completely.
# So, as a last resort, delay raising the exception.
@delay_buffer_chunk_overflow_error = "a #{big_record_size}bytes record is larger than buffer chunk limit size"
next
end

if chunk_size_full?(chunk) || split.size == 1
Expand Down Expand Up @@ -807,6 +831,10 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
end
enqueue_chunk(metadata) if enqueue_chunk_before_retry
retry
ensure
if @delay_buffer_chunk_overflow_error
raise BufferChunkOverflowError, @delay_buffer_chunk_limit_error
end
end

STATS_KEYS = [
Expand Down
27 changes: 27 additions & 0 deletions test/plugin/test_buffer.rb
Expand Up @@ -944,6 +944,33 @@ def create_chunk_es(metadata, es)
@p.write({@dm0 => es}, format: @format)
end
end

# Data-driven cases for the test below. Every event stream contains the same
# three records: two small ones ("a" * 1000 and "b" * 2000) and one whose
# message alone is 1_280_000 bytes. The cases differ only in where the large
# record sits: first, in the middle, or last.
data(
first_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a" * 1000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b" * 2000}]]),
intermediate_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a" * 1000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b" * 2000}]]),
last_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a" * 1000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b" * 2000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]])
)
# Checks that writing a stream containing one oversized record raises
# BufferChunkOverflowError while the stage and queue still end up in the
# expected state, regardless of the oversized record's position.
test '#write exceeds chunk_limit_size, raise BufferChunkOverflowError, but not lost whole messages' do |(es)|
# Precondition: only @dm0 is staged and nothing has been queued yet.
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)

# The chunk limit equals the length of the "x" payload alone, so the record
# presumably exceeds the limit once formatted -- TODO confirm against @format.
assert_equal 1_280_000, @p.chunk_limit_size

# The write must still surface the overflow to the caller.
assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError do
@p.write({@dm0 => es}, format: @format)
end
# message a and b are concatenated and staged
# (the staged chunk for @dm0 reports a size of 2)
assert_equal 2, @p.stage[@dm0].size
# Exactly one chunk, for @dm0 and of size 5000, is queued.
# NOTE(review): the original comment said "only es0 message is queued", but
# es0 is not defined in this test; presumably this is the data staged during
# setup -- confirm against the enclosing sub_test_case's setup.
assert_equal [@dm0], @p.queue.map(&:metadata)
assert_equal [5000], @p.queue.map(&:size)
end
end

sub_test_case 'custom format with configuration for test with lower chunk limit size' do
Expand Down

0 comments on commit 8ee2874

Please sign in to comment.