Skip to content

Commit

Permalink
buffer: Make clear the code for checking BufferChunkOverflow
Browse files Browse the repository at this point in the history
Checking original_bytesize isn't needed to detect a big record than
chunk_limit_size.

Signed-off-by: Takuro Ashie <ashie@clear-code.com>
  • Loading branch information
ashie committed Dec 23, 2021
1 parent aad2feb commit 5d93f36
Showing 1 changed file with 32 additions and 28 deletions.
60 changes: 32 additions & 28 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -771,26 +771,33 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
begin
while writing_splits_index < splits.size
split = splits[writing_splits_index]
formatted_split = format ? format.call(split) : split.first
if split.size == 1 && original_bytesize == 0
if format == nil && @compress != :text
# The actual size of chunk is not determined until after chunk.append.
# so, keep already processed 'split' content here.
# (allow performance regression a bit)
formatted_split = format ? format.call(split) : nil

if split.size == 1 # Check BufferChunkOverflowError
determined_bytesize = nil
if @compress != :text
determined_bytesize = nil
elsif formatted_split
determined_bytesize = formatted_split.bytesize
elsif split.first.respond_to?(:bytesize)
determined_bytesize = split.first.bytesize
end

if determined_bytesize && determined_bytesize > @chunk_limit_size
# It is a obvious case that BufferChunkOverflowError should be raised here.
# But if it raises here, already processed 'split' or
# the proceeding 'split' will be lost completely.
# So it is a last resort to delay raising such a exception
errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
end

if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size
# The split will (might) cause size over so keep already processed
# 'split' content here (allow performance regression a bit).
chunk.commit
committed_bytesize = chunk.bytesize
else
big_record_size = formatted_split.bytesize
if big_record_size > @chunk_limit_size
# Just skip to next split (current split is ignored)
errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
elsif chunk.bytesize + big_record_size > @chunk_limit_size
# No doubt that the split is expected to cause size over, keep 'split' content here.
chunk.commit
committed_bytesize = chunk.bytesize
end
end
end

Expand All @@ -804,23 +811,20 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
chunk.rollback
committed_bytesize = chunk.bytesize
if split.size == 1 && original_bytesize == 0
if adding_bytes < @chunk_limit_size

if split.size == 1 # Check BufferChunkOverflowError again
if adding_bytes > @chunk_limit_size
errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
else
# As already processed content is kept after rollback, then unstaged chunk should be queued.
# After that, re-process current split again.
# New chunk should be allocated, to do it, modify @stage and so on.
synchronize { @stage.delete(modified_metadata) }
staged_chunk_used = false
chunk.unstaged!
break
else
# It is obviously case that BufferChunkOverflowError should be raised here,
# but if it raises here, already processed 'split' or
# the proceeding 'split' will be lost completely.
# so it is a last resort to delay raising such a exception
errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
writing_splits_index += 1
next
end
end

Expand Down

0 comments on commit 5d93f36

Please sign in to comment.