Commit
Don't raise exception when each message size is smaller than chunk_limit_size

Follow-up to fluent#3553

In previous versions, even when each record size was smaller than the
chunk limit size, BufferChunkOverflowError was raised unexpectedly
if the whole message size exceeded the chunk limit size.

For example, with a chunk limit size of 1_280_000, processing an event
stream of 3 records (about 1_000_000 bytes each) threw an exception like this:

  Fluent::Plugin::Buffer::BufferChunkOverflowError(<a 1000025 bytes
  record (nth: 1) is larger than buffer chunk limit size, a 1000025
  bytes record (nth: 2) is larger than buffer chunk limit size>)
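
A minimal reproduction sketch of that scenario (hypothetical setup: `buffer`,
`metadata`, and the format callable stand in for a configured buffer plugin
instance, its chunk metadata, and a formatter, mirroring the new test below):

  # Hypothetical reproduction: three records of about 1,000,000 bytes each,
  # individually below chunk_limit_size (1_280_000) but above it in total.
  timestamp = Fluent::EventTime.now
  es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "a" * 1_000_000}],
                                     [timestamp, {"message" => "b" * 1_000_000}],
                                     [timestamp, {"message" => "c" * 1_000_000}]])
  # Before this change, the following write raised BufferChunkOverflowError
  # even though every record fits in a chunk on its own.
  buffer.write({metadata => es}, format: ->(e) { e.to_msgpack_stream })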

Now the exception is no longer raised as long as each record size is
sufficiently smaller than the chunk limit size.

The idea: when the bytes being added are smaller than the chunk limit size,
the chunk should be unstaged and pushed into the queue so that the record
can be written into a new chunk. Otherwise, the record is skipped as in fluent#3553.
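
Put differently, a simplified sketch of the per-record decision (names are
illustrative, not the exact code in the diff below):

  if record_size > chunk_limit_size
    # the record can never fit into any chunk:
    # collect an error and skip the record, as in fluent#3553
  elsif chunk.bytesize + record_size > chunk_limit_size
    # the record fits on its own, but not into the current chunk:
    # keep what is already written (commit, or unstage and enqueue the chunk)
    # and write the record into a freshly allocated chunk
  else
    # the record fits: append it to the current chunk as usual
  end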

Signed-off-by: Kentaro Hayashi <hayashi@clear-code.com>
kenhys committed Nov 22, 2021
1 parent 9e904e6 commit 1d9716e
Showing 2 changed files with 63 additions and 11 deletions.
30 changes: 21 additions & 9 deletions lib/fluent/plugin/buffer.rb
@@ -780,10 +780,13 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
             chunk.commit
           else
             big_record_size = formatted_split.bytesize
-            if chunk.bytesize + big_record_size > @chunk_limit_size
-              errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
+            if big_record_size > @chunk_limit_size
+              errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
               writing_splits_index += 1
               next
+            elsif chunk.bytesize + big_record_size > @chunk_limit_size
+              # keep already processed 'split' content here.
+              chunk.commit
             end
           end
         end
@@ -799,13 +802,22 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
           chunk.rollback

           if split.size == 1 && original_bytesize == 0
-            # It is obviously case that BufferChunkOverflowError should be raised here,
-            # but if it raises here, already processed 'split' or
-            # the proceeding 'split' will be lost completely.
-            # so it is a last resort to delay raising such a exception
-            errors << "a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
-            writing_splits_index += 1
-            next
+            if adding_bytes < @chunk_limit_size
+              # Already processed content is kept after rollback, so the unstaged chunk should be queued.
+              # After that, the current split is re-processed (a new chunk is allocated, so adjust @stage and so on).
+              @stage.delete(modified_metadata)
+              staged_chunk_used = false
+              chunk.unstaged!
+              break
+            else
+              # It is an obvious case where BufferChunkOverflowError should be raised here,
+              # but raising it here would completely lose the already processed 'split'
+              # or the following 'split',
+              # so as a last resort the exception is delayed.
+              errors << "a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+              writing_splits_index += 1
+              next
+            end
           end

           if chunk_size_full?(chunk) || split.size == 1
44 changes: 42 additions & 2 deletions test/plugin/test_buffer.rb
@@ -974,7 +974,7 @@ def create_chunk_es(metadata, es)
        end
        messages = []
        nth.each do |n|
-         messages << "a 1280025 bytes record (nth: #{n}) is larger than buffer chunk limit size"
+         messages << "a 1280025 bytes record (nth: #{n}) is larger than buffer chunk limit size (1280000)"
        end

        assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages.join(", ")) do
@@ -990,6 +990,46 @@ def create_chunk_es(metadata, es)
        assert_equal [@dm0], @p.queue.map(&:metadata)
        assert_equal [5000], @p.queue.map(&:size)
      end
+
+      test "confirm bunch of messages which is smaller than chunk threshold does not raise BufferChunkOverflowError" do
+        assert_equal [@dm0], @p.stage.keys
+        assert_equal [], @p.queue.map(&:metadata)
+        timestamp = event_time('2016-04-11 16:00:02 +0000')
+        es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "a" * 1_000_000}],
+                                           [timestamp, {"message" => "b" * 1_000_000}],
+                                           [timestamp, {"message" => "c" * 1_000_000}]])
+
+        # https://github.com/fluent/fluentd/issues/1849
+        # Even though 1_000_000 < 1_280_000 (chunk_limit_size), BufferChunkOverflowError was raised before.
+        # It should not be raised, and messages a, b, and c should be stored into 3 chunks.
+        assert_nothing_raised do
+          @p.write({@dm0 => es}, format: @format)
+        end
+        messages = []
+        # pick up the first letter to check whether chunks are queued in the expected order
+        3.times do |index|
+          chunk = @p.queue[index]
+          es = Fluent::MessagePackEventStream.new(chunk.chunk)
+          es.ensure_unpacked!
+          records = es.instance_eval{ @unpacked_records }
+          records.each do |record|
+            messages << record["message"][0]
+          end
+        end
+        # messages a and b are queued, message c is staged
+        assert_equal([
+                       [@dm0],
+                       [@dm0, @dm0, @dm0],
+                       [5000, 1, 1],
+                       [["x"] * 5000, "a", "b"].flatten
+                     ],
+                     [
+                       @p.stage.keys,
+                       @p.queue.map(&:metadata),
+                       @p.queue.map(&:size),
+                       messages
+                     ])
+      end
    end

    sub_test_case 'custom format with configuration for test with lower chunk limit size' do
@@ -1273,7 +1313,7 @@ def create_chunk_es(metadata, es)
        c.append(Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}]]), compress: :gzip)
        overflow_bytes = c.bytesize

-       messages = "a #{overflow_bytes} bytes record (nth: 0) is larger than buffer chunk limit size"
+       messages = "a #{overflow_bytes} bytes record (nth: 0) is larger than buffer chunk limit size (70)"
        assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages) do
          # test format == nil && compress == :gzip
          @p.write({@dm0 => es})

0 comments on commit 1d9716e
