Don't raise exception when each message size is smaller than chunk_limit_size #3560

Merged: 2 commits, Dec 24, 2021
66 changes: 43 additions & 23 deletions lib/fluent/plugin/buffer.rb

@@ -767,24 +767,37 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
           raise ShouldRetry unless chunk.writable?
           staged_chunk_used = true if chunk.staged?
 
-          original_bytesize = chunk.bytesize
+          original_bytesize = committed_bytesize = chunk.bytesize
           begin
             while writing_splits_index < splits.size
               split = splits[writing_splits_index]
-              formatted_split = format ? format.call(split) : split.first
-              if split.size == 1 && original_bytesize == 0
-                if format == nil && @compress != :text
-                  # The actual size of the chunk is not determined until after chunk.append,
-                  # so keep the already processed 'split' content here
-                  # (allowing a bit of performance regression).
+              formatted_split = format ? format.call(split) : nil
+
+              if split.size == 1 # Check BufferChunkOverflowError
+                determined_bytesize = nil
+                if @compress != :text
+                  determined_bytesize = nil
+                elsif formatted_split
+                  determined_bytesize = formatted_split.bytesize
+                elsif split.first.respond_to?(:bytesize)
+                  determined_bytesize = split.first.bytesize
+                end
+
+                if determined_bytesize && determined_bytesize > @chunk_limit_size
+                  # This is an obvious case where BufferChunkOverflowError should be raised here,
+                  # but if it were raised now, the already processed 'split' or
+                  # the succeeding 'split' would be lost completely.
+                  # So, as a last resort, delay raising such an exception.
+                  errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                  writing_splits_index += 1
+                  next
+                end
+
+                if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size
+                  # The split will (or might) push the chunk over the size limit, so keep the
+                  # already processed 'split' content here (allowing a bit of performance regression).
                   chunk.commit
-                else
-                  big_record_size = formatted_split.bytesize
-                  if chunk.bytesize + big_record_size > @chunk_limit_size
-                    errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                    writing_splits_index += 1
-                    next
-                  end
+                  committed_bytesize = chunk.bytesize
                 end
               end
 
@@ -793,19 +806,26 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
             else
               chunk.append(split, compress: @compress)
             end
+            adding_bytes = chunk.bytesize - committed_bytesize
 
             if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
-              adding_bytes = chunk.instance_eval { @adding_bytes } || "N/A" # 3rd party might not have 'adding_bytes'
              chunk.rollback
+              committed_bytesize = chunk.bytesize
 
-              if split.size == 1 && original_bytesize == 0
-                # This is an obvious case where BufferChunkOverflowError should be raised here,
-                # but if it were raised now, the already processed 'split' or
-                # the succeeding 'split' would be lost completely.
-                # So, as a last resort, delay raising such an exception.
-                errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                writing_splits_index += 1
-                next
+              if split.size == 1 # Check BufferChunkOverflowError again
+                if adding_bytes > @chunk_limit_size
+                  errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                  writing_splits_index += 1
+                  next
+                else
+                  # As the already processed content is kept after rollback, the unstaged chunk should be queued.
+                  # After that, re-process the current split.
+                  # A new chunk must be allocated; to do that, modify @stage and so on.
+                  synchronize { @stage.delete(modified_metadata) }
+                  staged_chunk_used = false
+                  chunk.unstaged!
+                  break
+                end
              end
 
             if chunk_size_full?(chunk) || split.size == 1

Review comment (Contributor, on lines +824 to +827, the synchronize/unstaged!/break block): Why does this unstage the current chunk?
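The intent of the buffer.rb change is easier to see in isolation. Below is a minimal, self-contained Ruby sketch of the behavior this patch aims for: a record that individually exceeds the chunk limit is collected into errors (to be raised once, in aggregate, after the loop) instead of aborting the whole write, while a record that merely overflows the current chunk triggers a rollback and rolls over into a fresh chunk. ToyChunk and CHUNK_LIMIT_SIZE are illustrative stand-ins, not Fluentd's actual buffer API.

```ruby
CHUNK_LIMIT_SIZE = 100 # toy value; the tests below use 1_280_000

class ToyChunk
  attr_reader :bytesize

  def initialize
    @data = +""
    @committed = 0
    @bytesize = 0
  end

  def append(record)
    @data << record
    @bytesize = @data.bytesize
  end

  # commit/rollback mirror the protocol the patch relies on:
  # rollback discards only what was appended since the last commit.
  def commit
    @committed = @bytesize
  end

  def rollback
    @data = @data.byteslice(0, @committed)
    @bytesize = @committed
  end
end

errors = []
chunks = [ToyChunk.new]

["a" * 60, "b" * 150, "c" * 60].each_with_index do |record, nth|
  if record.bytesize > CHUNK_LIMIT_SIZE
    # Oversized record: collect an error instead of raising immediately,
    # so the surrounding records are still written (the core idea of this PR).
    errors << "a #{record.bytesize} bytes record (nth: #{nth}) is larger than buffer chunk limit size (#{CHUNK_LIMIT_SIZE})"
    next
  end

  chunk = chunks.last
  chunk.append(record)
  if chunk.bytesize > CHUNK_LIMIT_SIZE
    chunk.rollback            # keep the committed content of the full chunk
    chunks << ToyChunk.new    # roll over to a fresh chunk for this record
    chunks.last.append(record)
  end
  chunks.last.commit
end

p chunks.map(&:bytesize) # => [60, 60] : "a" and "c" each fit in a chunk
p errors.size            # => 1        : only "b" (150 bytes) is reported
```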
77 changes: 77 additions & 0 deletions test/plugin/test_buffer.rb

@@ -990,6 +990,51 @@ def create_chunk_es(metadata, es)
       assert_equal [@dm0], @p.queue.map(&:metadata)
       assert_equal [5000], @p.queue.map(&:size)
     end
+
+    test "confirm that every message which is smaller than chunk threshold does not raise BufferChunkOverflowError" do
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+      timestamp = event_time('2016-04-11 16:00:02 +0000')
+      es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "a" * 1_000_000}],
+                                         [timestamp, {"message" => "b" * 1_000_000}],
+                                         [timestamp, {"message" => "c" * 1_000_000}]])
+
+      # https://github.com/fluent/fluentd/issues/1849
+      # Even though 1_000_000 < 1_280_000 (chunk_limit_size), this used to raise
+      # BufferChunkOverflowError. It should not be raised, and messages a, b, and c
+      # should be stored in three chunks.
+      assert_nothing_raised do
+        @p.write({@dm0 => es}, format: @format)
+      end
+      messages = []
+      # pick up the first letter of each message to check that chunks are queued in the expected order
+      3.times do |index|
+        chunk = @p.queue[index]
+        es = Fluent::MessagePackEventStream.new(chunk.chunk)
+        es.ensure_unpacked!
+        records = es.instance_eval{ @unpacked_records }
+        records.each do |record|
+          messages << record["message"][0]
+        end
+      end
+      es = Fluent::MessagePackEventStream.new(@p.stage[@dm0].chunk)
+      es.ensure_unpacked!
+      staged_message = es.instance_eval{ @unpacked_records }.first["message"]
+      # messages a and b are queued; message c is staged
+      assert_equal([
+                     [@dm0],
+                     "c" * 1_000_000,
+                     [@dm0, @dm0, @dm0],
+                     [5000, 1, 1],
+                     [["x"] * 5000, "a", "b"].flatten
+                   ],
+                   [
+                     @p.stage.keys,
+                     staged_message,
+                     @p.queue.map(&:metadata),
+                     @p.queue.map(&:size),
+                     messages
+                   ])
+    end
   end
 
   sub_test_case 'custom format with configuration for test with lower chunk limit size' do
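The test above reads messages back by poking at chunk internals (Fluent::MessagePackEventStream plus instance_eval over @unpacked_records). For context, a standard-format chunk payload is a concatenation of msgpack-encoded [time, record] pairs, so an equivalent decode with the plain msgpack gem looks roughly like the sketch below (assuming an uncompressed chunk; the payload built here is illustrative):

```ruby
require "msgpack"
require "stringio"

# Build a payload the way an uncompressed chunk stores events:
# msgpack-encoded [time, record] pairs, back to back.
blob = [[0, { "message" => "a" }], [1, { "message" => "b" }]].map(&:to_msgpack).join

messages = []
MessagePack::Unpacker.new(StringIO.new(blob)).each do |_time, record|
  messages << record["message"]
end
p messages # => ["a", "b"]
```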
@@ -1078,6 +1123,38 @@ def create_chunk_es(metadata, es)
         @p.write({@dm0 => es})
       end
     end
+
+    test 'confirm that every array message which is smaller than chunk threshold does not raise BufferChunkOverflowError' do
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+
+      assert_equal 1_280_000, @p.chunk_limit_size
+
+      es = ["a" * 1_000_000, "b" * 1_000_000, "c" * 1_000_000]
+      assert_nothing_raised do
+        @p.write({@dm0 => es})
+      end
+      queue_messages = @p.queue.collect do |chunk|
+        # collect first character of each message
+        chunk.chunk[0]
+      end
+      assert_equal([
+                     [@dm0],
+                     1,
+                     "c",
+                     [@dm0, @dm0, @dm0],
+                     [5000, 1, 1],
+                     ["x", "a", "b"]
+                   ],
+                   [
+                     @p.stage.keys,
+                     @p.stage[@dm0].size,
+                     @p.stage[@dm0].chunk[0],
+                     @p.queue.map(&:metadata),
+                     @p.queue.map(&:size),
+                     queue_messages
+                   ])
+    end
   end
 
   sub_test_case 'with configuration for test with lower limits' do
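This second test exercises the path where format is nil and each split is a raw string, so the patched pre-check can ask the record for its byte size directly via String#bytesize; with compression enabled, the stored size is unknown before append, and the check yields nil. The following standalone sketch (an extraction for illustration, not the actual method in buffer.rb) mirrors that decision:

```ruby
# Standalone sketch of the size pre-check: return the record's byte size
# when it can be known before append, or nil when it cannot (compression
# changes the stored size, so it is only known after chunk.append).
def determined_bytesize(split, format, compress)
  return nil if compress != :text
  formatted = format ? format.call(split) : nil
  return formatted.bytesize if formatted
  return split.first.bytesize if split.first.respond_to?(:bytesize)
  nil
end

p determined_bytesize(["a" * 1_000_000], nil, :text)          # => 1000000 (raw string, as in this test)
p determined_bytesize([[0, {}]], ->(s) { s.inspect }, :text)  # => 9, the size of the formatted string
p determined_bytesize(["a" * 1_000_000], nil, :gzip)          # => nil, unknown until after append
```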