Skip to content

Commit

Permalink
Merge pull request #3018 from jiping-s/fix-buffer-timekey-metrics
Browse files Browse the repository at this point in the history
buffer: Fix calculation of timekey stats
  • Loading branch information
repeatedly committed Aug 31, 2020
2 parents 45a12aa + 5e5e0c3 commit 3832075
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 32 deletions.
45 changes: 13 additions & 32 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -213,18 +213,13 @@ def start
@stage, @queue = resume
@stage.each_pair do |metadata, chunk|
@stage_size += chunk.bytesize
if chunk.metadata && chunk.metadata.timekey
add_timekey(metadata.timekey)
end
end
@queue.each do |chunk|
@queued_num[chunk.metadata] ||= 0
@queued_num[chunk.metadata] += 1
@queue_size += chunk.bytesize
if chunk.metadata && chunk.metadata.timekey
add_timekey(chunk.metadata.timekey)
end
end
update_timekeys
log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
end

Expand Down Expand Up @@ -275,10 +270,6 @@ def new_metadata(timekey: nil, tag: nil, variables: nil)

def metadata(timekey: nil, tag: nil, variables: nil)
meta = Metadata.new(timekey, tag, variables)
if (t = meta.timekey)
add_timekey(t)
end
meta
end

def timekeys
Expand Down Expand Up @@ -472,9 +463,21 @@ def enqueue_unstaged_chunk(chunk)
end
end

def update_timekeys
synchronize do
@timekeys = (@stage.values + @queue).each_with_object({}) do |chunk, keys|
if chunk && chunk.metadata && chunk.metadata.timekey
t = chunk.metadata.timekey
keys[t] = keys.fetch(t, 0) + 1
end
end
end
end

# At flush_at_shutdown, all staged chunks should be enqueued for buffer flush. Set true to force_enqueue for it.
def enqueue_all(force_enqueue = false)
log.on_trace { log.trace "enqueueing all chunks in buffer", instance: self.object_id }
update_timekeys

if block_given?
synchronize{ @stage.keys }.each do |metadata|
Expand Down Expand Up @@ -553,10 +556,6 @@ def purge_chunk(chunk_id)
log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
end

if metadata && metadata.timekey
del_timekey(metadata.timekey)
end

nil
end

Expand Down Expand Up @@ -809,24 +808,6 @@ def optimistic_queued?(metadata = nil)
!@queue.empty?
end
end

def add_timekey(t)
@mutex.synchronize do
@timekeys[t] += 1
end
nil
end

def del_timekey(t)
@mutex.synchronize do
if @timekeys[t] <= 1
@timekeys.delete(t)
else
@timekeys[t] -= 1
end
end
nil
end
end
end
end
4 changes: 4 additions & 0 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ def create_chunk_es(metadata, es)
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal [@dm2,@dm3,m], @p.stage.keys

@p.update_timekeys

assert @p.timekeys.include?(timekey)
end

Expand Down Expand Up @@ -675,6 +677,8 @@ def create_chunk_es(metadata, es)
assert_equal [@dm2,@dm3,m], @p.stage.keys
assert_equal 1, @p.stage[m].append_count

@p.update_timekeys

assert @p.timekeys.include?(timekey)
end

Expand Down

0 comments on commit 3832075

Please sign in to comment.