Skip to content

Commit

Permalink
buffer: minimize data lost by overflow exception for compressed data
Browse files Browse the repository at this point in the history
In the previous commit, it assumed format is specified and
not compressed case.

In this commit, it supports compressed data, too.

NOTE: In the test case, the boundary value which cause the overflow is
calculated in the advance because there is a case that the value
varies on machine.

See fluent#3553 (comment)

Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
  • Loading branch information
kenhys committed Nov 22, 2021
1 parent 46dad90 commit 8727c8d
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 5 deletions.
29 changes: 24 additions & 5 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -773,11 +773,18 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
split = splits[writing_splits_index]
formatted_split = format ? format.call(split) : split.first
if split.size == 1 && original_bytesize == 0
big_record_size = formatted_split.bytesize
if chunk.bytesize + big_record_size > @chunk_limit_size
errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
writing_splits_index += 1
next
if format == nil && @compress != :text
# The actual size of chunk is not determined until after chunk.append.
# so, keep already processed 'split' content here.
# (allow performance regression a bit)
chunk.commit
elsif format
big_record_size = formatted_split.bytesize
if chunk.bytesize + big_record_size > @chunk_limit_size
errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
writing_splits_index += 1
next
end
end
end

Expand All @@ -788,8 +795,19 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
end

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
adding_bytes = chunk.instance_eval { @adding_bytes } || "N/A" # 3rd party might not have 'adding_bytes'
chunk.rollback

if split.size == 1 && original_bytesize == 0
# It is obviously case that BufferChunkOverflowError should be raised here,
# but if it raises here, already processed 'split' or
# the proceeding 'split' will be lost completely.
# so it is a last resort to delay raising such a exception
errors << "a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
writing_splits_index += 1
next
end

if chunk_size_full?(chunk) || split.size == 1
enqueue_chunk_before_retry = true
else
Expand All @@ -811,6 +829,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
end

block.call(chunk, chunk.bytesize - original_bytesize, errors)
errors = []
end
end
rescue ShouldRetry
Expand Down
25 changes: 25 additions & 0 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1247,6 +1247,7 @@ def create_chunk_es(metadata, es)
sub_test_case 'when compress is gzip' do
setup do
@p = create_buffer({'compress' => 'gzip'})
@dm0 = create_metadata(Time.parse('2016-04-11 16:00:00 +0000').to_i, nil, nil)
end

test '#compress returns :gzip' do
Expand All @@ -1257,6 +1258,30 @@ def create_chunk_es(metadata, es)
chunk = @p.generate_chunk(create_metadata)
assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::Decompressable)
end

test '#write compressed data which exceeds chunk_limit_size, it raises BufferChunkOverflowError' do
@p = create_buffer({'compress' => 'gzip', 'chunk_limit_size' => 70})
timestamp = event_time('2016-04-11 16:00:02 +0000')
es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}], # overflow
[timestamp, {"message" => "aaa"}],
[timestamp, {"message" => "bbb"}]])
assert_equal [], @p.queue.map(&:metadata)
assert_equal 70, @p.chunk_limit_size

# calculate the actual boundary value. it varies on machine
c = @p.generate_chunk(create_metadata)
c.append(Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}]]), compress: :gzip)
overflow_bytes = c.bytesize

messages = "a #{overflow_bytes} bytes record (nth: 0) is larger than buffer chunk limit size"
assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages) do
# test format == nil && compress == :gzip
@p.write({@dm0 => es})
end
# message a and b occupies each chunks in full, so both of messages are queued (no staged chunk)
assert_equal([2, [@dm0, @dm0], [1, 1], nil],
[@p.queue.size, @p.queue.map(&:metadata), @p.queue.map(&:size), @p.stage[@dm0]])
end
end

sub_test_case '#statistics' do
Expand Down

0 comments on commit 8727c8d

Please sign in to comment.