Skip to content

Commit

Permalink
Merge pull request #3118 from fluent/refactor-buffer-code
Browse files Browse the repository at this point in the history
Refactor buffer code
  • Loading branch information
repeatedly committed Sep 1, 2020
2 parents b809f58 + 251b3d8 commit b5f4a6d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
22 changes: 15 additions & 7 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def initialize

@stage_size = @queue_size = 0
@timekeys = Hash.new(0)
@enable_update_timekeys = false
@mutex = Mutex.new
end

Expand All @@ -207,6 +208,10 @@ def configure(conf)
end
end

def enable_update_timekeys
@enable_update_timekeys = true
end

def start
super

Expand Down Expand Up @@ -268,8 +273,9 @@ def new_metadata(timekey: nil, tag: nil, variables: nil)
Metadata.new(timekey, tag, variables)
end

# Keep this method for existing code
def metadata(timekey: nil, tag: nil, variables: nil)
meta = Metadata.new(timekey, tag, variables)
Metadata.new(timekey, tag, variables)
end

def timekeys
Expand Down Expand Up @@ -465,8 +471,10 @@ def enqueue_unstaged_chunk(chunk)

def update_timekeys
synchronize do
@timekeys = (@stage.values + @queue).each_with_object({}) do |chunk, keys|
if chunk && chunk.metadata && chunk.metadata.timekey
chunks = @stage.values
chunks.concat(@queue)
@timekeys = chunks.each_with_object({}) do |chunk, keys|
if chunk.metadata && chunk.metadata.timekey
t = chunk.metadata.timekey
keys[t] = keys.fetch(t, 0) + 1
end
Expand All @@ -477,7 +485,7 @@ def update_timekeys
# At flush_at_shutdown, all staged chunks should be enqueued for buffer flush. Set true to force_enqueue for it.
def enqueue_all(force_enqueue = false)
log.on_trace { log.trace "enqueueing all chunks in buffer", instance: self.object_id }
update_timekeys
update_timekeys if @enable_update_timekeys

if block_given?
synchronize{ @stage.keys }.each do |metadata|
Expand Down Expand Up @@ -787,11 +795,11 @@ def statistics
'total_queued_size' => stage_size + queue_size,
}

if (m = timekeys.min)
tkeys = timekeys
if (m = tkeys.min)
stats['oldest_timekey'] = m
end

if (m = timekeys.max)
if (m = tkeys.max)
stats['newest_timekey'] = m
end

Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def configure(conf)
buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, [])
@buffer = Plugin.new_buffer(buffer_type, parent: self)
@buffer.configure(buffer_conf)
@buffer.enable_update_timekeys if @chunk_key_time

@flush_at_shutdown = @buffer_config.flush_at_shutdown
if @flush_at_shutdown.nil?
Expand Down

0 comments on commit b5f4a6d

Please sign in to comment.