buffer: Fix calculation of timekey stats #3018

Merged: 2 commits, Aug 31, 2020

Changes from 1 commit
lib/fluent/plugin/buffer.rb (45 changes: 13 additions, 32 deletions)
@@ -197,18 +197,13 @@ def start
         @stage, @queue = resume
         @stage.each_pair do |metadata, chunk|
           @stage_size += chunk.bytesize
-          if chunk.metadata && chunk.metadata.timekey
-            add_timekey(metadata.timekey)
-          end
         end
         @queue.each do |chunk|
           @queued_num[chunk.metadata] ||= 0
           @queued_num[chunk.metadata] += 1
           @queue_size += chunk.bytesize
-          if chunk.metadata && chunk.metadata.timekey
-            add_timekey(chunk.metadata.timekey)
-          end
         end
+        update_timekeys
         log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
       end

@@ -259,10 +254,6 @@ def new_metadata(timekey: nil, tag: nil, variables: nil)

       def metadata(timekey: nil, tag: nil, variables: nil)
         meta = Metadata.new(timekey, tag, variables)
-        if (t = meta.timekey)
-          add_timekey(t)
-        end
         meta
       end

       def timekeys
@@ -456,9 +447,21 @@ def enqueue_unstaged_chunk(chunk)
           end
         end

+      def update_timekeys
+        synchronize do
Contributor Author:

The synchronize here is not some Ruby keyword, as I had imagined, but a method (a short sketch of the pattern follows this hunk). Please take a look: maybe there is something I could do to reduce the lock(s) needed. The method does have to loop through everything in stage and queue, though the consistency of the data it reads doesn't matter (e.g. it's okay for the same chunk to be counted in both stage and queue).

+          @timekeys = (@stage.values + @queue).each_with_object({}) do |chunk, keys|
+            if chunk && chunk.metadata && chunk.metadata.timekey
+              t = chunk.metadata.timekey
+              keys[t] = keys.fetch(t, 0) + 1
+            end
+          end
+        end
+      end
+
       # At flush_at_shutdown, all staged chunks should be enqueued for buffer flush. Set true to force_enqueue for it.
       def enqueue_all(force_enqueue = false)
         log.on_trace { log.trace "enqueueing all chunks in buffer", instance: self.object_id }
+        update_timekeys

         if block_given?
           synchronize{ @stage.keys }.each do |metadata|
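
A note on the comment above: synchronize is a method, not a keyword. In Fluent::Plugin::Buffer it comes from Ruby's standard MonitorMixin, and the monitor is re-entrant, so nested synchronize calls from the same thread do not deadlock. The sketch below is illustrative only (TinyBuffer, Chunk, Metadata, stage_chunk and enqueue_chunk are stand-ins, not fluentd API); it shows the same pattern the new update_timekeys uses: take the lock, walk whatever is currently in stage and queue, and rebuild the timekey tally from scratch.

```ruby
# A minimal sketch, not fluentd code.
require 'monitor'

Metadata = Struct.new(:timekey, :tag, :variables)
Chunk    = Struct.new(:metadata, :bytesize)

class TinyBuffer
  include MonitorMixin  # provides #synchronize (a method backed by a re-entrant monitor)

  attr_reader :timekeys

  def initialize
    super()              # MonitorMixin's initializer sets up the monitor
    @stage = {}          # metadata => staged chunk
    @queue = []          # enqueued chunks
    @timekeys = {}       # timekey => number of chunks carrying it
  end

  def stage_chunk(chunk)
    synchronize { @stage[chunk.metadata] = chunk }
  end

  def enqueue_chunk(chunk)
    synchronize { @queue << chunk }
  end

  # Rebuild the statistic from whatever is in stage and queue right now.
  # A slightly stale snapshot is fine: the tally is informational only.
  def update_timekeys
    synchronize do
      @timekeys = (@stage.values + @queue).each_with_object({}) do |chunk, keys|
        if chunk && chunk.metadata && chunk.metadata.timekey
          t = chunk.metadata.timekey
          keys[t] = keys.fetch(t, 0) + 1
        end
      end
    end
  end
end

buf = TinyBuffer.new
buf.stage_chunk(Chunk.new(Metadata.new(1_598_832_000), 10))
buf.enqueue_chunk(Chunk.new(Metadata.new(1_598_832_000), 20))
buf.enqueue_chunk(Chunk.new(Metadata.new(1_598_835_600), 5))
buf.update_timekeys
p buf.timekeys  # => {1598832000=>2, 1598835600=>1}
```

Because the tally is rebuilt from the chunks themselves, a slightly inconsistent read during the walk only skews a statistic, which matches the point in the comment that strict consistency is not required here.
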
@@ -537,10 +540,6 @@ def purge_chunk(chunk_id)
           log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
         end

-        if metadata && metadata.timekey
-          del_timekey(metadata.timekey)
-        end
-
         nil
       end

@@ -789,24 +788,6 @@ def optimistic_queued?(metadata = nil)
           !@queue.empty?
         end
       end
-
-      def add_timekey(t)
-        @mutex.synchronize do
-          @timekeys[t] += 1
-        end
-        nil
-      end
-
-      def del_timekey(t)
-        @mutex.synchronize do
-          if @timekeys[t] <= 1
-            @timekeys.delete(t)
-          else
-            @timekeys[t] -= 1
-          end
-        end
-        nil
-      end
     end
   end
 end
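
Taken together, the deletions above remove the incremental bookkeeping: previously the counter was bumped on every metadata() call that carried a timekey, while only purge_chunk decremented it, so the counters could drift away from what the buffer actually held. The new update_timekeys simply recounts over stage and queue at start and at the top of enqueue_all. The snippet below is illustrative only (not fluentd code and not a reproduction of the original report); it contrasts the two styles on a toy lifecycle.

```ruby
# Illustrative comparison, not fluentd code: "incremental" mimics the removed
# add_timekey/del_timekey bookkeeping, "recount" mimics the new update_timekeys.
chunks = []                   # what the buffer actually holds right now

incremental = Hash.new(0)
incremental[100] += 1         # a metadata lookup registered timekey 100,
                              # but no chunk for it is ever written or purged

chunks << { timekey: 200 }    # normal lifecycle: write ...
incremental[200] += 1
chunks.pop                    # ... then purge
incremental[200] -= 1
incremental.delete(200) if incremental[200] <= 0

recount = chunks.each_with_object({}) do |c, keys|
  keys[c[:timekey]] = keys.fetch(c[:timekey], 0) + 1
end

p incremental  # => {100=>1}  the stale entry lingers
p recount      # => {}        always matches the chunks that exist
```

Recounting costs one pass over stage and queue each time it is called, but it cannot leak entries, which is the trade this change makes.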