Skip to content

Commit

Permalink
Merge pull request #2853 from ganmacs/fix-for-queued_num
Browse files Browse the repository at this point in the history
When chunks are queued, their `metadata.seq` should be 0
  • Loading branch information
ganmacs committed Mar 4, 2020
2 parents 1fa83cb + 8881a9e commit da256cd
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 7 deletions.
1 change: 1 addition & 0 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def resume
# these chunks (unstaged chunks) have shared the same metadata
# So perform enqueue step again https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L364
if chunk_size_full?(chunk) || stage.key?(chunk.metadata)
chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num
queue << chunk.enqueued!
else
stage[chunk.metadata] = chunk
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
u = unstaged_chunks[m].pop
u.synchronize do
if u.unstaged? && !chunk_size_full?(u)
# `u.metadata.seq` and `m.seq` can be different but Buffer#enqueue_chunk expects them to be the same value
u.metadata.seq = 0
synchronize {
@stage[m] = u.staged!
@stage_size += u.bytesize
Expand Down Expand Up @@ -426,6 +428,7 @@ def enqueue_chunk(metadata)
if chunk.empty?
chunk.close
else
chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num
@queue << chunk
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued!
Expand All @@ -444,6 +447,7 @@ def enqueue_unstaged_chunk(chunk)
synchronize do
chunk.synchronize do
metadata = chunk.metadata
metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num
@queue << chunk
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued!
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
assert_equal 3, s.size
assert_equal [0, 1, 2], s.keys.map(&:seq).sort
assert_equal 1, e.size
assert_equal [2], e.map { |e| e.metadata.seq }
assert_equal [0], e.map { |e| e.metadata.seq }
end
end

Expand Down
23 changes: 17 additions & 6 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -903,13 +903,26 @@ def create_chunk_es(metadata, es)
# metadata whose seq is 4 is created, but overwritten with the original metadata (seq=0) for the next use of this chunk https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L357
assert_equal [@dm0], @p.stage.keys
assert_equal 5400, @p.stage[@dm0].size
r = [@dm0]
3.times { |i| r << r[i].dup_next }
assert_equal [@dm0, *r], @p.queue.map(&:metadata)
assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata)
assert_equal [5000, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ...
# 9900 * 4 + 5400 == 45000
end

test '#dequeue_chunk succeeds when chunk is splited' do
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)

assert_equal 1_280_000, @p.chunk_limit_size

es = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * (128 - 22)}] ] * 45000)
@p.write({@dm0 => es}, format: @format)
@p.enqueue_all(true)

dequeued_chunks = 6.times.map { |e| @p.dequeue_chunk } # splits: 45000 / 100 => 450 * ...
assert_equal [5000, 9900, 9900, 9900, 9900, 5400], dequeued_chunks.map(&:size)
assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0, @dm0], dequeued_chunks.map(&:metadata)
end

test '#write raises BufferChunkOverflowError if a record is biggar than chunk limit size' do
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)
Expand Down Expand Up @@ -993,9 +1006,7 @@ def create_chunk_es(metadata, es)

assert_equal [@dm0], @p.stage.keys
assert_equal 900, @p.stage[@dm0].size
r = [@dm0]
4.times { |i| r << r[i].dup_next }
assert_equal r, @p.queue.map(&:metadata)
assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata)
assert_equal [9500, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ...
##### 900 + 9500 + 9900 * 4 == 5000 + 45000
end
Expand Down

0 comments on commit da256cd

Please sign in to comment.