Merge pull request #3560 from kenhys/fix-lower-chunk-overflow
Don't raise exception when each message size is smaller than chunk_limit_size
ashie committed Dec 24, 2021
2 parents 85ff1a3 + 5d93f36 commit 8229ed9
Showing 2 changed files with 120 additions and 23 deletions.
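
The regression this merge fixes (issue #1849, linked in the tests below) is easiest to see in a simplified model. The sketch below is not the fluentd API, just a standalone Ruby illustration of the packing rule the new code enforces: a record is a genuine BufferChunkOverflowError only when it alone exceeds chunk_limit_size; a record that merely does not fit in the current chunk should start a new chunk instead of raising. The 1_280_000 limit and the three 1_000_000-byte records mirror the tests below.

# Standalone sketch (not fluentd's API): pack records into chunks of at
# most `limit` bytes, treating only single records larger than `limit`
# as overflow errors.
def pack(records, limit)
  chunks = [+""]
  errors = []
  records.each_with_index do |record, index|
    if record.bytesize > limit
      # only this case is a genuine overflow
      errors << "a #{record.bytesize} bytes record (nth: #{index}) is larger than buffer chunk limit size (#{limit})"
      next
    end
    # rotate to a fresh chunk instead of raising, as the fixed code does
    chunks << +"" if chunks.last.bytesize + record.bytesize > limit
    chunks.last << record
  end
  [chunks, errors]
end

chunks, errors = pack(["a" * 1_000_000, "b" * 1_000_000, "c" * 1_000_000], 1_280_000)
p chunks.map(&:bytesize) # => [1000000, 1000000, 1000000], three chunks
p errors                 # => [], no BufferChunkOverflowError
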
66 changes: 43 additions & 23 deletions lib/fluent/plugin/buffer.rb
@@ -767,24 +767,37 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
             raise ShouldRetry unless chunk.writable?
             staged_chunk_used = true if chunk.staged?
 
-            original_bytesize = chunk.bytesize
+            original_bytesize = committed_bytesize = chunk.bytesize
             begin
               while writing_splits_index < splits.size
                 split = splits[writing_splits_index]
-                formatted_split = format ? format.call(split) : split.first
-                if split.size == 1 && original_bytesize == 0
-                  if format == nil && @compress != :text
-                    # The actual size of chunk is not determined until after chunk.append.
-                    # so, keep already processed 'split' content here.
-                    # (allow performance regression a bit)
+                formatted_split = format ? format.call(split) : nil
+
+                if split.size == 1 # Check BufferChunkOverflowError
+                  determined_bytesize = nil
+                  if @compress != :text
+                    determined_bytesize = nil
+                  elsif formatted_split
+                    determined_bytesize = formatted_split.bytesize
+                  elsif split.first.respond_to?(:bytesize)
+                    determined_bytesize = split.first.bytesize
+                  end
+
+                  if determined_bytesize && determined_bytesize > @chunk_limit_size
+                    # It is a obvious case that BufferChunkOverflowError should be raised here.
+                    # But if it raises here, already processed 'split' or
+                    # the proceeding 'split' will be lost completely.
+                    # So it is a last resort to delay raising such a exception
+                    errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                    writing_splits_index += 1
+                    next
+                  end
+
+                  if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size
+                    # The split will (might) cause size over so keep already processed
+                    # 'split' content here (allow performance regression a bit).
                     chunk.commit
                   else
-                    big_record_size = formatted_split.bytesize
-                    if chunk.bytesize + big_record_size > @chunk_limit_size
-                      errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                      writing_splits_index += 1
-                      next
-                    end
+                    committed_bytesize = chunk.bytesize
                   end
                 end
 
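
For reference, the classification above can be read as a small decision table. The following is a hypothetical standalone restatement, not code from the patch: determined_bytesize stays nil whenever the record's on-chunk size cannot be known before chunk.append (any compressed chunk), which is why the overflow check must be repeated after the append in the next hunk.

# Hypothetical restatement of the determined_bytesize rule above.
# nil means "size unknowable before append", so the check is deferred.
def determined_bytesize(split, formatted_split, compress)
  return nil if compress != :text                # actual size known only after append
  return formatted_split.bytesize if formatted_split
  return split.first.bytesize if split.first.respond_to?(:bytesize)
  nil
end

p determined_bytesize([{"k" => "v"}], "formatted", :text) # => 9
p determined_bytesize(["raw string"], nil, :text)         # => 10
p determined_bytesize(["anything"], nil, :gzip)           # => nil (check deferred)
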
@@ -793,19 +806,26 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
                 else
                   chunk.append(split, compress: @compress)
                 end
+                adding_bytes = chunk.bytesize - committed_bytesize
 
                 if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
-                  adding_bytes = chunk.instance_eval { @adding_bytes } || "N/A" # 3rd party might not have 'adding_bytes'
                   chunk.rollback
+                  committed_bytesize = chunk.bytesize
 
-                  if split.size == 1 && original_bytesize == 0
-                    # It is obviously case that BufferChunkOverflowError should be raised here,
-                    # but if it raises here, already processed 'split' or
-                    # the proceeding 'split' will be lost completely.
-                    # so it is a last resort to delay raising such a exception
-                    errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                    writing_splits_index += 1
-                    next
+                  if split.size == 1 # Check BufferChunkOverflowError again
+                    if adding_bytes > @chunk_limit_size
+                      errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                      writing_splits_index += 1
+                      next
+                    else
+                      # As already processed content is kept after rollback, then unstaged chunk should be queued.
+                      # After that, re-process current split again.
+                      # New chunk should be allocated, to do it, modify @stage and so on.
+                      synchronize { @stage.delete(modified_metadata) }
+                      staged_chunk_used = false
+                      chunk.unstaged!
+                      break
+                    end
                   end
 
                   if chunk_size_full?(chunk) || split.size == 1
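
After rollback, the new branch distinguishes two cases for a single-record split: only a record whose appended bytes alone exceed the limit is recorded as an error; otherwise the already-filled chunk is unstaged and queued, and the record is retried in a fresh chunk. Below is a minimal sketch of that decision plus the delayed raise; the actual raising site lives in Buffer#write, outside this diff, so its exact shape here is an assumption.

class BufferChunkOverflowError < StandardError; end

# Sketch of the post-rollback decision above; not the patch itself.
def handle_rolled_back_split(adding_bytes, limit, errors)
  if adding_bytes > limit
    errors << "concatenated/appended a #{adding_bytes} bytes record is larger than buffer chunk limit size (#{limit})"
    :skip_record                # delayed error, keep processing other splits
  else
    :queue_chunk_and_retry      # seal the full chunk, retry in a new one
  end
end

errors = []
p handle_rolled_back_split(1_000_000, 1_280_000, errors) # => :queue_chunk_and_retry
p handle_rolled_back_split(1_500_000, 1_280_000, errors) # => :skip_record

# Assumed shape of the delayed raise: one exception covering all oversized
# records, raised only after every storable split has been written.
begin
  raise BufferChunkOverflowError, errors.join(", ") unless errors.empty?
rescue BufferChunkOverflowError => e
  puts e.message
end
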
77 changes: 77 additions & 0 deletions test/plugin/test_buffer.rb
@@ -990,6 +990,51 @@ def create_chunk_es(metadata, es)
       assert_equal [@dm0], @p.queue.map(&:metadata)
       assert_equal [5000], @p.queue.map(&:size)
     end
+
+    test "confirm that every message which is smaller than chunk threshold does not raise BufferChunkOverflowError" do
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+      timestamp = event_time('2016-04-11 16:00:02 +0000')
+      es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "a" * 1_000_000}],
+                                         [timestamp, {"message" => "b" * 1_000_000}],
+                                         [timestamp, {"message" => "c" * 1_000_000}]])
+
+      # https://github.com/fluent/fluentd/issues/1849
+      # Even though 1_000_000 < 1_280_000 (chunk_limit_size), it raised BufferChunkOverflowError before.
+      # It should not be raised and message a,b,c should be stored into 3 chunks.
+      assert_nothing_raised do
+        @p.write({@dm0 => es}, format: @format)
+      end
+      messages = []
+      # pick up first letter to check whether chunk is queued in expected order
+      3.times do |index|
+        chunk = @p.queue[index]
+        es = Fluent::MessagePackEventStream.new(chunk.chunk)
+        es.ensure_unpacked!
+        records = es.instance_eval{ @unpacked_records }
+        records.each do |record|
+          messages << record["message"][0]
+        end
+      end
+      es = Fluent::MessagePackEventStream.new(@p.stage[@dm0].chunk)
+      es.ensure_unpacked!
+      staged_message = es.instance_eval{ @unpacked_records }.first["message"]
+      # message a and b are queued, message c is staged
+      assert_equal([
+                     [@dm0],
+                     "c" * 1_000_000,
+                     [@dm0, @dm0, @dm0],
+                     [5000, 1, 1],
+                     [["x"] * 5000, "a", "b"].flatten
+                   ],
+                   [
+                     @p.stage.keys,
+                     staged_message,
+                     @p.queue.map(&:metadata),
+                     @p.queue.map(&:size),
+                     messages
+                   ])
+    end
   end
 
   sub_test_case 'custom format with configuration for test with lower chunk limit size' do
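
The test above pokes at chunk internals (ensure_unpacked! and @unpacked_records via instance_eval) to read records back. Conceptually, a chunk written with a format is just concatenated msgpack entries, and the following standalone sketch shows the same decoding idea using the msgpack gem directly instead of Fluent::MessagePackEventStream, with invented sample data rather than the test's fixtures.

require "msgpack"

# Standalone sketch: a formatted chunk payload is a concatenation of
# msgpack-encoded [time, record] entries; feed_each decodes them in order.
payload = [[1460390402, {"message" => "a"}],
           [1460390402, {"message" => "b"}]].map(&:to_msgpack).join

unpacker = MessagePack::Unpacker.new
unpacker.feed_each(payload) do |time, record|
  puts "#{time}: #{record["message"]}"
end
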
@@ -1078,6 +1123,38 @@ def create_chunk_es(metadata, es)
         @p.write({@dm0 => es})
       end
     end
+
+    test 'confirm that every array message which is smaller than chunk threshold does not raise BufferChunkOverflowError' do
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+
+      assert_equal 1_280_000, @p.chunk_limit_size
+
+      es = ["a" * 1_000_000, "b" * 1_000_000, "c" * 1_000_000]
+      assert_nothing_raised do
+        @p.write({@dm0 => es})
+      end
+      queue_messages = @p.queue.collect do |chunk|
+        # collect first character of each message
+        chunk.chunk[0]
+      end
+      assert_equal([
+                     [@dm0],
+                     1,
+                     "c",
+                     [@dm0, @dm0, @dm0],
+                     [5000, 1, 1],
+                     ["x", "a", "b"]
+                   ],
+                   [
+                     @p.stage.keys,
+                     @p.stage[@dm0].size,
+                     @p.stage[@dm0].chunk[0],
+                     @p.queue.map(&:metadata),
+                     @p.queue.map(&:size),
+                     queue_messages
+                   ])
+    end
   end
 
   sub_test_case 'with configuration for test with lower limits' do
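
This second test exercises the no-format path, where chunk.append(split, compress: @compress) writes raw strings, so chunk.chunk[0] is simply the first byte of the first record in the chunk. A tiny sketch of that assumption, independent of fluentd:

# Assumption behind chunk.chunk[0] above: with no format, records are
# appended as raw bytes, so the chunk payload is just the strings joined.
records = ["a" * 1_000_000, "b" * 1_000_000]
chunk_payload = +""
records.each { |r| chunk_payload << r }
p chunk_payload[0]        # => "a", first byte identifies the first record
p chunk_payload.bytesize  # => 2000000
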
